[GitHub] spark pull request #21847: [SPARK-24855][SQL][EXTERNAL]: Built-in AVRO suppo...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21847#discussion_r206353416

--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala ---
@@ -120,7 +133,7 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
     case MapType(kt, vt, valueContainsNull) if kt == StringType =>
       val valueConverter = newConverter(
-        vt, resolveNullableType(avroType.getValueType, valueContainsNull))
+        vt, resolveUnionType(avroType.getValueType, vt, valueContainsNull))
--- End diff --

ditto for MapType,

```scala
(getter, ordinal) => {
  val mapData = getter.getMap(ordinal)
  val length = mapData.numElements()
  val result = new java.util.HashMap[String, Any](length)
  val keyArray = mapData.keyArray()
  val valueArray = mapData.valueArray()
  var i = 0
  while (i < length) {
    val key = keyArray.getUTF8String(i).toString
    if (valueContainsNull && valueArray.isNullAt(i)) {
      result.put(key, null)
    } else {
      result.put(key, valueConverter(valueArray, i))
    }
    i += 1
  }
  result
}
```

---

- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21847: [SPARK-24855][SQL][EXTERNAL]: Built-in AVRO suppo...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21847#discussion_r206350423

--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala ---
@@ -87,17 +87,30 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
     case d: DecimalType =>
       (getter, ordinal) => getter.getDecimal(ordinal, d.precision, d.scale).toString
     case StringType =>
-      (getter, ordinal) => new Utf8(getter.getUTF8String(ordinal).getBytes)
+      if (avroType.getType == Type.ENUM) {
+        (getter, ordinal) =>
+          new EnumSymbol(avroType, getter.getUTF8String(ordinal).toString)
+      } else {
+        (getter, ordinal) =>
+          new Utf8(getter.getUTF8String(ordinal).getBytes)
+      }
     case BinaryType =>
-      (getter, ordinal) => ByteBuffer.wrap(getter.getBinary(ordinal))
+      if (avroType.getType == Type.FIXED) {
+        // Handles fixed-type fields in output schema. Test case is included in test.avro
+        // as it includes several fixed fields that would fail if we specify schema
+        // on-write without this condition
+        (getter, ordinal) => new Fixed(avroType, getter.getBinary(ordinal))
+      } else {
+        (getter, ordinal) => ByteBuffer.wrap(getter.getBinary(ordinal))
+      }
     case DateType =>
       (getter, ordinal) => getter.getInt(ordinal) * DateTimeUtils.MILLIS_PER_DAY
     case TimestampType =>
       (getter, ordinal) => getter.getLong(ordinal) / 1000
     case ArrayType(et, containsNull) =>
       val elementConverter = newConverter(
-        et, resolveNullableType(avroType.getElementType, containsNull))
+        et, resolveUnionType(avroType.getElementType, et, containsNull))
       (getter, ordinal) => {
--- End diff --

With `if (containsNull && arrayData.isNullAt(i))`, the JVM can remove the branching if it's not nullable. We can use an `Array` instead of an `ArrayBuffer` to get better performance.

```scala
(getter, ordinal) => {
  val arrayData = getter.getArray(ordinal)
  val length = arrayData.numElements()
  val result = new Array[Any](length)
  var i = 0
  while (i < length) {
    if (containsNull && arrayData.isNullAt(i)) {
      result(i) = null
    } else {
      result(i) = elementConverter(arrayData, i)
    }
    i += 1
  }
  result
}
```

---
[GitHub] spark pull request #21904: [SPARK-24953] [SQL] Prune a branch in `CaseWhen` ...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21904#discussion_r206333426

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
@@ -416,6 +450,12 @@ object SimplifyConditionals extends Rule[LogicalPlan] with PredicateHelper {
       // these branches can be pruned away
       val (h, t) = branches.span(_._1 != TrueLiteral)
       CaseWhen( h :+ t.head, None)
+
+    case e @ CaseWhen(branches, _) if pruneSeenBranches(branches).nonEmpty =>
+      e.copy(branches = pruneSeenBranches(branches).get)
+
+    case e @ CaseWhen(branches, _) if combineAdjacentBranches(branches).nonEmpty =>
+      e.copy(branches = combineAdjacentBranches(branches).get)
--- End diff --

This will work. But my only concern is that this will catch all the `CaseWhen` cases; as a result, no more rules can be added after it.

---
[GitHub] spark pull request #21852: [SPARK-24893] [SQL] Remove the entire CaseWhen if...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21852#discussion_r206271589

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
@@ -416,6 +416,23 @@ object SimplifyConditionals extends Rule[LogicalPlan] with PredicateHelper {
       // these branches can be pruned away
       val (h, t) = branches.span(_._1 != TrueLiteral)
       CaseWhen( h :+ t.head, None)
+
+    case e @ CaseWhen(branches, Some(elseValue)) if {
+      val values = branches.map(_._2) :+ elseValue
+      values.tail.forall(values.head.semanticEquals)
--- End diff --

I replaced the condition with `branches.forall(_._2.semanticEquals(elseValue))`, which is simpler.

---
[GitHub] spark pull request #21852: [SPARK-24893] [SQL] Remove the entire CaseWhen if...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21852#discussion_r206266243

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
@@ -416,6 +416,23 @@ object SimplifyConditionals extends Rule[LogicalPlan] with PredicateHelper {
       // these branches can be pruned away
       val (h, t) = branches.span(_._1 != TrueLiteral)
       CaseWhen( h :+ t.head, None)
+
+    case e @ CaseWhen(branches, Some(elseValue)) if {
+      val values = branches.map(_._2) :+ elseValue
+      values.tail.forall(values.head.semanticEquals)
--- End diff --

I think what you suggested is the same as the implemented code in the PR, and `elseValue.deterministic` is not required since it is checked in `semanticEquals`. That being said, the whole condition can be replaced by `branches.exists(_._2.semanticEquals(elseValue))`.

---
[GitHub] spark pull request #21904: [SPARK-24953] [SQL] Prune a branch in `CaseWhen` ...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21904#discussion_r205963712

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
@@ -416,6 +416,29 @@ object SimplifyConditionals extends Rule[LogicalPlan] with PredicateHelper {
       // these branches can be pruned away
       val (h, t) = branches.span(_._1 != TrueLiteral)
       CaseWhen( h :+ t.head, None)
+
+    case e @ CaseWhen(branches, _) =>
+      val newBranches = branches.foldLeft(List[(Expression, Expression)]()) {
+        case (newBranches, branch) =>
+          if (newBranches.exists(_._1.semanticEquals(branch._1))) {
+            // If a condition in a branch is previously seen, this branch can be pruned.
+            // TODO: In fact, if a condition is a sub-condition of the previous one,
+            // TODO: it can be pruned. This is less strict and can be implemented
+            // TODO: by decomposing seen conditions.
+            newBranches
+          } else if (newBranches.nonEmpty && newBranches.last._2.semanticEquals(branch._2)) {
+            // If the outputs of two adjacent branches are the same, two branches can be combined.
+            newBranches.take(newBranches.length - 1)
+              .:+((Or(newBranches.last._1, branch._1), newBranches.last._2))
--- End diff --

For example, the following case can benefit from this rule,

```scala
CaseWhen(
  (UnresolvedAttribute("a"), Literal(1)) ::
  (Not(UnresolvedAttribute("a")), Literal(1)) ::
  (LessThan(Rand(1), Literal(0.5)), Literal(3)) ::
  (NonFoldableLiteral(true), Literal(4)) ::
  (NonFoldableLiteral(false), Literal(5)) :: Nil,
  None), Literal(1))
```

---
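The prune-and-combine fold this rule describes can be sketched outside Spark. In the illustrative snippet below (not from the PR), plain string equality stands in for `semanticEquals` and a formatted string stands in for the `Or` expression:

```scala
object BranchSimplify {
  // Prune a branch whose condition was already seen (it is unreachable);
  // merge adjacent branches with equal outputs by OR-ing their conditions.
  def simplify(branches: List[(String, String)]): List[(String, String)] =
    branches.foldLeft(List.empty[(String, String)]) { (acc, branch) =>
      if (acc.exists(_._1 == branch._1)) {
        acc // condition previously seen: drop this branch
      } else if (acc.nonEmpty && acc.last._2 == branch._2) {
        // same output as the previous branch: combine the two conditions
        acc.init :+ ((s"(${acc.last._1} OR ${branch._1})", acc.last._2))
      } else {
        acc :+ branch
      }
    }
}
```

For instance, `simplify(List(("a", "1"), ("a", "2"), ("b", "1")))` prunes the unreachable second branch and then merges the remaining two into a single `("(a OR b)", "1")` branch.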
[GitHub] spark pull request #21852: [SPARK-24893] [SQL] Remove the entire CaseWhen if...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21852#discussion_r205946975

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
@@ -416,6 +416,23 @@ object SimplifyConditionals extends Rule[LogicalPlan] with PredicateHelper {
       // these branches can be pruned away
       val (h, t) = branches.span(_._1 != TrueLiteral)
       CaseWhen( h :+ t.head, None)
+
+    case e @ CaseWhen(branches, Some(elseValue)) if {
--- End diff --

We cannot. When there is no `elseValue`, all the conditions are required to be evaluated before hitting the default `elseValue`, which is `null`.

---
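The semantics being defended here can be illustrated with a toy evaluator (this is a sketch, not Spark's implementation): without an ELSE clause, a CASE WHEN falls through to `null`, so every condition must still be tried in order before that default is reached.

```scala
object CaseWhenEval {
  // Evaluate branch conditions in order; return the first matching branch's
  // value, else the ELSE value, which defaults to null when absent --
  // the same fall-through behavior as SQL's CASE WHEN.
  def eval(branches: List[(() => Boolean, Any)], elseValue: Option[Any]): Any =
    branches.collectFirst { case (cond, v) if cond() => v }
      .getOrElse(elseValue.orNull)
}
```

With no matching branch and no ELSE, `eval` yields `null`; this is why the rule only fires when `elseValue` is present.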
[GitHub] spark pull request #21904: [SPARK-24953] [SQL] Prune a branch in `CaseWhen` ...
GitHub user dbtsai opened a pull request: https://github.com/apache/spark/pull/21904

[SPARK-24953] [SQL] Prune a branch in `CaseWhen` if previously seen

## What changes were proposed in this pull request?

If a condition in a branch is previously seen, this branch can be pruned. If the outputs of two adjacent branches are the same, the two branches can be combined.

## How was this patch tested?

Tests added.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dbtsai/spark prune-case-when-branch

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21904.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21904

commit 4ac3d1efdec05d56f5051901ca209b67b23be28b
Author: DB Tsai
Date: 2018-07-27T23:47:02Z

    prune casewhen branch

---
[GitHub] spark issue #21852: [SPARK-24893] [SQL] Remove the entire CaseWhen if all th...
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21852

+cc @cloud-fan and @gatorsmile

---
[GitHub] spark pull request #21850: [SPARK-24892] [SQL] Simplify `CaseWhen` to `If` w...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21850#discussion_r205830257

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
@@ -414,6 +414,9 @@ object SimplifyConditionals extends Rule[LogicalPlan] with PredicateHelper {
       // these branches can be pruned away
       val (h, t) = branches.span(_._1 != TrueLiteral)
       CaseWhen( h :+ t.head, None)
+
+    case CaseWhen(Seq((cond, trueValue)), elseValue) =>
+      If(cond, trueValue, elseValue.getOrElse(Literal(null, trueValue.dataType)))
--- End diff --

Let's revisit this PR later, and we should always try to add a CASE WHEN version for parity. Here is the one for CASE WHEN: https://github.com/apache/spark/pull/21852

---
[GitHub] spark pull request #21847: [SPARK-24855][SQL][EXTERNAL]: Built-in AVRO suppo...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21847#discussion_r205692946

--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala ---
@@ -165,16 +183,112 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
     result
   }

-  private def resolveNullableType(avroType: Schema, nullable: Boolean): Schema = {
-    if (nullable) {
-      // avro uses union to represent nullable type.
-      val fields = avroType.getTypes.asScala
-      assert(fields.length == 2)
-      val actualType = fields.filter(_.getType != NULL)
-      assert(actualType.length == 1)
-      actualType.head
+  // Resolve an Avro union against a supplied DataType, i.e. a LongType compared against
+  // a ["null", "long"] should return a schema of type Schema.Type.LONG
+  // This function also handles resolving a DataType against unions of 2 or more types, i.e.
+  // an IntType resolves against a ["int", "long", "null"] will correctly return a schema of
+  // type Schema.Type.LONG
+  private def resolveNullableType(avroType: Schema, catalystType: DataType,
+      nullable: Boolean): Schema = {
+    (nullable, avroType.getType) match {
+      case (false, Type.UNION) | (true, Type.UNION) =>
+        // avro uses union to represent nullable type.
+        val fieldTypes = avroType.getTypes.asScala
+
+        // If we're nullable, we need to have at least two types. Cases with more than two types
+        // are captured in test("read read-write, read-write w/ schema, read") w/ test.avro input
+        assert(fieldTypes.length >= 2)
+
+        val actualType = catalystType match {
+          case NullType => fieldTypes.filter(_.getType == Type.NULL)
+          case BooleanType => fieldTypes.filter(_.getType == Type.BOOLEAN)
+          case ByteType => fieldTypes.filter(_.getType == Type.INT)
+          case BinaryType =>
+            val at = fieldTypes.filter(x => x.getType == Type.BYTES || x.getType == Type.FIXED)
+            if (at.length > 1) {
+              throw new IncompatibleSchemaException(
+                s"Cannot resolve schema of ${catalystType} against union ${avroType.toString}")
+            } else {
+              at
+            }
+          case ShortType | IntegerType => fieldTypes.filter(_.getType == Type.INT)
+          case LongType => fieldTypes.filter(_.getType == Type.LONG)
+          case FloatType => fieldTypes.filter(_.getType == Type.FLOAT)
+          case DoubleType => fieldTypes.filter(_.getType == Type.DOUBLE)
+          case d: DecimalType => fieldTypes.filter(_.getType == Type.STRING)
+          case StringType => fieldTypes
+            .filter(x => x.getType == Type.STRING || x.getType == Type.ENUM)
+          case DateType => fieldTypes.filter(x => x.getType == Type.INT || x.getType == Type.LONG)
+          case TimestampType => fieldTypes.filter(_.getType == Type.LONG)
+          case ArrayType(et, containsNull) =>
+            // Find array that matches the element type specified
+            fieldTypes.filter(x => x.getType == Type.ARRAY
+              && typeMatchesSchema(et, x.getElementType))
+          case st: StructType => // Find the matching record!
+            val recordTypes = fieldTypes.filter(x => x.getType == Type.RECORD)
+            if (recordTypes.length > 1) {
+              throw new IncompatibleSchemaException(
+                "Unions of multiple record types are NOT supported with user-specified schema")
+            }
+            recordTypes
+          case MapType(kt, vt, valueContainsNull) =>
+            // Find the map that matches the value type. Maps in Avro are always key type string
+            fieldTypes.filter(x => x.getType == Type.MAP && typeMatchesSchema(vt, x.getValueType))
+          case other =>
+            throw new IncompatibleSchemaException(s"Unexpected type: $other")
+        }
+
+        assert(actualType.length == 1)
+        actualType.head
+      case (false, _) | (true, _) => avroType
+    }
+  }
+
+  // Given a Schema and a DataType, do they match?
+  private def typeMatchesSchema(catalystType: DataType, avroSchema: Schema): Boolean = {
+    if (catalystType.isInstanceOf[StructType]) {
+      val avroFields = resolveNullableType(avroSchema, catalystType,
+        avroSchema.getType == Type.UNION)
+        .getFields
+      if (avroFields.size() == catalystType.asIns
[GitHub] spark pull request #21847: [SPARK-24855][SQL][EXTERNAL]: Built-in AVRO suppo...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21847#discussion_r205692778

--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala ---
@@ -165,16 +183,112 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
     result
   }

-  private def resolveNullableType(avroType: Schema, nullable: Boolean): Schema = {
-    if (nullable) {
-      // avro uses union to represent nullable type.
-      val fields = avroType.getTypes.asScala
-      assert(fields.length == 2)
-      val actualType = fields.filter(_.getType != NULL)
-      assert(actualType.length == 1)
-      actualType.head
+  // Resolve an Avro union against a supplied DataType, i.e. a LongType compared against
+  // a ["null", "long"] should return a schema of type Schema.Type.LONG
+  // This function also handles resolving a DataType against unions of 2 or more types, i.e.
+  // an IntType resolves against a ["int", "long", "null"] will correctly return a schema of
+  // type Schema.Type.LONG
+  private def resolveNullableType(avroType: Schema, catalystType: DataType,
+      nullable: Boolean): Schema = {
+    (nullable, avroType.getType) match {
+      case (false, Type.UNION) | (true, Type.UNION) =>
+        // avro uses union to represent nullable type.
+        val fieldTypes = avroType.getTypes.asScala
+
+        // If we're nullable, we need to have at least two types. Cases with more than two types
+        // are captured in test("read read-write, read-write w/ schema, read") w/ test.avro input
+        assert(fieldTypes.length >= 2)
+
+        val actualType = catalystType match {
+          case NullType => fieldTypes.filter(_.getType == Type.NULL)
+          case BooleanType => fieldTypes.filter(_.getType == Type.BOOLEAN)
+          case ByteType => fieldTypes.filter(_.getType == Type.INT)
+          case BinaryType =>
+            val at = fieldTypes.filter(x => x.getType == Type.BYTES || x.getType == Type.FIXED)
+            if (at.length > 1) {
+              throw new IncompatibleSchemaException(
+                s"Cannot resolve schema of ${catalystType} against union ${avroType.toString}")
+            } else {
+              at
+            }
+          case ShortType | IntegerType => fieldTypes.filter(_.getType == Type.INT)
+          case LongType => fieldTypes.filter(_.getType == Type.LONG)
+          case FloatType => fieldTypes.filter(_.getType == Type.FLOAT)
+          case DoubleType => fieldTypes.filter(_.getType == Type.DOUBLE)
+          case d: DecimalType => fieldTypes.filter(_.getType == Type.STRING)
+          case StringType => fieldTypes
+            .filter(x => x.getType == Type.STRING || x.getType == Type.ENUM)
+          case DateType => fieldTypes.filter(x => x.getType == Type.INT || x.getType == Type.LONG)
+          case TimestampType => fieldTypes.filter(_.getType == Type.LONG)
+          case ArrayType(et, containsNull) =>
+            // Find array that matches the element type specified
+            fieldTypes.filter(x => x.getType == Type.ARRAY
+              && typeMatchesSchema(et, x.getElementType))
+          case st: StructType => // Find the matching record!
+            val recordTypes = fieldTypes.filter(x => x.getType == Type.RECORD)
+            if (recordTypes.length > 1) {
+              throw new IncompatibleSchemaException(
+                "Unions of multiple record types are NOT supported with user-specified schema")
+            }
+            recordTypes
+          case MapType(kt, vt, valueContainsNull) =>
+            // Find the map that matches the value type. Maps in Avro are always key type string
+            fieldTypes.filter(x => x.getType == Type.MAP && typeMatchesSchema(vt, x.getValueType))
+          case other =>
+            throw new IncompatibleSchemaException(s"Unexpected type: $other")
+        }
+
+        assert(actualType.length == 1)
--- End diff --

Can you elaborate on when `actualType.length == 0` or `actualType.length > 1`? Is it possible that `catalystType` is `Int` and `fieldTypes` only contains `Long`? Do we want to do the promotion?

---
[GitHub] spark pull request #21847: [SPARK-24855][SQL][EXTERNAL]: Built-in AVRO suppo...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21847#discussion_r205684257

--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala ---
@@ -165,16 +183,112 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
     result
   }

-  private def resolveNullableType(avroType: Schema, nullable: Boolean): Schema = {
-    if (nullable) {
-      // avro uses union to represent nullable type.
-      val fields = avroType.getTypes.asScala
-      assert(fields.length == 2)
-      val actualType = fields.filter(_.getType != NULL)
-      assert(actualType.length == 1)
-      actualType.head
+  // Resolve an Avro union against a supplied DataType, i.e. a LongType compared against
+  // a ["null", "long"] should return a schema of type Schema.Type.LONG
+  // This function also handles resolving a DataType against unions of 2 or more types, i.e.
+  // an IntType resolves against a ["int", "long", "null"] will correctly return a schema of
+  // type Schema.Type.LONG
+  private def resolveNullableType(avroType: Schema, catalystType: DataType,
+      nullable: Boolean): Schema = {
+    (nullable, avroType.getType) match {
--- End diff --

Since the code is complicated and long, maybe it's easier to read with just old-fashioned `if-else`.

---
[GitHub] spark pull request #21847: [SPARK-24855][SQL][EXTERNAL]: Built-in AVRO suppo...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21847#discussion_r205685728

--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala ---
@@ -165,16 +183,112 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
     result
   }

-  private def resolveNullableType(avroType: Schema, nullable: Boolean): Schema = {
-    if (nullable) {
-      // avro uses union to represent nullable type.
-      val fields = avroType.getTypes.asScala
-      assert(fields.length == 2)
-      val actualType = fields.filter(_.getType != NULL)
-      assert(actualType.length == 1)
-      actualType.head
+  // Resolve an Avro union against a supplied DataType, i.e. a LongType compared against
+  // a ["null", "long"] should return a schema of type Schema.Type.LONG
+  // This function also handles resolving a DataType against unions of 2 or more types, i.e.
+  // an IntType resolves against a ["int", "long", "null"] will correctly return a schema of
+  // type Schema.Type.LONG
+  private def resolveNullableType(avroType: Schema, catalystType: DataType,
+      nullable: Boolean): Schema = {
+    (nullable, avroType.getType) match {
+      case (false, Type.UNION) | (true, Type.UNION) =>
+        // avro uses union to represent nullable type.
+        val fieldTypes = avroType.getTypes.asScala
+
+        // If we're nullable, we need to have at least two types. Cases with more than two types
+        // are captured in test("read read-write, read-write w/ schema, read") w/ test.avro input
+        assert(fieldTypes.length >= 2)
--- End diff --

When it's non-nullable, is it possible to have `fieldTypes.length == 1`?

---
[GitHub] spark pull request #21847: [SPARK-24855][SQL][EXTERNAL]: Built-in AVRO suppo...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21847#discussion_r205683257

--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala ---
@@ -148,7 +165,8 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
     val avroFields = avroStruct.getFields
     assert(avroFields.size() == catalystStruct.length)
     val fieldConverters = catalystStruct.zip(avroFields.asScala).map {
-      case (f1, f2) => newConverter(f1.dataType, resolveNullableType(f2.schema(), f1.nullable))
+      case (f1, f2) => newConverter(f1.dataType, resolveNullableType(
+        f2.schema(), f1.dataType, f1.nullable))
--- End diff --

Nit, formatting,

```scala
case (f1, f2) =>
  newConverter(f1.dataType, resolveNullableType(f2.schema(), f1.dataType, f1.nullable))
```

---
[GitHub] spark pull request #21847: [SPARK-24855][SQL][EXTERNAL]: Built-in AVRO suppo...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21847#discussion_r205648911

--- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala ---
@@ -87,17 +88,33 @@ class AvroSerializer(rootCatalystType: DataType, rootAvroType: Schema, nullable:
     case d: DecimalType =>
       (getter, ordinal) => getter.getDecimal(ordinal, d.precision, d.scale).toString
     case StringType =>
-      (getter, ordinal) => new Utf8(getter.getUTF8String(ordinal).getBytes)
+      (getter, ordinal) =>
+        if (avroType.getType == Type.ENUM) {
+          new GenericData.EnumSymbol(avroType, getter.getUTF8String(ordinal).toString)
+        } else {
+          new Utf8(getter.getUTF8String(ordinal).getBytes)
+        }
     case BinaryType =>
-      (getter, ordinal) => ByteBuffer.wrap(getter.getBinary(ordinal))
+      (getter, ordinal) =>
+        val data = getter.getBinary(ordinal)
+        if (avroType.getType == Type.FIXED) {
+          // Handles fixed-type fields in output schema. Test case is included in test.avro
+          // as it includes several fixed fields that would fail if we specify schema
+          // on-write without this condition
+          val fixed = new GenericData.Fixed(avroType)
+          fixed.bytes(data)
+          fixed
+        } else {
+          ByteBuffer.wrap(data)
+        }
--- End diff --

This might be slow. In the executors, when each row is going to be serialized, the whole `if-else` will be executed again and again to get a specialized converter. We can consider resolving the specialized types earlier, in the driver, by

```scala
import org.apache.avro.generic.GenericData.{Fixed, EnumSymbol}

...

case StringType =>
  if (avroType.getType == Type.ENUM) {
    (getter, ordinal) => new EnumSymbol(avroType, getter.getUTF8String(ordinal).toString)
  } else {
    (getter, ordinal) => new Utf8(getter.getUTF8String(ordinal).getBytes)
  }
case BinaryType =>
  if (avroType.getType == Type.FIXED) {
    (getter, ordinal) => new Fixed(avroType, getter.getBinary(ordinal))
  } else {
    (getter, ordinal) => ByteBuffer.wrap(getter.getBinary(ordinal))
  }
```

so the returned lambda expression will not have any check on the `FIXED` or `ENUM` types.

---
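The optimization suggested here, deciding the branch once and returning a specialized closure, can be demonstrated without any Spark or Avro dependency. This minimal sketch uses hypothetical toy types (`AvroLikeType`, `Converter`) that are not part of the PR:

```scala
object SpecializedConverters {
  // A converter is a per-row lambda; ideally it carries no per-row type checks.
  type Converter = String => String

  sealed trait AvroLikeType
  case object EnumLike extends AvroLikeType
  case object PlainString extends AvroLikeType

  // Naive variant: the type check runs inside the closure, once per row.
  def naive(t: AvroLikeType): Converter =
    value => if (t == EnumLike) s"enum:$value" else s"utf8:$value"

  // Specialized variant: the type check runs once, at converter-creation time
  // (i.e. on the driver); the returned closure is branch-free.
  def specialized(t: AvroLikeType): Converter =
    if (t == EnumLike) value => s"enum:$value"
    else value => s"utf8:$value"
}
```

Both variants compute the same result; the difference is only where the branch lives, which matters when the returned closure is invoked once per serialized row.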
[GitHub] spark pull request #21852: [SPARK-24893] [SQL] Remove the entire CaseWhen if...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21852#discussion_r205599224

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
@@ -416,6 +416,29 @@ object SimplifyConditionals extends Rule[LogicalPlan] with PredicateHelper {
       // these branches can be pruned away
       val (h, t) = branches.span(_._1 != TrueLiteral)
       CaseWhen( h :+ t.head, None)
+
+    case e @ CaseWhen(branches, Some(elseValue)) if {
+      val list = branches.map(_._2) :+ elseValue
+      list.tail.forall(list.head.semanticEquals)
+    } =>
+      // For non-deterministic conditions with side effect, we can not remove it, or change
+      // the ordering. As a result, we try to remove the deterministic conditions from the tail.
+      val newBranches = branches.map(_._1)
--- End diff --

@viirya I think this can address your concern. Thanks.

---
[GitHub] spark pull request #21850: [SPARK-24892] [SQL] Simplify `CaseWhen` to `If` w...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21850#discussion_r205556780

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
@@ -414,6 +414,9 @@ object SimplifyConditionals extends Rule[LogicalPlan] with PredicateHelper {
       // these branches can be pruned away
       val (h, t) = branches.span(_._1 != TrueLiteral)
       CaseWhen( h :+ t.head, None)
+
+    case CaseWhen(Seq((cond, trueValue)), elseValue) =>
+      If(cond, trueValue, elseValue.getOrElse(Literal(null, trueValue.dataType)))
--- End diff --

The generated Java code is slightly simpler, but I agree there should not be any performance gain. That being said, once `CaseWhen` is converted into `If`, this conditional expression will benefit from the optimization rules for `If`, which may not be implemented for the `CaseWhen` case.

---
[GitHub] spark issue #21847: [SPARK-24855][SQL][EXTERNAL]: Built-in AVRO support shou...
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21847

+cc @MaxGekk and @gengliangwang, who worked on this part of the codebase.

---
[GitHub] spark pull request #21852: [SPARK-24893] [SQL] Remove the entire CaseWhen if...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21852#discussion_r205306098

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
@@ -416,6 +416,22 @@ object SimplifyConditionals extends Rule[LogicalPlan] with PredicateHelper {
       // these branches can be pruned away
       val (h, t) = branches.span(_._1 != TrueLiteral)
       CaseWhen( h :+ t.head, None)
+
+    case e @ CaseWhen(branches, Some(elseValue)) if {
+      // With previous rules, it's guaranteed that there must be one branch.
--- End diff --

You're right. I removed the comment. Thanks.

---
[GitHub] spark pull request #21852: [SPARK-24893] [SQL] Remove the entire CaseWhen if...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21852#discussion_r205305691

--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimplifyConditionalSuite.scala ---
@@ -122,4 +126,25 @@ class SimplifyConditionalSuite extends PlanTest with PredicateHelper {
       None),
       CaseWhen(normalBranch :: trueBranch :: Nil, None))
   }
+
+  test("remove entire CaseWhen if all the outputs are semantic equivalence") {
--- End diff --

Yes, I plan to add a couple more tests tonight.

---
[GitHub] spark issue #21850: [SPARK-24892] [SQL] Simplify `CaseWhen` to `If` when the...
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21850

@gatorsmile All the new rules added to `If` should always have a `CaseWhen` version. But there will be times when we only add the `If` version, or it only makes sense to have the `If` version. For example, the new rule that short-circuits the `If` when both branches are the same is not yet in `CaseWhen`. Another rule I am working on: when a conditional expression is in a `Filter` or `Join`, if any of the outputs of the branches contains `Literal(null, _)`, it can be replaced by `FalseLiteral`. I have only implemented it for `If` so far, for our use case.

---
[GitHub] spark pull request #21850: [SPARK-24892] [SQL] Simplify `CaseWhen` to `If` w...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21850#discussion_r205187664

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala ---
@@ -414,6 +414,16 @@ object SimplifyConditionals extends Rule[LogicalPlan] with PredicateHelper {
       // these branches can be pruned away
       val (h, t) = branches.span(_._1 != TrueLiteral)
       CaseWhen( h :+ t.head, None)
+
+    case CaseWhen(branches, elseValue) if branches.length == 1 =>
+      // Using pattern matching like `CaseWhen((cond, branchValue) :: Nil, elseValue)` will not
+      // work since the implementation of `branches` can be `ArrayBuffer`. A full test is in
--- End diff --

@ueshin thanks! The code is much cleaner.

---
[GitHub] spark issue #21850: [SPARK-24892] [SQL] Simplify `CaseWhen` to `If` when the...
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21850 retest this please
[GitHub] spark pull request #21850: [SPARK-24892] [SQL] Simplify `CaseWhen` to `If` w...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21850#discussion_r204953356 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala --- @@ -414,6 +414,16 @@ object SimplifyConditionals extends Rule[LogicalPlan] with PredicateHelper { // these branches can be pruned away val (h, t) = branches.span(_._1 != TrueLiteral) CaseWhen( h :+ t.head, None) + + case CaseWhen(branches, elseValue) if branches.length == 1 => +// Using pattern matching like `CaseWhen((cond, branchValue) :: Nil, elseValue)` will not +// work since the implementation of `branches` can be `ArrayBuffer`. A full test is in --- End diff -- We can change it to immutable `List` to avoid confusion.
[GitHub] spark pull request #21850: [SPARK-24892] [SQL] Simplify `CaseWhen` to `If` w...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21850#discussion_r204953202 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala --- @@ -414,6 +414,16 @@ object SimplifyConditionals extends Rule[LogicalPlan] with PredicateHelper { // these branches can be pruned away val (h, t) = branches.span(_._1 != TrueLiteral) CaseWhen( h :+ t.head, None) + + case CaseWhen(branches, elseValue) if branches.length == 1 => +// Using pattern matching like `CaseWhen((cond, branchValue) :: Nil, elseValue)` will not +// work since the implementation of `branches` can be `ArrayBuffer`. A full test is in --- End diff -- `sql("select case when a is null then 1 end col1 from t")` will create `branches` backed by an `ArrayBuffer`.
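A minimal, self-contained illustration of why the cons pattern silently fails here. `Vector` stands in for the `ArrayBuffer`-backed sequence the parser produces; the point is the same, since a cons pattern only matches `List`:

```scala
// `(cond, value) :: Nil` desugars to a match on scala.collection.immutable.::,
// so it matches only a List; `Seq(...)` matches any Seq of that length.
val branches: Seq[(String, Int)] = Vector("isnull(a)" -> 1)

val viaCons = branches match {
  case (cond, value) :: Nil => Some((cond, value)) // List-only pattern: no match
  case _                    => None
}

val viaSeq = branches match {
  case Seq((cond, value)) => Some((cond, value))   // matches Vector, List, etc.
  case _                  => None
}
```

`viaCons` is `None` while `viaSeq` is `Some(("isnull(a)", 1))` — exactly the silent mismatch the comment warns about.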
[GitHub] spark issue #21848: [SPARK-24890] [SQL] Short circuiting the `if` condition ...
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21848 Here is a followup PR for making `AssertTrue` and `AssertNotNull` `non-deterministic` https://issues.apache.org/jira/browse/SPARK-24913
[GitHub] spark issue #21864: [SPARK-24908][R][style] removing spaces to make lintr ha...
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21864 LGTM. Merged into master.
[GitHub] spark issue #21848: [SPARK-24890] [SQL] Short circuiting the `if` condition ...
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21848 @kiszk `trait Stateful extends Nondeterministic`, and this rule will not be invoked when an expression is nondeterministic.
[GitHub] spark pull request #21848: [SPARK-24890] [SQL] Short circuiting the `if` con...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21848#discussion_r204938763 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala --- @@ -1627,6 +1627,8 @@ case class InitializeJavaBean(beanInstance: Expression, setters: Map[String, Exp case class AssertNotNull(child: Expression, walkedTypePath: Seq[String] = Nil) extends UnaryExpression with NonSQLExpression { + override lazy val deterministic: Boolean = false --- End diff -- Fair. I'll create a followup PR for this.
[GitHub] spark pull request #21850: [SPARK-24892] [SQL] Simplify `CaseWhen` to `If` w...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21850#discussion_r204933164 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala --- @@ -414,6 +414,9 @@ object SimplifyConditionals extends Rule[LogicalPlan] with PredicateHelper { // these branches can be pruned away val (h, t) = branches.span(_._1 != TrueLiteral) CaseWhen( h :+ t.head, None) + + case CaseWhen((cond, branchValue) :: Nil, elseValue) => +If(cond, branchValue, elseValue.getOrElse(Literal(null, branchValue.dataType))) --- End diff --
Before:
```
== Parsed Logical Plan ==
'Project [CASE WHEN isnull('a) THEN 1 END AS col1#181]
+- 'UnresolvedRelation

== Optimized Logical Plan ==
Project [CASE WHEN isnull(a#182) THEN 1 END AS col1#181]
+- Relation[a#182] parquet
```
Generated Java code:
```java
/* 043 */   protected void processNext() throws java.io.IOException {
/* 044 */     if (scan_mutableStateArray_1[0] == null) {
/* 045 */       scan_nextBatch_0();
/* 046 */     }
/* 047 */     while (scan_mutableStateArray_1[0] != null) {
/* 048 */       int scan_numRows_0 = scan_mutableStateArray_1[0].numRows();
/* 049 */       int scan_localEnd_0 = scan_numRows_0 - scan_batchIdx_0;
/* 050 */       for (int scan_localIdx_0 = 0; scan_localIdx_0 < scan_localEnd_0; scan_localIdx_0++) {
/* 051 */         int scan_rowIdx_0 = scan_batchIdx_0 + scan_localIdx_0;
/* 052 */         byte project_caseWhenResultState_0 = -1;
/* 053 */         do {
/* 054 */           boolean scan_isNull_0 = scan_mutableStateArray_2[0].isNullAt(scan_rowIdx_0);
/* 055 */           int scan_value_0 = scan_isNull_0 ? -1 : (scan_mutableStateArray_2[0].getInt(scan_rowIdx_0));
/* 056 */           if (!false && scan_isNull_0) {
/* 057 */             project_caseWhenResultState_0 = (byte)(false ? 1 : 0);
/* 058 */             project_project_value_0_0 = 1;
/* 059 */             continue;
/* 060 */           }
/* 061 */
/* 062 */         } while (false);
/* 063 */         // TRUE if any condition is met and the result is null, or no any condition is met.
/* 064 */         final boolean project_isNull_0 = (project_caseWhenResultState_0 != 0);
/* 065 */         scan_mutableStateArray_3[1].reset();
/* 066 */
/* 067 */         scan_mutableStateArray_3[1].zeroOutNullBytes();
/* 068 */
/* 069 */         if (project_isNull_0) {
/* 070 */           scan_mutableStateArray_3[1].setNullAt(0);
/* 071 */         } else {
/* 072 */           scan_mutableStateArray_3[1].write(0, project_project_value_0_0);
/* 073 */         }
/* 074 */         append((scan_mutableStateArray_3[1].getRow()));
/* 075 */         if (shouldStop()) { scan_batchIdx_0 = scan_rowIdx_0 + 1; return; }
/* 076 */       }
/* 077 */       scan_batchIdx_0 = scan_numRows_0;
/* 078 */       scan_mutableStateArray_1[0] = null;
/* 079 */       scan_nextBatch_0();
/* 080 */     }
/* 081 */     ((org.apache.spark.sql.execution.metric.SQLMetric) references[1] /* scanTime */).add(scan_scanTime_0 / (1000 * 1000));
/* 082 */     scan_scanTime_0 = 0;
/* 083 */   }
```
After:
```
== Parsed Logical Plan ==
'Project [CASE WHEN isnull('a) THEN 1 END AS b#186]
+- 'UnresolvedRelation `td`

== Optimized Logical Plan ==
Project [if (isnull(a#187)) 1 else null AS b#186]
+- Relation[a#187,b#188] parquet
```
Generated Java code:
```java
/* 042 */   protected void processNext() throws java.io.IOException {
/* 043 */     if (scan_mutableStateArray_1[0] == null) {
/* 044 */       scan_nextBatch_0();
/* 045 */     }
/* 046 */     while (scan_mutableStateArray_1[0] != null) {
/* 047 */       int scan_numRows_0 = scan_mutableStateArray_1[0].numRows();
/* 048 */       int scan_localEnd_0 = scan_numRows_0 - scan_batchIdx_0;
/* 049 */       for (int scan_localIdx_0 = 0; scan_localIdx_0 < scan_localEnd_0; scan_localIdx_0++) {
/* 050 */         int scan_rowIdx_0 = scan_batchIdx_0 + scan_localIdx_0;
/* 051 */         boolean scan_isNull_0 = scan_mutableStateArray_2[0].isNullAt(scan_rowIdx_0);
/* 052 */         int scan_value_0 = scan_isNull_0 ? -1 : (scan_mutableStateArray_2[0].getInt(scan_rowIdx_0));
/* 053 */         boolean project_isNull_0 = false;
/* 054 */         int project_value_0 = -1;
/* 055 */         if (!false && scan_isNull_0) {
/* 056 */           project_isNull_0 = false;
/* 057 */           project_value_0 = 1;
/* 058 */         } else {
/* 059 */           project_isNull_0 = true;
/* 060 */           project_value_0 = -1;
/* 061 */         }
/* 062 */         s
```
[GitHub] spark pull request #21850: [SPARK-24892] [SQL] Simplify `CaseWhen` to `If` w...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21850#discussion_r204933531 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala --- @@ -414,6 +414,9 @@ object SimplifyConditionals extends Rule[LogicalPlan] with PredicateHelper { // these branches can be pruned away val (h, t) = branches.span(_._1 != TrueLiteral) CaseWhen( h :+ t.head, None) + + case CaseWhen((cond, branchValue) :: Nil, elseValue) => +If(cond, branchValue, elseValue.getOrElse(Literal(null, branchValue.dataType))) --- End diff -- Looks like there is not much difference in terms of performance, but the `If` primitive has more opportunities for further optimization.
[GitHub] spark issue #21850: [SPARK-24892] [SQL] Simplify `CaseWhen` to `If` when the...
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21850 retest this please
[GitHub] spark issue #21848: [SPARK-24890] [SQL] Short circuiting the `if` condition ...
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21848 This will simplify the scope of this PR a lot by just having both `AssertTrue` and `AssertNotNull` as `non-deterministic` expressions. My concern is that the more `non-deterministic` expressions we have, the less optimization we can do. Luckily, neither of them is used in general expressions.
[GitHub] spark issue #21850: [SPARK-24892] [SQL] Simplify `CaseWhen` to `If` when the...
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21850 retest this please
[GitHub] spark issue #21850: [SPARK-24892] [SQL] Simplify `CaseWhen` to `If` when the...
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21850 @cloud-fan and @gatorsmile
[GitHub] spark pull request #21850: [SPARK-24892] [SQL] Simplify `CaseWhen` to `If` w...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21850#discussion_r204560250 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala --- @@ -414,6 +414,12 @@ object SimplifyConditionals extends Rule[LogicalPlan] with PredicateHelper { // these branches can be pruned away val (h, t) = branches.span(_._1 != TrueLiteral) CaseWhen( h :+ t.head, None) + + case CaseWhen(branches, elseValue) if branches.length == 1 => +val cond = branches.head._1 +val trueValue = branches.head._2 +val falseValue = elseValue.getOrElse(Literal(null, trueValue.dataType)) +If(cond, trueValue, falseValue) --- End diff -- Done. Thanks!
[GitHub] spark issue #21848: [SPARK-24890] [SQL] Short circuiting the `if` condition ...
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21848 @gatorsmile this can remove some of the expensive condition expressions, so I would like to find a way to properly implement this. Thank you all for chiming in with many good points. Let me summarize:
1. The cond expression can only be removed when it doesn't have a side effect.
2. A stateful expression must have a side effect.
3. Some non-stateful expressions, such as AssertTrue, have a side effect.
4. A deterministic expression could have a side effect.
5. Nondeterministic expressions always have a side effect.
This means `deterministic` is not enough, and we need another variable to check whether an expression has a side effect.
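The distinction summarized in the comment can be sketched with a hypothetical side-effect flag. Everything below except the name `deterministic` is invented for illustration and does not exist in Spark's `Expression` API:

```scala
// Hypothetical sketch: `deterministic` alone cannot tell the optimizer whether
// removing an expression is safe; a separate side-effect flag is also needed.
trait Expr {
  def deterministic: Boolean = true
  // Per points 4-5: nondeterministic implies a side effect, but a
  // deterministic expression (e.g. AssertTrue) can carry one too.
  def hasSideEffect: Boolean = !deterministic
}
case class Add(left: Int, right: Int) extends Expr
case class AssertTrue(cond: Boolean) extends Expr {
  override def hasSideEffect: Boolean = true // deterministic, yet throws on false
}
case class Rand(seed: Long) extends Expr {
  override def deterministic: Boolean = false // hence hasSideEffect is true
}

// Point 1: an expression may be dropped only when it has no side effect.
def removable(e: Expr): Boolean = !e.hasSideEffect
```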
[GitHub] spark pull request #21848: [SPARK-24890] [SQL] Short circuiting the `if` con...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21848#discussion_r204546087 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala --- @@ -390,6 +390,7 @@ object SimplifyConditionals extends Rule[LogicalPlan] with PredicateHelper { case If(TrueLiteral, trueValue, _) => trueValue case If(FalseLiteral, _, falseValue) => falseValue case If(Literal(null, _), _, falseValue) => falseValue + case If(_, trueValue, falseValue) if trueValue.semanticEquals(falseValue) => trueValue --- End diff -- Good point.
[GitHub] spark pull request #21852: [SPARK-24893] [SQL] Remove the entire CaseWhen if...
GitHub user dbtsai opened a pull request: https://github.com/apache/spark/pull/21852 [SPARK-24893] [SQL] Remove the entire CaseWhen if all the outputs are semantically equivalent
## What changes were proposed in this pull request?
Similar to SPARK-24890, if all the outputs of `CaseWhen` are semantically equivalent, the `CaseWhen` can be removed.
## How was this patch tested?
Tests added.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/dbtsai/spark short-circuit-when Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21852.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21852 commit dc8de5fec6efc723ce01bcbaf0496c57b62e0ba2 Author: DB Tsai Date: 2018-07-23T19:15:58Z remove casewhen if possible
[GitHub] spark pull request #21848: [SPARK-24890] [SQL] Short circuiting the `if` con...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21848#discussion_r204516658 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala --- @@ -390,6 +390,7 @@ object SimplifyConditionals extends Rule[LogicalPlan] with PredicateHelper { case If(TrueLiteral, trueValue, _) => trueValue case If(FalseLiteral, _, falseValue) => falseValue case If(Literal(null, _), _, falseValue) => falseValue + case If(_, trueValue, falseValue) if trueValue.semanticEquals(falseValue) => trueValue --- End diff -- Understandable that the condition can be non-deterministic, but this doesn't change the result of `If`.
[GitHub] spark pull request #21848: [SPARK-24890] [SQL] Short circuiting the `if` con...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21848#discussion_r204515560 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala --- @@ -390,6 +390,7 @@ object SimplifyConditionals extends Rule[LogicalPlan] with PredicateHelper { case If(TrueLiteral, trueValue, _) => trueValue case If(FalseLiteral, _, falseValue) => falseValue case If(Literal(null, _), _, falseValue) => falseValue + case If(_, trueValue, falseValue) if trueValue.semanticEquals(falseValue) => trueValue --- End diff -- Can you elaborate? For `trueValue.semanticEquals(falseValue)`, it's guaranteed that both `trueValue` and `falseValue` are `deterministic`.
```scala
def semanticEquals(other: Expression): Boolean =
  deterministic && other.deterministic && canonicalized == other.canonicalized
```
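The quoted contract can be mirrored on a toy expression type to show why `semanticEquals` can never hold for nondeterministic operands. `Add`, `Rand`, and the string operands below are illustrative stand-ins, not Spark's classes:

```scala
// Toy mirror of the Expression.semanticEquals definition quoted above:
// equality requires both sides deterministic plus matching canonical forms.
sealed trait Expr {
  def deterministic: Boolean
  def canonicalized: Expr
  def semanticEquals(other: Expr): Boolean =
    deterministic && other.deterministic && canonicalized == other.canonicalized
}
case class Add(left: String, right: String) extends Expr {
  val deterministic = true
  // Addition is commutative: order operands so `a + b` and `b + a` agree.
  def canonicalized: Expr = if (left <= right) this else Add(right, left)
}
case class Rand(seed: Long) extends Expr {
  val deterministic = false
  def canonicalized: Expr = this
}
```

`Add("a", "b").semanticEquals(Add("b", "a"))` holds, while two `Rand` expressions are never semantically equal, even with the same seed.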
[GitHub] spark pull request #21850: [SPARK-24892] [SQL] Simplify `CaseWhen` to `If` w...
GitHub user dbtsai opened a pull request: https://github.com/apache/spark/pull/21850 [SPARK-24892] [SQL] Simplify `CaseWhen` to `If` when there is only one branch
## What changes were proposed in this pull request?
After the rule that removes unreachable branches has run, only one branch could be left. In this situation, `CaseWhen` can be converted to `If` for further optimization.
## How was this patch tested?
Tests added.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/dbtsai/spark remove-case-when Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21850.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21850 commit 18e2d8da63c05fc931a4b569bd6c404b933eeb0a Author: DB Tsai Date: 2018-07-23T18:31:40Z remove casewhen
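The rewrite itself is small enough to sketch on a toy AST. The case classes below are stand-ins for Spark's `Expression` hierarchy, and the null literal is untyped here, whereas Spark's carries `branchValue.dataType`:

```scala
sealed trait Expr
case class Literal(value: Any) extends Expr
case class Attr(name: String) extends Expr
case class If(cond: Expr, trueValue: Expr, falseValue: Expr) extends Expr
case class CaseWhen(branches: Seq[(Expr, Expr)], elseValue: Option[Expr]) extends Expr

// SPARK-24892: a CaseWhen reduced to a single branch is equivalent to If,
// with a null literal as the else value when no ELSE clause was written.
def caseWhenToIf(e: Expr): Expr = e match {
  case CaseWhen(Seq((cond, value)), elseValue) => // Seq(...), not `:: Nil`
    If(cond, value, elseValue.getOrElse(Literal(null)))
  case other => other
}
```

A multi-branch `CaseWhen` is left untouched; only the single-branch form is rewritten.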
[GitHub] spark pull request #21848: [SPARK-24890] [SQL] Short circuiting the `if` con...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21848#discussion_r204509378 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala --- @@ -403,14 +404,14 @@ object SimplifyConditionals extends Rule[LogicalPlan] with PredicateHelper { e.copy(branches = newBranches) } - case e @ CaseWhen(branches, _) if branches.headOption.map(_._1) == Some(TrueLiteral) => + case CaseWhen(branches, _) if branches.headOption.map(_._1).contains(TrueLiteral) => --- End diff -- Since it's not a bug fix, I guess it's unlikely someone will backport this :)
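The review point in the diff, isolated: on an `Option`, `.contains(x)` is equivalent to `== Some(x)` but states the intent directly. A minimal check with stand-in values:

```scala
// Mirrors `branches.headOption.map(_._1) == Some(TrueLiteral)`
// versus `branches.headOption.map(_._1).contains(TrueLiteral)`.
val branches = Seq("true" -> 1, "isnull(a)" -> 2)
val headCond: Option[String] = branches.headOption.map(_._1)

val viaEquals   = headCond == Some("true")
val viaContains = headCond.contains("true")

// On an empty sequence both are false: None.contains(x) is always false.
val emptyHead = Seq.empty[(String, Int)].headOption.map(_._1)
```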
[GitHub] spark pull request #21848: [SPARK-24890] [SQL] Short circuiting the `if` con...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21848#discussion_r204508986 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala --- @@ -651,6 +652,7 @@ object SimplifyCaseConversionExpressions extends Rule[LogicalPlan] { } } + --- End diff -- I thought we always have two new lines between two objects
[GitHub] spark pull request #21848: [SPARK-24890] [SQL] Short circuiting the `if` cond...
GitHub user dbtsai opened a pull request: https://github.com/apache/spark/pull/21848 [SPARK-24890] [SQL] Short circuiting the `if` condition when `trueValue` and `falseValue` are the same
## What changes were proposed in this pull request?
When `trueValue` and `falseValue` are semantically equivalent, the condition expression in `if` can be removed to avoid extra computation at runtime.
## How was this patch tested?
Test added.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/dbtsai/spark short-circuit-if Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21848.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21848 commit 30ca8cf8aad2b3c030902bd00c4e8b0dd9acc430 Author: DB Tsai Date: 2018-07-23T17:22:14Z Add if short circuit
[GitHub] spark issue #21847: [SPARK-24855][SQL][EXTERNAL][WIP]: Built-in AVRO support...
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21847 add to whitelist
[GitHub] spark issue #21797: [SPARK-24402] [SQL] Optimize `In` expression when only o...
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21797 Merged into master as it passes the build now.
[GitHub] spark issue #21797: [SPARK-24402] [SQL] Optimize `In` expression when only o...
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21797 Test it again.
[GitHub] spark pull request #21797: [SPARK-24402] [SQL] Optimize `In` expression when...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21797#discussion_r203169950 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala --- @@ -218,15 +218,24 @@ object ReorderAssociativeOperator extends Rule[LogicalPlan] { object OptimizeIn extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case q: LogicalPlan => q transformExpressionsDown { - case In(v, list) if list.isEmpty && !v.nullable => FalseLiteral + case In(v, list) if list.isEmpty => +// When v is not nullable, the following expression will be optimized +// to FalseLiteral which is tested in OptimizeInSuite.scala +If(IsNotNull(v), FalseLiteral, Literal(null, BooleanType)) case expr @ In(v, list) if expr.inSetConvertible => val newList = ExpressionSet(list).toSeq -if (newList.size > SQLConf.get.optimizerInSetConversionThreshold) { +if (newList.length == 1 + // TODO: `EqualTo` for structural types are not working. Until SPARK-24443 is addressed, + // TODO: we exclude them in this rule. + && !v.isInstanceOf[CreateNamedStructLike] + && !newList.head.isInstanceOf[CreateNamedStructLike]) { --- End diff -- @cloud-fan @gatorsmile until https://github.com/apache/spark/pull/21470 is merged, let's exclude `CreateNamedStructLike` in this rule.
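The `If(IsNotNull(v), FalseLiteral, Literal(null, BooleanType))` shape in the diff encodes SQL's three-valued semantics for `v IN ()`. A hedged sketch using `Option[Boolean]` as a stand-in for a nullable SQL boolean:

```scala
// `v IN ()`: false when v is non-null, NULL (unknown) when v is null.
def inEmptyList(v: Option[Int]): Option[Boolean] = v match {
  case Some(_) => Some(false) // IsNotNull(v) holds -> FalseLiteral
  case None    => None        // v is null          -> Literal(null, BooleanType)
}
```

This is why the old `list.isEmpty && !v.nullable => FalseLiteral` guard was needed: without the null-aware form, a null input would incorrectly evaluate to false instead of NULL.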
[GitHub] spark issue #21442: [SPARK-24402] [SQL] Optimize `In` expression when only o...
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21442 I opened a new PR at https://github.com/apache/spark/pull/21797/files Will work on the test issue there. Thanks.
[GitHub] spark pull request #21797: [SPARK-24402] [SQL] Optimize `In` expression when...
GitHub user dbtsai opened a pull request: https://github.com/apache/spark/pull/21797 [SPARK-24402] [SQL] Optimize `In` expression when only one element in the collection or collection is empty
## What changes were proposed in this pull request?
Two new rules are added to the logical plan optimizer.
1. When there is only one element in the **`Collection`**, the physical plan will be optimized to **`EqualTo`**, so predicate pushdown can be used.
```scala
profileDF.filter( $"profileID".isInCollection(Set(6))).explain(true)
"""
|== Physical Plan ==
|*(1) Project [profileID#0]
|+- *(1) Filter (isnotnull(profileID#0) && (profileID#0 = 6))
| +- *(1) FileScan parquet [profileID#0] Batched: true, Format: Parquet,
| PartitionFilters: [],
| PushedFilters: [IsNotNull(profileID), EqualTo(profileID,6)],
| ReadSchema: struct
""".stripMargin
```
2. When the **`Collection`** is empty, and the input is nullable, the logical plan will be simplified to
```scala
profileDF.filter( $"profileID".isInCollection(Set())).explain(true)
"""
|== Optimized Logical Plan ==
|Filter if (isnull(profileID#0)) null else false
|+- Relation[profileID#0] parquet
""".stripMargin
```
TODO:
1. For multiple conditions with numbers less than certain thresholds, we should still allow predicate pushdown.
2. Optimize the **`In`** using **`tableswitch`** or **`lookupswitch`** when the number of categories is low and they are **`Int`** or **`Long`**.
3. The default immutable hash trie set is slow to query; we should benchmark different set implementations for faster lookup.
4. **`filter(if (condition) null else false)`** can be optimized to false.
## How was this patch tested?
A couple of new tests are added.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/dbtsai/spark optimize-in Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21797.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21797 commit 7c44c70fe664e73b36a49a974ece93a0c83d7f8e Author: DB Tsai Date: 2018-05-28T07:27:09Z Optimize `In` commit 449613aab8e631582e5a152a9a4b67a8d2468738 Author: DB Tsai Date: 2018-05-28T07:31:13Z style commit 7a354fcd154ec2d8f88a5c1fbf1bd75fdb15ec49 Author: DB Tsai Date: 2018-05-29T07:09:53Z Addressed feedback commit fa678f8f69aac246cb3622b70bde67f649f17b93 Author: DB Tsai Date: 2018-07-16T18:00:41Z Merge branch 'master' into optimize-in commit 61824f78622092370259b2ba910c0a4b0f29235d Author: DB Tsai Date: 2018-07-16T18:01:17Z Merge branch 'master' into optimize-in commit 23fedd8d65cb51201e1f032c938671ebc21eb432 Author: DB Tsai Date: 2018-07-16T18:47:20Z Addressed feedback commit 5079833cc25949c806575f365f62f423a3205282 Author: DB Tsai Date: 2018-07-16T18:57:07Z update commit c519c4096690c2f8aa26a853a038218f3aaa100a Author: DB Tsai Date: 2018-07-16T20:37:55Z Add one more test commit 9174a30b092740284250002f5a5fee50eadfc754 Author: DB Tsai Date: 2018-07-16T20:42:12Z Remove duplicated code commit 1a0cd0b6db21220c8a809573fe591be057c164de Author: DB Tsai Date: 2018-07-17T17:36:57Z Merge branch 'master' into optimize-in
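Condensed, the two rules in the PR description can be sketched as one dispatch on a toy AST. The case classes below are illustrative stand-ins; the real rule additionally guards on nullability, the set-conversion threshold, and the `CreateNamedStructLike` exclusion discussed earlier in the thread:

```scala
sealed trait Expr
case class Attr(name: String) extends Expr
case class Literal(value: Any) extends Expr
case class In(value: Expr, list: Seq[Expr]) extends Expr
case class EqualTo(left: Expr, right: Expr) extends Expr
case class If(cond: Expr, trueValue: Expr, falseValue: Expr) extends Expr
case class IsNotNull(e: Expr) extends Expr

def optimizeIn(e: Expr): Expr = e match {
  // Rule 2: empty list -> null-aware false
  case In(v, Seq())       => If(IsNotNull(v), Literal(false), Literal(null))
  // Rule 1: single element -> EqualTo, which filter pushdown understands
  case In(v, Seq(single)) => EqualTo(v, single)
  case other              => other
}
```

`In` with two or more elements is left alone here; that is where the `InSet` conversion threshold and the tableswitch/lookupswitch ideas from the TODO list would apply.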
[GitHub] spark issue #21442: [SPARK-24402] [SQL] Optimize `In` expression when only o...
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21442 @HyukjinKwon thanks for bringing this to my attention. @gatorsmile I thought the bug was found by this PR, not introduced in this PR. This PR is blocked until SPARK-24443 is addressed. I'll unblock this PR by turning `In` into `EqualTo` if the `list` is not a `ListQuery`, as suggested by @cloud-fan. Thanks all.
[GitHub] spark issue #21756: [SPARK-24764] [CORE] Add ServiceLoader implementation fo...
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21756 Jenkins, ok to test
[GitHub] spark issue #21756: [SPARK-24764] [CORE] Add ServiceLoader implementation fo...
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21756 add to whitelist
[GitHub] spark pull request #21749: [SPARK-24785] [SHELL] Making sure REPL prints Spa...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21749#discussion_r201793790 --- Diff: repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala --- @@ -116,6 +124,132 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter) super.replay() } + /** + * The following code is mostly a copy of `process` implementation in `ILoop.scala` in Scala + * + * In newer version of Scala, `printWelcome` is the first thing to be called. As a result, + * SparkUI URL information would be always shown after the welcome message. + * + * However, this is inconsistent compared with the existing version of Spark which will always + * show SparkUI URL first. + * + * The only way we can make it consistent will be duplicating the Scala code. + * + * We should remove this duplication once Scala provides a way to load our custom initialization + * code, and also customize the ordering of printing welcome message. + */ + override def process(settings: Settings): Boolean = savingContextLoader { + +def newReader = in0.fold(chooseReader(settings))(r => SimpleReader(r, out, interactive = true)) + +/** Reader to use before interpreter is online. */ +def preLoop = { + val sr = SplashReader(newReader) { r => +in = r +in.postInit() + } + in = sr + SplashLoop(sr, prompt) +} + +/* Actions to cram in parallel while collecting first user input at prompt. + * Run with output muted both from ILoop and from the intp reporter. + */ +def loopPostInit(): Unit = mumly { + // Bind intp somewhere out of the regular namespace where + // we can get at it in generated code. + intp.quietBind(NamedParam[IMain]("$intp", intp)(tagOfIMain, classTag[IMain])) + + // Auto-run code via some setting. 
+ ( replProps.replAutorunCode.option +flatMap (f => File(f).safeSlurp()) +foreach (intp quietRun _) +) + // power mode setup + if (isReplPower) enablePowerMode(true) + initializeSpark() + loadInitFiles() + // SI-7418 Now, and only now, can we enable TAB completion. + in.postInit() +} +def loadInitFiles(): Unit = settings match { + case settings: GenericRunnerSettings => +for (f <- settings.loadfiles.value) { + loadCommand(f) + addReplay(s":load $f") +} +for (f <- settings.pastefiles.value) { + pasteCommand(f) + addReplay(s":paste $f") +} + case _ => +} +// wait until after startup to enable noisy settings +def withSuppressedSettings[A](body: => A): A = { + val ss = this.settings + import ss._ + val noisy = List(Xprint, Ytyperdebug) + val noisesome = noisy.exists(!_.isDefault) + val current = (Xprint.value, Ytyperdebug.value) + if (isReplDebug || !noisesome) body + else { +this.settings.Xprint.value = List.empty +this.settings.Ytyperdebug.value = false +try body +finally { + Xprint.value = current._1 + Ytyperdebug.value = current._2 + intp.global.printTypings = current._2 +} + } +} +def startup(): String = withSuppressedSettings { + // let them start typing + val splash = preLoop + + // while we go fire up the REPL + try { +// don't allow ancient sbt to hijack the reader +savingReader { + createInterpreter() +} +intp.initializeSynchronous() + +val field = classOf[ILoop].getDeclaredFields.filter(_.getName.contains("globalFuture")).head +field.setAccessible(true) +field.set(this, Future successful true) --- End diff -- This reflection has to be used to access private `globalFuture` in `ILoop`.
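The `globalFuture` trick in the diff above is plain Java reflection. A self-contained illustration follows; the `Holder`/`hidden` names are made up here, whereas in the PR the target is the private `globalFuture` field of `ILoop`:

```scala
// Read and write a private field by name when the class exposes no accessor,
// mirroring the `getDeclaredFields.filter(_.getName.contains(...))` pattern.
class Holder { private var hidden: Int = 1 }

val holder = new Holder
val field = classOf[Holder].getDeclaredFields
  .filter(_.getName.contains("hidden")).head
field.setAccessible(true) // bypass the private modifier
field.set(holder, 42)
val readBack = field.get(holder).asInstanceOf[Int]
```

This works but is fragile: it depends on the field's compiled name and access checks, which is exactly why the PR description calls the approach undesirable.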
[GitHub] spark issue #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and 2.12.6
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21495 JIRA and PR are created to make sure the messages are printed in the right order. https://issues.apache.org/jira/browse/SPARK-24785 https://github.com/apache/spark/pull/21749
[GitHub] spark pull request #21749: [SPARK-24785] [SHELL] Making sure REPL prints Spa...
GitHub user dbtsai opened a pull request: https://github.com/apache/spark/pull/21749 [SPARK-24785] [SHELL] Making sure REPL prints Spark UI info and then Welcome message ## What changes were proposed in this pull request? After https://github.com/apache/spark/pull/21495 the welcome message is printed first, and then the Scala prompt is shown before the Spark UI info is printed. Although it's a minor issue, visually it doesn't look as nice as the existing behavior. This PR intends to fix it by duplicating the Scala `process` code to arrange the printing order. However, one variable is private, so reflection has to be used, which is not desirable. We can use this PR to brainstorm how to handle it properly and how Scala can change their APIs to fit our needs. ## How was this patch tested? Existing tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/dbtsai/spark repl-followup Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21749.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21749 commit 34ac2897bf1320aad24f7d1a87812335db1e13ca Author: DB Tsai Date: 2018-05-29T18:03:40Z Upgrade scala to 2.11.12 and 2.12.6 Update LICENSE Update JLine update scala-parser-combinators Error handling Fix compilation temp Revert hack remove hack commit d7fc65c1973c33b4cbc0f8a8429cd64024d17ad5 Author: DB Tsai Date: 2018-06-28T22:45:25Z Merge branch 'master' into scala-2.11.12 commit 45c7c5b96eeb47c253f1551bfc780e4b321e25c5 Author: DB Tsai Date: 2018-06-28T22:46:53Z temp commit e7d02a903438ec5dc3a6151402d1033bcf45c746 Author: DB Tsai Date: 2018-06-29T00:16:49Z update commit 3f71d20e5ecaacd5dec32beea2da580927697e81 Author: DB Tsai Date: 2018-07-10T23:01:55Z temp commit fdb084ac38d7b3b2fdccc41fc5854fba003d9015 Author: DB Tsai Date: 2018-07-10T23:02:36Z Merge branch 'master' into repl-followup
commit b1e28e82f0ad9b78e0d03f6cb0f27d7b097f5915 Author: DB Tsai Date: 2018-07-11T01:14:36Z working
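The "one variable is private, so reflection has to be used" workaround described in this PR amounts to locating a private field by name and overwriting it. A minimal Java sketch of that pattern follows; the `Repl` class and its `globalFutureDone` field are stand-ins invented for illustration, not Spark's or Scala's actual classes:

```java
import java.lang.reflect.Field;

class Repl {
    // stand-in for a private field like ILoop's globalFuture
    private boolean globalFutureDone = false;

    boolean isDone() { return globalFutureDone; }
}

public class PrivateFieldHack {
    public static void main(String[] args) throws Exception {
        Repl repl = new Repl();

        // Find the private field by name, as the PR does for "globalFuture".
        Field field = Repl.class.getDeclaredField("globalFutureDone");
        field.setAccessible(true); // bypass the access check
        field.set(repl, true);     // overwrite the private state

        System.out.println(repl.isDone()); // prints "true"
    }
}
```

This is the same shape as the PR's Scala code against `ILoop`'s `globalFuture`; it is fragile because it silently breaks whenever the upstream field is renamed, which is why the PR calls the approach undesirable.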
[GitHub] spark issue #21459: [SPARK-24420][Build] Upgrade ASM to 6.1 to support JDK9+
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21459 Thanks. Merged into master.
[GitHub] spark issue #21459: [SPARK-24420][Build] Upgrade ASM to 6.1 to support JDK9+
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21459 There are three approvals from the committers, and the changes are pretty trivial to revert if we see any performance regression, which is unlikely. To move things forward, if there is no further objection, I'll merge it tomorrow. Thanks.
[GitHub] spark issue #21692: [SPARK-24715][Build] Override jline version as 2.14.3 in...
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21692 @viirya thanks for this PR. I thought SBT always uses the POM for dependencies, so I wonder why there is a discrepancy that requires us to manually override it?
[GitHub] spark issue #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and 2.12.6
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21495 I was on family leave for a couple of weeks. Thank you all for helping out and merging it. The only change with this PR is that the welcome message is printed first, and then the Spark URL is shown later. It's a minor difference. I had an offline discussion with @adriaanm from the Scala community. To overcome this issue, he suggested we override the entire `def process(settings: Settings): Boolean` to place our initialization code. I have a working implementation of this, but it copies a lot of code from Scala just to get the printing order right. If we decide to have a consistent printing order, I can submit a separate PR for this. He is open to working with us to add a proper hook in Scala so we don't need to use hacks to initialize our code. Thanks.
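The "proper hook" alternative mentioned above follows the template-method pattern: the base class calls an overridable no-op at a fixed point in its lifecycle, so downstream projects override only the hook instead of copying the whole `process` method. A minimal Java sketch of that idea, with the caveat that every class and method name here is illustrative and not Scala's actual REPL API:

```java
// The base class fixes the ordering (init -> hook -> welcome); subclasses
// customize behavior without duplicating process() itself.
abstract class BaseLoop {
    final String process() {
        StringBuilder log = new StringBuilder();
        log.append("init;");
        afterInit(log);          // the kind of hook Spark would like upstream to expose
        log.append("welcome;");
        return log.toString();
    }

    protected void afterInit(StringBuilder log) { /* no-op by default */ }
}

class SparkLoop extends BaseLoop {
    @Override
    protected void afterInit(StringBuilder log) {
        log.append("sparkContext;"); // e.g. print the Spark UI URL at this point
    }
}

public class HookDemo {
    public static void main(String[] args) {
        System.out.println(new SparkLoop().process()); // init;sparkContext;welcome;
    }
}
```

With such a hook, the printing order is guaranteed by the base class, and no private state needs to be touched via reflection.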
[GitHub] spark issue #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and 2.12.6
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21495 retest this please.
[GitHub] spark issue #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and 2.12.6
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21495 I decided to remove the hack I put in to keep the Spark UI output consistent, since that hack would bring in more problems. @som-snytt Is it possible to move `printWelcome` and `splash.start()` to some place after `initializeSynchronous()` is called?
[GitHub] spark issue #21519: [SPARK-24412] [SQL] Adding docs about automagical type c...
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21519 Merged into master as it's only a documentation change. Congratulations on your first PR! Thanks!
[GitHub] spark issue #21519: [SPARK-24412] [SQL] Adding docs about automagical type c...
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21519 add to whitelist
[GitHub] spark pull request #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21495#discussion_r194176806 --- Diff: repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoopInterpreter.scala --- @@ -21,8 +21,22 @@ import scala.collection.mutable import scala.tools.nsc.Settings import scala.tools.nsc.interpreter._ -class SparkILoopInterpreter(settings: Settings, out: JPrintWriter) extends IMain(settings, out) { - self => +class SparkILoopInterpreter(settings: Settings, out: JPrintWriter, initializeSpark: () => Unit) +extends IMain(settings, out) { self => --- End diff -- I thought for `extends`, it's four spaces? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21495#discussion_r193981694 --- Diff: repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoopInterpreter.scala --- @@ -21,8 +21,22 @@ import scala.collection.mutable import scala.tools.nsc.Settings import scala.tools.nsc.interpreter._ -class SparkILoopInterpreter(settings: Settings, out: JPrintWriter) extends IMain(settings, out) { - self => +class SparkILoopInterpreter(settings: Settings, out: JPrintWriter, initializeSpark: () => Unit) +extends IMain(settings, out) { self => + + /** + * We override `initializeSynchronous` to initialize Spark *after* `intp` is properly initialized + * and *before* the REPL sees any files in the private `loadInitFiles` functions, so that + * the Spark context is visible in those files. + * + * This is a bit of a hack, but there isn't another hook available to us at this point. + * + * See the discussion in Scala community https://github.com/scala/bug/issues/10913 for detail. + */ + override def initializeSynchronous(): Unit = { +super.initializeSynchronous() +initializeSpark() --- End diff -- I don't know why it is working for me; maybe my local deps got messed up :( I just updated jline, so please try and see if it works. Thanks.
[GitHub] spark issue #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and 2.12.6
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21495 @som-snytt Initializing it in `printWelcome` will not work since in older versions of Scala, `printWelcome` is the last one to be executed.
[GitHub] spark pull request #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21495#discussion_r193927042 --- Diff: repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoopInterpreter.scala --- @@ -21,8 +21,22 @@ import scala.collection.mutable import scala.tools.nsc.Settings import scala.tools.nsc.interpreter._ -class SparkILoopInterpreter(settings: Settings, out: JPrintWriter) extends IMain(settings, out) { - self => +class SparkILoopInterpreter(settings: Settings, out: JPrintWriter, initializeSpark: () => Unit) +extends IMain(settings, out) { self => + + /** + * We override `initializeSynchronous` to initialize Spark *after* `intp` is properly initialized + * and *before* the REPL sees any files in the private `loadInitFiles` functions, so that + * the Spark context is visible in those files. + * + * This is a bit of a hack, but there isn't another hook available to us at this point. + * + * See the discussion in Scala community https://github.com/scala/bug/issues/10913 for detail. + */ + override def initializeSynchronous(): Unit = { +super.initializeSynchronous() +initializeSpark() --- End diff -- It is working for me. I got `scala-compiler-2.11.12.jar` in my classpath. Can you do a clean build? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and 2.12.6
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21495 retest this please
[GitHub] spark issue #21453: Test branch to see how Scala 2.11.12 performs
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21453 I opened a PR for Scala 2.11.12 with the Scala API change fix. https://github.com/apache/spark/pull/21495 Thanks.
[GitHub] spark pull request #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21495#discussion_r192961905 --- Diff: repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoopInterpreter.scala --- @@ -21,8 +21,22 @@ import scala.collection.mutable import scala.tools.nsc.Settings import scala.tools.nsc.interpreter._ -class SparkILoopInterpreter(settings: Settings, out: JPrintWriter) extends IMain(settings, out) { - self => +class SparkILoopInterpreter(settings: Settings, out: JPrintWriter, initializeSpark: () => Unit) +extends IMain(settings, out) { self => + + /** + * We override `initializeSynchronous` to initialize Spark *after* `intp` is properly initialized + * and *before* the REPL sees any files in the private `loadInitFiles` functions, so that + * the Spark context is visible in those files. + * + * This is a bit of a hack, but there isn't another hook available to us at this point. + * + * See the discussion in Scala community https://github.com/scala/bug/issues/10913 for detail. + */ + override def initializeSynchronous(): Unit = { +super.initializeSynchronous() +initializeSpark() --- End diff -- @som-snytt It's working, but I'm wondering if I'm doing it correctly. In this case, I'll use `$intp` without checking ```scala if (intp.reporter.hasErrors) { echo("Interpreter encountered errors during initialization!") null } ``` in `iLoop.scala`. And `intp.quietBind(NamedParam[IMain]("$intp", intp)(tagOfIMain, classTag[IMain]))` will not be executed before our custom Spark initialization code. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21495: [SPARK-24418][Build] Upgrade Scala to 2.11.12 and...
GitHub user dbtsai opened a pull request: https://github.com/apache/spark/pull/21495 [SPARK-24418][Build] Upgrade Scala to 2.11.12 and 2.12.6 ## What changes were proposed in this pull request? (Please fill in changes proposed in this fix) ## How was this patch tested? Existing tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/dbtsai/spark scala-2.11.12 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21495.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21495 commit de790fd251ba3727bba23ceb1ca07559d25b7e87 Author: DB Tsai Date: 2018-05-29T18:03:40Z Upgrade scala to 2.11.12 and 2.12.6 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21453: Test branch to see how Scala 2.11.12 performs
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21453 I filed an issue with the Scala community about the interface changes, and they said those REPL APIs are intended to be private. https://github.com/scala/bug/issues/10913 That being said, they gave us a couple of ways to work around it, and I'm testing them now.
[GitHub] spark issue #21459: [SPARK-24420][Build] Upgrade ASM to 6.1 to support JDK9+
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21459 @rxin Mostly for Java 9+. [ASM 6.x](https://mvnrepository.com/artifact/org.ow2.asm/asm/6.0/usages) has been proven in many projects such as FB Presto, Google Guice Core Library, CGLIB, Gradle, and Jetty Servlet Annotations, to name a few. Since it's a self-contained change and includes a couple of bug fixes, how about we lay a good foundation in Spark 2.4 now? If it causes any instability or performance regression, we can easily revert it given we still have some time before the 2.4 release.
[GitHub] spark issue #21459: [SPARK-24420][Build] Upgrade ASM to 6.1 to support JDK9+
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21459 @srowen From the release notes, http://asm.ow2.io/versions.html , the differences between 5.2 and 6.0 are:
> Codebase migrated to gitlab (feature requests 317617, 317619, 317542)
> Support for modules (JSR 376, feature request 317628)
> Support for JDK specific module attributes (ModuleTarget, ModuleHashes and ModuleResolution).
> ASM jars now contain debug info, asm-all jar removed (feature request 317622)
> Support for removable features deleted (cf SIGNATURES, ANNOTATIONS, etc constants in ClassReader)
> bug fixes
> 317793: AnalyzerAdapter calculates wrong type for AALOAD in case of null
> 317784: NegativeArraySizeException when resizing jump instruction in a class without frames
> 317780: IllegalArgumentException at CheckMethodAdapter.checkInternalName for non-Java-identifier name
> 317785: NullPointerException in MethodRemapper with 'chop' stack map frames and tree API
> 317786: VerifyError when added code pushes the offset of a conditional backward jump below -32768
> 317791: Invalid class files generated for large methods (regression introduced in 5.2)
> 317792: Fix the handling of inner classes with EXPAND_ASM_INSNS

The bug fixes related to stack map frames and the support for JSR 376 are what we need. Thanks.
[GitHub] spark pull request #21459: [SPARK-24420][Build] Upgrade ASM to 6.1 to suppor...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21459#discussion_r191846953 --- Diff: pom.xml --- @@ -313,13 +313,13 @@ chill-java ${chill.version} -
[GitHub] spark issue #21459: [SPARK-24420][Build] Upgrade ASM to 6.1 to support JDK9+
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21459 cc @srowen and @rxin for more eyes.
[GitHub] spark issue #21459: [SPARK-24420][Build] Upgrade ASM to 6.1 to support JDK9+
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21459 @HyukjinKwon Once Spark can be built against JDK9+, we'll need to figure out how to set it up in Jenkins. We could do two builds for each PR, one for JDK8 and another for JDK9+, but this can be really costly. Alternatively, we can do a daily JDK9+ build and fix issues accordingly.
[GitHub] spark issue #21458: [SPARK-24419] [Build] Upgrade SBT to 0.13.17 with Scala ...
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21458 Merged into master. Thanks.
[GitHub] spark issue #21459: [SPARK-24420][Build] Upgrade ASM to 6.1 to support JDK9+
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21459 @felixcheung Yes. All tests pass with JDK8.
[GitHub] spark pull request #21459: [SPARK-24420][Build] Upgrade ASM to 6.1 to suppor...
GitHub user dbtsai opened a pull request: https://github.com/apache/spark/pull/21459 [SPARK-24420][Build] Upgrade ASM to 6.1 to support JDK9+ ## What changes were proposed in this pull request? Upgrade ASM to 6.1 to support JDK9+ ## How was this patch tested? Existing tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/dbtsai/spark asm Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21459.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21459 commit bec3e81a3522b54692150584c86d1925799c08da Author: DB Tsai Date: 2018-05-29T22:35:55Z Asm6.1 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21458: [SPARK-24419] [Build] Upgrade SBT to 0.13.17 with...
GitHub user dbtsai opened a pull request: https://github.com/apache/spark/pull/21458 [SPARK-24419] [Build] Upgrade SBT to 0.13.17 with Scala 2.10.7 for JDK9+ ## What changes were proposed in this pull request? Upgrade SBT to 0.13.17 with Scala 2.10.7 for JDK9+ ## How was this patch tested? Existing tests You can merge this pull request into a Git repository by running: $ git pull https://github.com/dbtsai/spark sbt Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21458.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21458 commit 48c57e09dda63259230aa81facb31c1795f602fe Author: DB Tsai Date: 2018-05-29T23:54:22Z upgrade sbt --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21453: Test branch to see how Scala 2.11.12 performs
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21453 Here is the issue on the Scala side. https://github.com/scala/bug/issues/10913
[GitHub] spark issue #21453: Test branch to see how Scala 2.11.12 performs
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21453 I'm also looking at this issue. The challenge is that one of the hacks we use to initialize Spark before the REPL sees any files was removed in Scala 2.11.12. https://github.com/apache/spark/blob/master/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala#L109 We might need to work with the Scala team to upgrade our Scala version.
[GitHub] spark issue #21416: [SPARK-24371] [SQL] Added isInCollection in DataFrame AP...
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21416 Merged into master. Thank you everyone for reviewing. Followup PRs will be created for: 1. Adding tests in Java. 2. Adding docs about automagical type casting.
[GitHub] spark pull request #21416: [SPARK-24371] [SQL] Added isInCollection in DataF...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21416#discussion_r191506417 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Column.scala --- @@ -786,6 +787,24 @@ class Column(val expr: Expression) extends Logging { @scala.annotation.varargs def isin(list: Any*): Column = withExpr { In(expr, list.map(lit(_).expr)) } --- End diff -- +1 Let's do it in the followup PR. Thanks.
[GitHub] spark pull request #21416: [SPARK-24371] [SQL] Added isInCollection in DataF...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21416#discussion_r191505830 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala --- @@ -390,11 +394,67 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext { checkAnswer(df.filter($"b".isin("z", "y")), df.collect().toSeq.filter(r => r.getString(1) == "z" || r.getString(1) == "y")) +// Auto casting should work with mixture of different types in collections +checkAnswer(df.filter($"a".isin(1.toShort, "2")), + df.collect().toSeq.filter(r => r.getInt(0) == 1 || r.getInt(0) == 2)) +checkAnswer(df.filter($"a".isin("3", 2.toLong)), + df.collect().toSeq.filter(r => r.getInt(0) == 3 || r.getInt(0) == 2)) +checkAnswer(df.filter($"a".isin(3, "1")), + df.collect().toSeq.filter(r => r.getInt(0) == 3 || r.getInt(0) == 1)) + val df2 = Seq((1, Seq(1)), (2, Seq(2)), (3, Seq(3))).toDF("a", "b") -intercept[AnalysisException] { +val e = intercept[AnalysisException] { df2.filter($"a".isin($"b")) } +Seq("cannot resolve", "due to data type mismatch: Arguments must be same type but were") + .foreach { s => + assert(e.getMessage.toLowerCase(Locale.ROOT).contains(s.toLowerCase(Locale.ROOT))) + } + } + + test("isInCollection: Scala Collection") { +val df = Seq((1, "x"), (2, "y"), (3, "z")).toDF("a", "b") +// Test with different types of collections +checkAnswer(df.filter($"a".isInCollection(Seq(3, 1))), + df.collect().toSeq.filter(r => r.getInt(0) == 3 || r.getInt(0) == 1)) +checkAnswer(df.filter($"a".isInCollection(Seq(1, 2).toSet)), + df.collect().toSeq.filter(r => r.getInt(0) == 1 || r.getInt(0) == 2)) +checkAnswer(df.filter($"a".isInCollection(Seq(3, 2).toArray)), + df.collect().toSeq.filter(r => r.getInt(0) == 3 || r.getInt(0) == 2)) +checkAnswer(df.filter($"a".isInCollection(Seq(3, 1).toList)), + df.collect().toSeq.filter(r => r.getInt(0) == 3 || r.getInt(0) == 1)) + +val df2 = Seq((1, Seq(1)), (2, Seq(2)), (3, Seq(3))).toDF("a", "b") + +val 
e = intercept[AnalysisException] { + df2.filter($"a".isInCollection(Seq($"b"))) +} +Seq("cannot resolve", "due to data type mismatch: Arguments must be same type but were") + .foreach { s => + assert(e.getMessage.toLowerCase(Locale.ROOT).contains(s.toLowerCase(Locale.ROOT))) + } + } + + test("isInCollection: Java Collection") { --- End diff -- I totally agree with you that we should have tests natively in Java instead of converting the types to Java in Scala and hoping for the best that it will work in Java. Let's do it in the followup PR.
[GitHub] spark issue #21416: [SPARK-24371] [SQL] Added isInCollection in DataFrame AP...
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21416 retest this please
[GitHub] spark issue #21442: [SPARK-24402] [SQL] Optimize `In` expression when only o...
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21442 @maropu I didn't do a full performance benchmark. I believe the performance gain comes from predicate pushdown when there is only one element in the set, which can be significant. I forget which one, but I remember that Impala or Drill allows multiple predicates to be pushed down, and I believe this can be a win in many cases.
[GitHub] spark pull request #21442: [SPARK-24402] [SQL] Optimize `In` expression when...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21442#discussion_r191320828 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala --- @@ -219,7 +219,14 @@ object ReorderAssociativeOperator extends Rule[LogicalPlan] { object OptimizeIn extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case q: LogicalPlan => q transformExpressionsDown { - case In(v, list) if list.isEmpty && !v.nullable => FalseLiteral + case In(v, list) if list.isEmpty => +// When v is not nullable, the following expression will be optimized +// to FalseLiteral which is tested in OptimizeInSuite.scala +If(IsNotNull(v), FalseLiteral, Literal(null, BooleanType)) + case In(v, Seq(elem @ Literal(_, _))) => --- End diff -- As you suggested, I'll move into `case expr @ In(v, list) if expr.inSetConvertible`. Yes, I saw the test failure. The same code was passing the build in my another PR. https://github.com/apache/spark/pull/21416/commits/1332406d7f4ca7a9a4a85338f758430ecc334ff8 I will debug it tomorrow. Thanks, --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21416: [SPARK-24371] [SQL] Added isInCollection in DataFrame AP...
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21416 @rxin I simplified the test cases as you suggested. Thanks.
[GitHub] spark pull request #21416: [SPARK-24371] [SQL] Added isInCollection in DataF...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21416#discussion_r191317978 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala --- @@ -392,9 +396,97 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext { val df2 = Seq((1, Seq(1)), (2, Seq(2)), (3, Seq(3))).toDF("a", "b") -intercept[AnalysisException] { +val e = intercept[AnalysisException] { df2.filter($"a".isin($"b")) } +Seq("cannot resolve", "due to data type mismatch: Arguments must be same type but were") + .foreach { s => + assert(e.getMessage.toLowerCase(Locale.ROOT).contains(s.toLowerCase(Locale.ROOT))) + } + } + + test("isInCollection: Scala Collection") { +val df = Seq((1, "x"), (2, "y"), (3, "z")).toDF("a", "b") +checkAnswer(df.filter($"a".isInCollection(Seq(1, 2))), + df.collect().toSeq.filter(r => r.getInt(0) == 1 || r.getInt(0) == 2)) +checkAnswer(df.filter($"a".isInCollection(Seq(3, 2))), + df.collect().toSeq.filter(r => r.getInt(0) == 3 || r.getInt(0) == 2)) +checkAnswer(df.filter($"a".isInCollection(Seq(3, 1))), + df.collect().toSeq.filter(r => r.getInt(0) == 3 || r.getInt(0) == 1)) + +// Auto casting should work with mixture of different types in collections +checkAnswer(df.filter($"a".isInCollection(Seq(1.toShort, "2"))), + df.collect().toSeq.filter(r => r.getInt(0) == 1 || r.getInt(0) == 2)) +checkAnswer(df.filter($"a".isInCollection(Seq("3", 2.toLong))), + df.collect().toSeq.filter(r => r.getInt(0) == 3 || r.getInt(0) == 2)) +checkAnswer(df.filter($"a".isInCollection(Seq(3, "1"))), + df.collect().toSeq.filter(r => r.getInt(0) == 3 || r.getInt(0) == 1)) + +checkAnswer(df.filter($"b".isInCollection(Seq("y", "x"))), + df.collect().toSeq.filter(r => r.getString(1) == "y" || r.getString(1) == "x")) +checkAnswer(df.filter($"b".isInCollection(Seq("z", "x"))), + df.collect().toSeq.filter(r => r.getString(1) == "z" || r.getString(1) == "x")) +checkAnswer(df.filter($"b".isInCollection(Seq("z", "y"))), + 
df.collect().toSeq.filter(r => r.getString(1) == "z" || r.getString(1) == "y")) + +// Test with different types of collections +checkAnswer(df.filter($"a".isInCollection(Seq(1, 2).toSet)), + df.collect().toSeq.filter(r => r.getInt(0) == 1 || r.getInt(0) == 2)) +checkAnswer(df.filter($"a".isInCollection(Seq(3, 2).toArray)), + df.collect().toSeq.filter(r => r.getInt(0) == 3 || r.getInt(0) == 2)) +checkAnswer(df.filter($"a".isInCollection(Seq(3, 1).toList)), + df.collect().toSeq.filter(r => r.getInt(0) == 3 || r.getInt(0) == 1)) + +val df2 = Seq((1, Seq(1)), (2, Seq(2)), (3, Seq(3))).toDF("a", "b") + +val e = intercept[AnalysisException] { + df2.filter($"a".isInCollection(Seq($"b"))) +} +Seq("cannot resolve", "due to data type mismatch: Arguments must be same type but were") + .foreach { s => + assert(e.getMessage.toLowerCase(Locale.ROOT).contains(s.toLowerCase(Locale.ROOT))) + } + } + + test("isInCollection: Java Collection") { +val df = Seq((1, "x"), (2, "y"), (3, "z")).toDF("a", "b") --- End diff -- Done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21416: [SPARK-24371] [SQL] Added isInCollection in DataF...
Github user dbtsai commented on a diff in the pull request: https://github.com/apache/spark/pull/21416#discussion_r191317980 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala --- @@ -392,9 +396,97 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext { val df2 = Seq((1, Seq(1)), (2, Seq(2)), (3, Seq(3))).toDF("a", "b") -intercept[AnalysisException] { +val e = intercept[AnalysisException] { df2.filter($"a".isin($"b")) } +Seq("cannot resolve", "due to data type mismatch: Arguments must be same type but were") + .foreach { s => + assert(e.getMessage.toLowerCase(Locale.ROOT).contains(s.toLowerCase(Locale.ROOT))) + } + } + + test("isInCollection: Scala Collection") { --- End diff -- Done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21416: [SPARK-24371] [SQL] Added isInCollection in DataFrame AP...
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21416

retest this please
[GitHub] spark issue #21442: [SPARK-24402] [SQL] Optimize `In` expression when only o...
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21442

+@cloud-fan
[GitHub] spark pull request #21442: [SPARK-24402] [SQL] Optimize `In` expression when...
GitHub user dbtsai opened a pull request: https://github.com/apache/spark/pull/21442

[SPARK-24402] [SQL] Optimize `In` expression when only one element in the collection or collection is empty

## What changes were proposed in this pull request?

Two new rules are added to the logical plan optimizer.

1. When there is only one element in the **`Collection`**, the physical plan will be optimized to **`EqualTo`**, so predicate pushdown can be used.

```scala
profileDF.filter( $"profileID".isInCollection(Set(6))).explain(true)
"""
  |== Physical Plan ==
  |*(1) Project [profileID#0]
  |+- *(1) Filter (isnotnull(profileID#0) && (profileID#0 = 6))
  |   +- *(1) FileScan parquet [profileID#0] Batched: true, Format: Parquet,
  |      PartitionFilters: [],
  |      PushedFilters: [IsNotNull(profileID), EqualTo(profileID,6)],
  |      ReadSchema: struct
""".stripMargin
```

2. When the **`Collection`** is empty and the input is nullable, the logical plan will be simplified to

```scala
profileDF.filter( $"profileID".isInCollection(Set())).explain(true)
"""
  |== Optimized Logical Plan ==
  |Filter if (isnull(profileID#0)) null else false
  |+- Relation[profileID#0] parquet
""".stripMargin
```

TODO:

1. For multiple conditions with fewer elements than a certain threshold, we should still allow predicate pushdown.
2. Optimize `In` using `tableswitch` or `lookupswitch` when the number of categories is low and they are `Int` or `Long`.
3. The default immutable hash trie set is slow to query; we should benchmark different set implementations for faster lookup.
4. **`filter(if (condition) null else false)`** can be optimized to `false`.

## How was this patch tested?

A couple of new tests are added.
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dbtsai/spark optimize-in

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21442.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21442

commit 7c44c70fe664e73b36a49a974ece93a0c83d7f8e
Author: DB Tsai <d_tsai@...>
Date: 2018-05-28T07:27:09Z

    Optimize `In`
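The two rewrites described in the PR can be sketched outside Spark with a toy expression tree. The names below (`Expr`, `Attr`, `Lit`, `optimizeIn`, etc.) are hypothetical stand-ins for Catalyst's classes; a real rule would pattern-match on Catalyst expressions inside a `Rule[LogicalPlan]`, and the empty-collection case for a *nullable* input would produce the three-valued `if (isnull(x)) null else false` rather than a plain `false`:

```scala
// Toy expression ADT standing in for Catalyst's expression classes.
sealed trait Expr
case class Attr(name: String) extends Expr
case class Lit(value: Any) extends Expr
case class In(value: Expr, list: Seq[Expr]) extends Expr
case class EqualTo(left: Expr, right: Expr) extends Expr
case object FalseLiteral extends Expr

def optimizeIn(e: Expr): Expr = e match {
  // Rule 1: a single-element IN becomes an equality, which data sources
  // can push down as an EqualTo filter.
  case In(v, Seq(single)) => EqualTo(v, single)
  // Rule 2 (non-nullable simplification): IN over an empty collection can
  // never be true, so it folds to a false literal.
  case In(_, Nil)         => FalseLiteral
  case other              => other
}
```

For example, `optimizeIn(In(Attr("profileID"), Seq(Lit(6))))` yields `EqualTo(Attr("profileID"), Lit(6))`, mirroring the `PushedFilters: [..., EqualTo(profileID,6)]` line in the physical plan above.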
[GitHub] spark issue #21416: [SPARK-24371] [SQL] Added isInCollection in DataFrame AP...
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21416

retest this please
[GitHub] spark issue #21416: [SPARK-24371] [SQL] Added isInCollection in DataFrame AP...
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21416

@cloud-fan unfortunately, a Scala vararg method cannot be overloaded this way; the compiler returns the following error.

```scala
Error:(410, 32) ambiguous reference to overloaded definition,
both method isin in class Column of type (values: Iterable[_])org.apache.spark.sql.Column
and  method isin in class Column of type (list: Any*)org.apache.spark.sql.Column
match argument types (Seq[Int])
    checkAnswer(df.filter($"a".isin(Seq(1, 2))),
```

I'm leaning toward using `isInCollection` now, and implementing the corresponding Python APIs.
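The ambiguity arises because a `Seq[Int]` argument matches both `isin(list: Any*)` (as a single `Any` element or via `: _*`) and a hypothetical `isin(values: Iterable[_])`. Giving the collection variant a distinct name sidesteps overload resolution entirely. A minimal sketch on a toy `Column` class (not Spark's; the string-building `expr` is purely illustrative):

```scala
// Toy Column demonstrating the naming workaround: the Iterable variant gets
// its own name instead of overloading the vararg method.
class Column(val expr: String) {
  def isin(list: Any*): Column =
    new Column(s"$expr IN (${list.mkString(", ")})")

  // Distinct name, so $"a".isInCollection(Seq(1, 2)) resolves unambiguously;
  // it simply delegates to the vararg version.
  def isInCollection(values: Iterable[_]): Column =
    isin(values.toSeq: _*)
}
```

With overloaded `isin` definitions, both methods would match `isin(Seq(1, 2))` and the compiler would reject the call; with the separate name, both call sites compile and produce the same result.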