[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/22878#discussion_r230094499 --- Diff: external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala --- @@ -1374,4 +1377,182 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { |} """.stripMargin) } + + test("generic record converts to row and back") { +val nested = + SchemaBuilder.record("simple_record").fields() +.name("nested1").`type`("int").withDefault(0) +.name("nested2").`type`("string").withDefault("string").endRecord() +val schema = SchemaBuilder.record("record").fields() + .name("boolean").`type`("boolean").withDefault(false) + .name("int").`type`("int").withDefault(0) + .name("long").`type`("long").withDefault(0L) + .name("float").`type`("float").withDefault(0.0F) + .name("double").`type`("double").withDefault(0.0) + .name("string").`type`("string").withDefault("string") + .name("bytes").`type`("bytes").withDefault(java.nio.ByteBuffer.wrap("bytes".getBytes)) + .name("nested").`type`(nested).withDefault(new GenericRecordBuilder(nested).build) + .name("enum").`type`( + SchemaBuilder.enumeration("simple_enums") +.symbols("SPADES", "HEARTS", "CLUBS", "DIAMONDS")) + .withDefault("SPADES") + .name("int_array").`type`( + SchemaBuilder.array().items().`type`("int")) + .withDefault(java.util.Arrays.asList(1, 2, 3)) + .name("string_array").`type`( + SchemaBuilder.array().items().`type`("string")) + .withDefault(java.util.Arrays.asList("a", "b", "c")) + .name("record_array").`type`( + SchemaBuilder.array.items.`type`(nested)) + .withDefault(java.util.Arrays.asList( +new GenericRecordBuilder(nested).build, +new GenericRecordBuilder(nested).build)) + .name("enum_array").`type`( + SchemaBuilder.array.items.`type`( +SchemaBuilder.enumeration("simple_enums") + .symbols("SPADES", "HEARTS", "CLUBS", "DIAMONDS"))) + .withDefault(java.util.Arrays.asList("SPADES", "HEARTS", "SPADES")) + .name("fixed_array").`type`( + 
SchemaBuilder.array.items().`type`( +SchemaBuilder.fixed("simple_fixed").size(3))) + .withDefault(java.util.Arrays.asList("foo", "bar", "baz")) + .name("fixed").`type`(SchemaBuilder.fixed("simple_fixed").size(16)) + .withDefault("string_length_16") + .endRecord() +val encoder = AvroEncoder.of[GenericData.Record](schema) +val expressionEncoder = encoder.asInstanceOf[ExpressionEncoder[GenericData.Record]] +val record = new GenericRecordBuilder(schema).build +val row = expressionEncoder.toRow(record) +val recordFromRow = expressionEncoder.resolveAndBind().fromRow(row) +assert(record == recordFromRow) + } + + test("encoder resolves union types to rows") { +val schema = SchemaBuilder.record("record").fields() + .name("int_null_union").`type`( + SchemaBuilder.unionOf.`type`("null").and.`type`("int").endUnion) + .withDefault(null) + .name("string_null_union").`type`( + SchemaBuilder.unionOf.`type`("null").and.`type`("string").endUnion) + .withDefault(null) + .name("int_long_union").`type`( + SchemaBuilder.unionOf.`type`("int").and.`type`("long").endUnion) + .withDefault(0) + .name("float_double_union").`type`( + SchemaBuilder.unionOf.`type`("float").and.`type`("double").endUnion) + .withDefault(0.0) + .endRecord +val encoder = AvroEncoder.of[GenericData.Record](schema) +val expressionEncoder = encoder.asInstanceOf[ExpressionEncoder[GenericData.Record]] +val record = new GenericRecordBuilder(schema).build +val row = expressionEncoder.toRow(record) +val recordFromRow = expressionEncoder.resolveAndBind().fromRow(row) +assert(record.get(0) == recordFromRow.get(0)) +assert(record.get(1) == recordFr
[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/22878#discussion_r229328064 --- Diff: external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala --- @@ -1374,4 +1377,182 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { |} """.stripMargin) } + + test("generic record converts to row and back") { +val nested = + SchemaBuilder.record("simple_record").fields() +.name("nested1").`type`("int").withDefault(0) +.name("nested2").`type`("string").withDefault("string").endRecord() +val schema = SchemaBuilder.record("record").fields() + .name("boolean").`type`("boolean").withDefault(false) + .name("int").`type`("int").withDefault(0) + .name("long").`type`("long").withDefault(0L) + .name("float").`type`("float").withDefault(0.0F) + .name("double").`type`("double").withDefault(0.0) + .name("string").`type`("string").withDefault("string") + .name("bytes").`type`("bytes").withDefault(java.nio.ByteBuffer.wrap("bytes".getBytes)) + .name("nested").`type`(nested).withDefault(new GenericRecordBuilder(nested).build) + .name("enum").`type`( + SchemaBuilder.enumeration("simple_enums") +.symbols("SPADES", "HEARTS", "CLUBS", "DIAMONDS")) + .withDefault("SPADES") + .name("int_array").`type`( + SchemaBuilder.array().items().`type`("int")) + .withDefault(java.util.Arrays.asList(1, 2, 3)) + .name("string_array").`type`( + SchemaBuilder.array().items().`type`("string")) + .withDefault(java.util.Arrays.asList("a", "b", "c")) + .name("record_array").`type`( + SchemaBuilder.array.items.`type`(nested)) + .withDefault(java.util.Arrays.asList( +new GenericRecordBuilder(nested).build, +new GenericRecordBuilder(nested).build)) + .name("enum_array").`type`( + SchemaBuilder.array.items.`type`( +SchemaBuilder.enumeration("simple_enums") + .symbols("SPADES", "HEARTS", "CLUBS", "DIAMONDS"))) + .withDefault(java.util.Arrays.asList("SPADES", "HEARTS", "SPADES")) + .name("fixed_array").`type`( + 
SchemaBuilder.array.items().`type`( +SchemaBuilder.fixed("simple_fixed").size(3))) + .withDefault(java.util.Arrays.asList("foo", "bar", "baz")) + .name("fixed").`type`(SchemaBuilder.fixed("simple_fixed").size(16)) + .withDefault("string_length_16") --- End diff -- In our first pass at the encoder, we hadn't supported Avro map types yet, and so this test case omits an Avro map-typed field. Can we add a simple one (with a default value) to this test case? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/22878#discussion_r229329920 --- Diff: external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala --- @@ -1374,4 +1377,182 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { |} """.stripMargin) } + + test("generic record converts to row and back") { +val nested = + SchemaBuilder.record("simple_record").fields() +.name("nested1").`type`("int").withDefault(0) +.name("nested2").`type`("string").withDefault("string").endRecord() +val schema = SchemaBuilder.record("record").fields() + .name("boolean").`type`("boolean").withDefault(false) + .name("int").`type`("int").withDefault(0) + .name("long").`type`("long").withDefault(0L) + .name("float").`type`("float").withDefault(0.0F) + .name("double").`type`("double").withDefault(0.0) + .name("string").`type`("string").withDefault("string") + .name("bytes").`type`("bytes").withDefault(java.nio.ByteBuffer.wrap("bytes".getBytes)) + .name("nested").`type`(nested).withDefault(new GenericRecordBuilder(nested).build) + .name("enum").`type`( + SchemaBuilder.enumeration("simple_enums") +.symbols("SPADES", "HEARTS", "CLUBS", "DIAMONDS")) + .withDefault("SPADES") + .name("int_array").`type`( + SchemaBuilder.array().items().`type`("int")) + .withDefault(java.util.Arrays.asList(1, 2, 3)) + .name("string_array").`type`( + SchemaBuilder.array().items().`type`("string")) + .withDefault(java.util.Arrays.asList("a", "b", "c")) + .name("record_array").`type`( + SchemaBuilder.array.items.`type`(nested)) + .withDefault(java.util.Arrays.asList( +new GenericRecordBuilder(nested).build, +new GenericRecordBuilder(nested).build)) + .name("enum_array").`type`( + SchemaBuilder.array.items.`type`( +SchemaBuilder.enumeration("simple_enums") + .symbols("SPADES", "HEARTS", "CLUBS", "DIAMONDS"))) + .withDefault(java.util.Arrays.asList("SPADES", "HEARTS", "SPADES")) + .name("fixed_array").`type`( + 
SchemaBuilder.array.items().`type`( +SchemaBuilder.fixed("simple_fixed").size(3))) + .withDefault(java.util.Arrays.asList("foo", "bar", "baz")) + .name("fixed").`type`(SchemaBuilder.fixed("simple_fixed").size(16)) + .withDefault("string_length_16") + .endRecord() +val encoder = AvroEncoder.of[GenericData.Record](schema) +val expressionEncoder = encoder.asInstanceOf[ExpressionEncoder[GenericData.Record]] +val record = new GenericRecordBuilder(schema).build +val row = expressionEncoder.toRow(record) +val recordFromRow = expressionEncoder.resolveAndBind().fromRow(row) +assert(record == recordFromRow) + } + + test("encoder resolves union types to rows") { +val schema = SchemaBuilder.record("record").fields() + .name("int_null_union").`type`( + SchemaBuilder.unionOf.`type`("null").and.`type`("int").endUnion) + .withDefault(null) + .name("string_null_union").`type`( + SchemaBuilder.unionOf.`type`("null").and.`type`("string").endUnion) + .withDefault(null) + .name("int_long_union").`type`( + SchemaBuilder.unionOf.`type`("int").and.`type`("long").endUnion) + .withDefault(0) + .name("float_double_union").`type`( + SchemaBuilder.unionOf.`type`("float").and.`type`("double").endUnion) + .withDefault(0.0) + .endRecord +val encoder = AvroEncoder.of[GenericData.Record](schema) +val expressionEncoder = encoder.asInstanceOf[ExpressionEncoder[GenericData.Record]] +val record = new GenericRecordBuilder(schema).build +val row = expressionEncoder.toRow(record) +val recordFromRow = expressionEncoder.resolveAndBind().fromRow(row) +assert(record.get(0) == recordFromRow.get(0)) +assert(record.get(1) == recordFr
[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/22878#discussion_r229326520 --- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroEncoder.scala --- @@ -0,0 +1,534 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.avro + +import java.io._ +import java.util.{Map => JMap} + +import scala.collection.JavaConverters._ +import scala.language.existentials +import scala.reflect.ClassTag + +import org.apache.avro.Schema +import org.apache.avro.Schema.Parser +import org.apache.avro.Schema.Type._ +import org.apache.avro.generic.{GenericData, IndexedRecord} +import org.apache.avro.reflect.ReflectData +import org.apache.avro.specific.SpecificRecord + +import org.apache.spark.sql.Encoder +import org.apache.spark.sql.avro.SchemaConverters._ +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, UnresolvedExtractValue} +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen._ +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.catalyst.expressions.objects.{LambdaVariable => _, _} +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData} +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +/** + * A Spark-SQL Encoder for Avro objects + */ +object AvroEncoder { + /** + * Provides an Encoder for Avro objects of the given class + * + * @param avroClass the class of the Avro object for which to generate the Encoder + * @tparam T the type of the Avro class, must implement SpecificRecord + * @return an Encoder for the given Avro class + */ + def of[T <: SpecificRecord](avroClass: Class[T]): Encoder[T] = { +AvroExpressionEncoder.of(avroClass) + } + /** + * Provides an Encoder for Avro objects implementing the given schema + * + * @param avroSchema the Schema of the Avro object for which to generate the Encoder + * @tparam T the type of the Avro class that implements the Schema, must implement IndexedRecord + * @return an Encoder for the given Avro Schema + */ + def of[T <: 
IndexedRecord](avroSchema: Schema): Encoder[T] = { +AvroExpressionEncoder.of(avroSchema) + } +} + +class SerializableSchema(@transient var value: Schema) extends Externalizable { + def this() = this(null) + override def readExternal(in: ObjectInput): Unit = { +value = new Parser().parse(in.readObject().asInstanceOf[String]) + } + override def writeExternal(out: ObjectOutput): Unit = out.writeObject(value.toString) + def resolveUnion(datum: Any): Int = GenericData.get.resolveUnion(value, datum) +} + +object AvroExpressionEncoder { + + def of[T <: SpecificRecord](avroClass: Class[T]): ExpressionEncoder[T] = { +val schema = avroClass.getMethod("getClassSchema").invoke(null).asInstanceOf[Schema] +assert(toSqlType(schema).dataType.isInstanceOf[StructType]) +val serializer = AvroTypeInference.serializerFor(avroClass, schema) +val deserializer = AvroTypeInference.deserializerFor(schema) +new ExpressionEncoder[T]( + serializer, + deserializer, + ClassTag[T](avroClass)) + } + + def of[T <: IndexedRecord](schema: Schema): ExpressionEncoder[T] = { +assert(toSqlType(schema).dataType.isInstanceOf[StructType]) +val avroClass = Option(ReflectData.get.getClass(schema)) + .map(_.asSubclass(classOf[SpecificRecord])) + .getOrElse(classOf[GenericData.Record]) +val serializer = AvroTypeInference.serializerFor(avroClass, schema) +val deserializer = AvroTypeInference.deserial
[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/22878#discussion_r229332500 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala --- @@ -1617,6 +1617,58 @@ case class InitializeJavaBean(beanInstance: Expression, setters: Map[String, Exp } } +/** + * Initializes an Avro Record instance (that implements the IndexedRecord interface) by calling + * the `put` method on a the Record instance with the provided position and value arguments + * + * @param objectInstance an expression that will evaluate to the Record instance + * @param args a sequence of expression pairs that will respectively evaluate to the index of + * the record in which to insert, and the argument value to insert + */ +case class InitializeAvroObject( --- End diff -- It's possible to refactor the `NewInstance` expression (also in this objects class) to support construction of Avro classes, which would eliminate the need for a separate `InitializeAvroObject`. Interestingly, the same refactor would generalize in a way that also lets us remove the separate `InitializeJavaBean` expression. To summarize the change: `NewInstance` would accept a `Seq` of `Expression` for the arguments to the instance's constructor, but _also_ a `Seq` of `(String, Seq[Expression])` tuples, being an ordered list of setter methods and the methods' respective arguments to call _after_ the object has been constructed. This covers both the creation of Java beans and the creation and instantiation of `SpecificRecord` classes. See the necessary changes to `NewInstance`, [here](https://github.com/apache/spark/pull/21348/files#diff-e436c96ea839dfe446837ab2a3531f93R447). It also requires an additional clause in `TreeNode`, [here](https://github.com/apache/spark/pull/21348/files#diff-eac5b02bb450a235fef5e902a2671254R361). 
Finally, see the changes to `JavaTypeInference`, [here](https://github.com/apache/spark/pull/21348/files#diff-031a812c8799b92eeecab0cbc9ac8f25). If this refactor is considered a bit too complicated for this PR, we can start with an `InitializeAvroObject` and do some cleanup in a follow-up. As background, this refactor was initially suggested by @cloud-fan; see [comment](https://github.com/apache/spark/pull/20085#issuecomment-364043282). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22878: [SPARK-25789][SQL] Support for Dataset of Avro
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/22878#discussion_r229323278 --- Diff: external/avro/src/main/scala/org/apache/spark/sql/avro/AvroEncoder.scala --- @@ -0,0 +1,534 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.avro + +import java.io._ +import java.util.{Map => JMap} + +import scala.collection.JavaConverters._ +import scala.language.existentials +import scala.reflect.ClassTag + +import org.apache.avro.Schema +import org.apache.avro.Schema.Parser +import org.apache.avro.Schema.Type._ +import org.apache.avro.generic.{GenericData, IndexedRecord} +import org.apache.avro.reflect.ReflectData +import org.apache.avro.specific.SpecificRecord + +import org.apache.spark.sql.Encoder +import org.apache.spark.sql.avro.SchemaConverters._ +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, UnresolvedExtractValue} +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.codegen._ +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ +import org.apache.spark.sql.catalyst.expressions.objects.{LambdaVariable => _, _} +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData} +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.UTF8String + +/** + * A Spark-SQL Encoder for Avro objects + */ +object AvroEncoder { + /** + * Provides an Encoder for Avro objects of the given class + * + * @param avroClass the class of the Avro object for which to generate the Encoder + * @tparam T the type of the Avro class, must implement SpecificRecord + * @return an Encoder for the given Avro class + */ + def of[T <: SpecificRecord](avroClass: Class[T]): Encoder[T] = { +AvroExpressionEncoder.of(avroClass) + } + /** + * Provides an Encoder for Avro objects implementing the given schema + * + * @param avroSchema the Schema of the Avro object for which to generate the Encoder + * @tparam T the type of the Avro class that implements the Schema, must implement IndexedRecord + * @return an Encoder for the given Avro Schema + */ + def of[T <: 
IndexedRecord](avroSchema: Schema): Encoder[T] = { +AvroExpressionEncoder.of(avroSchema) + } +} + +class SerializableSchema(@transient var value: Schema) extends Externalizable { + def this() = this(null) + override def readExternal(in: ObjectInput): Unit = { +value = new Parser().parse(in.readObject().asInstanceOf[String]) + } + override def writeExternal(out: ObjectOutput): Unit = out.writeObject(value.toString) + def resolveUnion(datum: Any): Int = GenericData.get.resolveUnion(value, datum) +} + +object AvroExpressionEncoder { + + def of[T <: SpecificRecord](avroClass: Class[T]): ExpressionEncoder[T] = { +val schema = avroClass.getMethod("getClassSchema").invoke(null).asInstanceOf[Schema] +assert(toSqlType(schema).dataType.isInstanceOf[StructType]) +val serializer = AvroTypeInference.serializerFor(avroClass, schema) +val deserializer = AvroTypeInference.deserializerFor(schema) +new ExpressionEncoder[T]( + serializer, + deserializer, + ClassTag[T](avroClass)) + } + + def of[T <: IndexedRecord](schema: Schema): ExpressionEncoder[T] = { +assert(toSqlType(schema).dataType.isInstanceOf[StructType]) +val avroClass = Option(ReflectData.get.getClass(schema)) + .map(_.asSubclass(classOf[SpecificRecord])) + .getOrElse(classOf[GenericData.Record]) +val serializer = AvroTypeInference.serializerFor(avroClass, schema) +val deserializer = AvroTypeInference.deserial
[GitHub] spark pull request #21348: [SPARK-22739][Catalyst] Additional Expression Sup...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/21348#discussion_r228577979 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala --- @@ -1799,3 +1805,65 @@ case class ValidateExternalType(child: Expression, expected: DataType) ev.copy(code = code, isNull = input.isNull) } } + +/** + * Determines if the given value is an instanceof a given class. + * + * @param value the value to check + * @param checkedType the class to check the value against + */ +case class InstanceOf( +value: Expression, +checkedType: Class[_]) extends Expression with NonSQLExpression { + + override def nullable: Boolean = false + override def children: Seq[Expression] = value :: Nil + override def dataType: DataType = BooleanType + + override def eval(input: InternalRow): Any = +throw new UnsupportedOperationException("Only code-generated evaluation is supported.") + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + +val obj = value.genCode(ctx) + +val code = + s""" +${obj.code} +final boolean ${ev.value} = ${obj.value} instanceof ${checkedType.getName}; + """ + +ev.copy(code = code, isNull = FalseLiteral) + } +} + +/** + * Casts the result of an expression to another type. + * + * @param value The value to cast + * @param resultType The type to which the value should be cast + */ +case class ObjectCast(value: Expression, resultType: DataType) + extends Expression with NonSQLExpression { + + override def nullable: Boolean = value.nullable + override def dataType: DataType = resultType + override def children: Seq[Expression] = value :: Nil + + override def eval(input: InternalRow): Any = +throw new UnsupportedOperationException("Only code-generated evaluation is supported.") --- End diff -- That's not a pattern I'd seen for `eval` at the time of writing this PR. Is there another expression that has an example? I could refactor and find out. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21348: [SPARK-22739][Catalyst] Additional Expression Support fo...
Github user bdrillard commented on the issue: https://github.com/apache/spark/pull/21348 Hi @xuanyuanking, if you'd like to take on the work to fold my prior work in databricks/spark-avro#217 into Spark, that sounds good to me. Please do include me in pull requests on this topic. Our `Encoder` for Avro serves some particularly heavy use cases. I'm happy to review the work, and I'd also be able to test the feature branch against our workflows while it's being discussed. The parent ticket was previously marked as `Resolved`. Should we go ahead and re-open it and follow through with this PR, since it's required work for [SPARK-25789](https://issues.apache.org/jira/browse/SPARK-25789)? cc: @cloud-fan --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20085: [SPARK-22739][Catalyst][WIP] Additional Expressio...
Github user bdrillard closed the pull request at: https://github.com/apache/spark/pull/20085 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20085: [SPARK-22739][Catalyst][WIP] Additional Expression Suppo...
Github user bdrillard commented on the issue: https://github.com/apache/spark/pull/20085 Closing this PR in favor of #21348. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21348: [SPARK-22739][Catalyst Additional Expression Supp...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/21348#discussion_r188810286 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala --- @@ -408,16 +439,19 @@ object NewInstance { cls: Class[_], arguments: Seq[Expression], dataType: DataType, + initializations: Seq[(String, Seq[Expression])] = Nil, --- End diff -- @cloud-fan, here is an implementation of the modifications you described [previously](https://github.com/apache/spark/pull/20085#issuecomment-364043282). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21348: [SPARK-22739][Catalyst Additional Expression Supp...
GitHub user bdrillard opened a pull request: https://github.com/apache/spark/pull/21348 [SPARK-22739][Catalyst] Additional Expression Support for Objects ## What changes were proposed in this pull request? This PR is a working follow-up to the expression work begun in #20085. It provides the `Expression` definitions necessary to support custom encoders (see this discussion in the [Spark-Avro](https://github.com/databricks/spark-avro/pull/217#issuecomment-342856719) project). It adds the following expressions: * `ObjectCast` - performs explicit casting of an `Expression` result to a `DataType` * `StaticField` - retrieves a static field from a class that otherwise has no accessor method * `InstanceOf` - an `Expression` for the Java `instanceof` operation. It also modifies `NewInstance` to take a sequence of (method name, arguments) initialization tuples, which are executed against the newly constructed object instance. It removes `InitializeJavaBean`, as the generalized `NewInstance` subsumes its use case. ## How was this patch tested? Adds a unit test for `NewInstance` supporting post-constructor initializations. All previous "JavaBean" tests were refactored to use `NewInstance`. Additional examples of working encoders that would use these new expressions can be seen in the [Spark-Avro](https://github.com/bdrillard/spark-avro/blob/avro_encoder_2-4/src/main/scala/com/databricks/spark/avro/AvroEncoder.scala) and [Bunsen](https://github.com/bdrillard/bunsen/blob/issue-23/bunsen-core/src/main/scala/com/cerner/bunsen/EncoderBuilder.scala) projects. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/bdrillard/spark SPARK-22739 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21348.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21348 commit 1e25a4ededa38796f739fe949b7795f753b5f2aa Author: ALeksander Eskilson <alek.eskilson@...> Date: 2018-05-09T19:06:01Z [SPARK-22739] adding new expressions commit f8643e3ea8dcae58e8af739801c4819f6ca40490 Author: ALeksander Eskilson <alek.eskilson@...> Date: 2018-05-17T00:18:57Z [SPARK-22739] adding new expressions --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16648: [SPARK-18016][SQL][CATALYST] Code Generation: Con...
Github user bdrillard closed the pull request at: https://github.com/apache/spark/pull/16648 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16648: [SPARK-18016][SQL][CATALYST] Code Generation: Constant P...
Github user bdrillard commented on the issue: https://github.com/apache/spark/pull/16648 The changes proposed in this PR were addressed in #18075. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over Struct...
Github user bdrillard commented on the issue: https://github.com/apache/spark/pull/20010 Is there a consensus on the preferred behavior here? This issue would also block the Spark-Avro encoders for Spark 2.3.0 that @marmbrus mentions in #20085. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20085: [SPARK-22739][Catalyst][WIP] Additional Expressio...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/20085#discussion_r161607649 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala --- @@ -1237,47 +1342,91 @@ case class DecodeUsingSerializer[T](child: Expression, tag: ClassTag[T], kryo: B } /** - * Initialize a Java Bean instance by setting its field values via setters. + * Initialize an object by invoking the given sequence of method names and method arguments. + * + * @param objectInstance An expression evaluating to a new instance of the object to initialize + * @param setters A sequence of method names and their sequence of argument expressions to apply in + *series to the object instance */ -case class InitializeJavaBean(beanInstance: Expression, setters: Map[String, Expression]) +case class InitializeObject( + objectInstance: Expression, + setters: Seq[(String, Seq[Expression])]) --- End diff -- We can make use of `NewInstance`, which just creates an object of a class, but it's not clear how we can make use of a sequence of `Invoke` expressions: all of these setter methods have `void` return types, so we can't chain them in a fluent manner. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/20010#discussion_r159676537 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala --- @@ -462,27 +507,139 @@ class TypeCoercionSuite extends AnalysisTest { ArrayType(DoubleType, containsNull = false), Some(ArrayType(DoubleType, containsNull = true))) widenTestWithStringPromotion( - ArrayType(TimestampType, containsNull = false), - ArrayType(StringType, containsNull = true), + ArrayType(ArrayType(IntegerType), containsNull = true), + ArrayType(ArrayType(LongType), containsNull = false), + Some(ArrayType(ArrayType(LongType), containsNull = true))) --- End diff -- @mgaido91, thoughts on this? It's definitely possible for us to revert to the behavior where we don't perform IntegerType-to-LongType, xType-to-StringType, etc. promotion inside complex types, which is how a previous version of this PR handled it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/20010#discussion_r159675261 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -99,11 +100,22 @@ object TypeCoercion { case (_: TimestampType, _: DateType) | (_: DateType, _: TimestampType) => Some(TimestampType) +case (t1 @ ArrayType(pointType1, nullable1), t2 @ ArrayType(pointType2, nullable2)) --- End diff -- Sure, see #09e49fb8162b97dec76a6324fa4fac4553b22013 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20085: [SPARK-22739][Catalyst][WIP] Additional Expression Suppo...
Github user bdrillard commented on the issue: https://github.com/apache/spark/pull/20085 I've added some comments describing an issue I've had with generalizing `InitializeJavaBean`, which I thought I'd added to this PR earlier but seem to have not been submitted. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20085: [SPARK-22739][Catalyst][WIP] Additional Expressio...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/20085#discussion_r159520020 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala --- @@ -436,4 +437,16 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper { ctx.addImmutableStateIfNotExists("String", mutableState2) assert(ctx.inlinedMutableStates.length == 2) } + + test("InitializeObject") { +val bean = new GenericBean(1, "a") + +val encoder = Encoders.bean(classOf[GenericBean]) +val expressionEncoder = encoder.asInstanceOf[ExpressionEncoder[GenericBean]] +val row = expressionEncoder.toRow(bean) +val beanFromRow = expressionEncoder.resolveAndBind().fromRow(row) + +assert(beanFromRow.getField1 == bean.getField1) +assert(beanFromRow.getField2 == bean.getField2) + } --- End diff -- This test case above demonstrates the issue I encountered with using a sequence of initialization arguments on an object. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20085: [SPARK-22739][Catalyst][WIP] Additional Expressio...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/20085#discussion_r159519915 --- Diff: sql/catalyst/src/test/java/org/apache/spark/sql/catalyst/expressions/GenericBean.java --- @@ -0,0 +1,56 @@ +package org.apache.spark.sql.catalyst.expressions; + +/** + * + */ +public class GenericBean { + private int field1; + private String field2; + + public GenericBean() {} + + public GenericBean(int field1, String field2) { +this.field1 = field1; +this.field2 = field2; + } + + public int getField1() { +return field1; + } + + public void setField1(int field1) { +this.field1 = field1; + } + + public String getField2() { +return field2; + } + + public void setField2(String field2) { +this.field2 = field2; + } + + @Override + public boolean equals(Object o) { +if (this == o) { + return true; +} +if (o == null || getClass() != o.getClass()) { + return false; +} + +GenericBean that = (GenericBean) o; + +if (field1 != that.field1) { + return false; +} +return field2 != null ? field2.equals(that.field2) : that.field2 == null; + } + + @Override + public int hashCode() { +int result = field1; +result = 31 * result + (field2 != null ? field2.hashCode() : 0); +return result; + } +} --- End diff -- This object here exists just as an easy unit test for the `InitializeObject` problem I describe above, it doesn't necessarily need to stay as a test resource. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20085: [SPARK-22739][Catalyst][WIP] Additional Expressio...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/20085#discussion_r159519672 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala --- @@ -1237,47 +1342,91 @@ case class DecodeUsingSerializer[T](child: Expression, tag: ClassTag[T], kryo: B } --- End diff -- In order to support initializations on more complicated objects, it makes sense to generalize `InitializeJavaBean` to an `InitializeObject` that can take a sequence of method names associated with a sequence of those methods' arguments. It seems, though, that during plan analysis, Spark fails to resolve the column names against the Expression `children` when those child expressions are gathered from a `Seq[Expression]`, yielding errors like: ``` Resolved attribute(s) 'field1,'field2 missing from field1#2,field2#3 in operator 'DeserializeToObject initializeobject(newInstance(class org.apache.spark.sql.catalyst.expressions.GenericBean), (setField1,List(assertnotnull('field1))), (setField2,List('field2.toString))), obj#4: org.apache.spark.sql.catalyst.expressions.GenericBean. Attribute(s) with the same name appear in the operation: field1,field2. Please check if the right attribute(s) are used.; org.apache.spark.sql.AnalysisException: Resolved attribute(s) 'field1,'field2 missing from field1#2,field2#3 in operator 'DeserializeToObject initializeobject(newInstance(class org.apache.spark.sql.catalyst.expressions.GenericBean), (setField1,List(assertnotnull('field1))), (setField2,List('field2.toString))), obj#4: org.apache.spark.sql.catalyst.expressions.GenericBean. Attribute(s) with the same name appear in the operation: field1,field2. 
Please check if the right attribute(s) are used.; at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:41) ``` Interestingly, if we change the `setters` signature from `Seq[(String, Seq[Expression])]` to `Seq[(String, (Expression, Expression))]` (the use case for Spark-Avro, where objects are initialized by calling `put` with an integer index argument and then some object argument), the plan will resolve. But of course, such a function signature would in a sense be hard-coded for Avro. Any ideas why passing a sequence of child expressions would yield the analysis error above? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/20010#discussion_r159244795 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -148,6 +160,61 @@ object TypeCoercion { case (l, r) => None } + /** + * Case 2 type widening over complex types. `widerTypeFunc` is a function that finds the wider + * type over point types. The `widerTypeFunc` specifies behavior over whether types should be + * promoted to StringType. + */ + private def findWiderTypeForTwoComplex( + t1: DataType, + t2: DataType, + widerTypeFunc: (DataType, DataType) => Option[DataType]): Option[DataType] = { +(t1, t2) match { + case (_, _) if t1 == t2 => Some(t1) + case (NullType, _) => Some(t1) + case (_, NullType) => Some(t1) + + case (ArrayType(pointType1, nullable1), ArrayType(pointType2, nullable2)) => +val dataType = widerTypeFunc.apply(pointType1, pointType2) + +dataType.map(ArrayType(_, nullable1 || nullable2)) + + case (MapType(keyType1, valueType1, nullable1), MapType(keyType2, valueType2, nullable2)) => +val keyType = widerTypeFunc.apply(keyType1, keyType2) +val valueType = widerTypeFunc.apply(valueType1, valueType2) + +if (keyType.nonEmpty && valueType.nonEmpty) { + Some(MapType(keyType.get, valueType.get, nullable1 || nullable2)) +} else { + None +} + + case (StructType(fields1), StructType(fields2)) => +val fieldTypes = fields1.zip(fields2).map { case (f1, f2) => --- End diff -- @mgaido91 That seems to be the assumption already made in [`findTightestCommonType`](https://github.com/bdrillard/spark/blob/d42dfa55105c8944b63781fc61b59c98a99338d1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala#L115). The [`sameType`](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala#L83) function on DataType also requires that struct fields be ordered the same. 
The difference here is that we don't require the structfields strictly have the same type, so we can support widening to LongType, StringType, etc. But we do require the fields 1. have the same order, and 2. have the same name (either with strict case, or ignoring case). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over Struct...
Github user bdrillard commented on the issue: https://github.com/apache/spark/pull/20010 @mgaido91 Agreed with that concern. I think the last round of tests I've just added covers the permutation of cases well, where we have arrays and maps of structs, and structs of arrays and maps. cc: @viirya, @gatorsmile, Could perhaps one of you trigger a Jenkins build against this PR? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/20010#discussion_r158852830 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala --- @@ -469,12 +488,21 @@ class TypeCoercionSuite extends AnalysisTest { ArrayType(ArrayType(IntegerType), containsNull = false), ArrayType(ArrayType(LongType), containsNull = false), Some(ArrayType(ArrayType(LongType), containsNull = false))) +widenTestWithStringPromotion( + StructType(StructField("a", ArrayType(LongType)) :: Nil), + StructType(StructField("a", ArrayType(StringType)) :: Nil), + Some(StructType(StructField("a", ArrayType(StringType)) :: Nil))) + // Without string promotion widenTestWithoutStringPromotion(IntegerType, StringType, None) widenTestWithoutStringPromotion(StringType, TimestampType, None) widenTestWithoutStringPromotion(ArrayType(LongType), ArrayType(StringType), None) widenTestWithoutStringPromotion(ArrayType(StringType), ArrayType(TimestampType), None) +widenTestWithoutStringPromotion( + StructType(StructField("a", ArrayType(LongType)) :: Nil), + StructType(StructField("a", ArrayType(StringType)) :: Nil), + None) --- End diff -- I've grouped non-string promotion tests, and added tests for arrays, maps, and then structures nested in those types or having fields of those types, https://github.com/apache/spark/pull/20010/files#diff-01ecdd038c5c2f53f38118912210fef8R563 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/20010#discussion_r158852200 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala --- @@ -469,12 +488,21 @@ class TypeCoercionSuite extends AnalysisTest { ArrayType(ArrayType(IntegerType), containsNull = false), ArrayType(ArrayType(LongType), containsNull = false), Some(ArrayType(ArrayType(LongType), containsNull = false))) +widenTestWithStringPromotion( + StructType(StructField("a", ArrayType(LongType)) :: Nil), + StructType(StructField("a", ArrayType(StringType)) :: Nil), + Some(StructType(StructField("a", ArrayType(StringType)) :: Nil))) --- End diff -- I've grouped string promotion tests for arrays and maps, with tests where those types are nested in structs or are fields in structs, https://github.com/apache/spark/pull/20010/files#diff-01ecdd038c5c2f53f38118912210fef8R504 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/20010#discussion_r158852095 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala --- @@ -389,6 +389,25 @@ class TypeCoercionSuite extends AnalysisTest { widenTest(StringType, MapType(IntegerType, StringType, true), None) widenTest(ArrayType(IntegerType), StructType(Seq()), None) +widenTest( + ArrayType(StringType, containsNull=true), + ArrayType(StringType, containsNull=false), + Some(ArrayType(StringType, containsNull=true))) +widenTest( + MapType(StringType, StringType, valueContainsNull=true), + MapType(StringType, StringType, valueContainsNull=false), + Some(MapType(StringType, StringType, valueContainsNull=true))) --- End diff -- Here are tests for nested complex types, https://github.com/apache/spark/pull/20010/files#diff-01ecdd038c5c2f53f38118912210fef8R405 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/20010#discussion_r158851910 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala --- @@ -389,6 +389,25 @@ class TypeCoercionSuite extends AnalysisTest { widenTest(StringType, MapType(IntegerType, StringType, true), None) widenTest(ArrayType(IntegerType), StructType(Seq()), None) +widenTest( + ArrayType(StringType, containsNull=true), --- End diff -- Fixed, see dfb4a3b47d88853be348aafd9802f799639693f6 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/20010#discussion_r158851887 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -158,11 +213,8 @@ object TypeCoercion { findTightestCommonType(t1, t2) .orElse(findWiderTypeForDecimal(t1, t2)) .orElse(stringPromotion(t1, t2)) - .orElse((t1, t2) match { -case (ArrayType(et1, containsNull1), ArrayType(et2, containsNull2)) => - findWiderTypeForTwo(et1, et2).map(ArrayType(_, containsNull1 || containsNull2)) -case _ => None - }) + .orElse(findWiderTypeForTwoComplex(t1, t2, findWiderTypeForTwo)) + --- End diff -- Fixed, see dfb4a3b47d88853be348aafd9802f799639693f6 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/20010#discussion_r158851832 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala --- @@ -469,12 +488,21 @@ class TypeCoercionSuite extends AnalysisTest { ArrayType(ArrayType(IntegerType), containsNull = false), ArrayType(ArrayType(LongType), containsNull = false), Some(ArrayType(ArrayType(LongType), containsNull = false))) +widenTestWithStringPromotion( + StructType(StructField("a", ArrayType(LongType)) :: Nil), + StructType(StructField("a", ArrayType(StringType)) :: Nil), + Some(StructType(StructField("a", ArrayType(StringType)) :: Nil))) + --- End diff -- Fixed, see dfb4a3b47d88853be348aafd9802f799639693f6 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20085: [SPARK-22739][Catalyst][WIP] Additional Expression Suppo...
Github user bdrillard commented on the issue: https://github.com/apache/spark/pull/20085 @viirya I've found the same intent of a `ValueIfType` function can be attained by adding a simpler `InstanceOf` [expression](https://github.com/apache/spark/pull/20085/files#diff-e436c96ea839dfe446837ab2a3531f93R265) that can be used as the predicate to the existing `If` expression, and then using `ObjectCast` on the results. That approach handles your [first question](https://github.com/apache/spark/pull/20085#discussion_r158760292). To your [second question](https://github.com/apache/spark/pull/20085#discussion_r158760302), it makes sense that the input value expression should always have a DataType of ObjectType. Is there a way you'd prefer to make that check? Or should we throw some kind of exception if `value.dataType != ObjectType`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over Struct...
Github user bdrillard commented on the issue: https://github.com/apache/spark/pull/20010 @gczsjdy, @mgaido91 If you all are comfortable with it, I think this PR is in a state where we could trigger a build. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20085: [SPARK-22739][Catalyst][WIP] Additional Expressio...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/20085#discussion_r158761511 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala --- @@ -390,8 +391,8 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper { test("SPARK-22696: InitializeJavaBean should not use global variables") { --- End diff -- Fixed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20085: [SPARK-22739][Catalyst][WIP] Additional Expressio...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/20085#discussion_r158761155 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala --- @@ -106,27 +106,27 @@ trait InvokeLike extends Expression with NonSQLExpression { } /** - * Invokes a static function, returning the result. By default, any of the arguments being null - * will result in returning null instead of calling the function. - * - * @param staticObject The target of the static call. This can either be the object itself - * (methods defined on scala objects), or the class object - * (static methods defined in java). - * @param dataType The expected return type of the function call - * @param functionName The name of the method to call. - * @param arguments An optional list of expressions to pass as arguments to the function. - * @param propagateNull When true, and any of the arguments is null, null will be returned instead - * of calling the function. - * @param returnNullable When false, indicating the invoked method will always return - * non-null value. - */ + * Invokes a static function, returning the result. By default, any of the arguments being null --- End diff -- Those additional spaces shouldn't be there, I've fixed them. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20085: [SPARK-22739][Catalyst][WIP] Additional Expression Suppo...
Github user bdrillard commented on the issue: https://github.com/apache/spark/pull/20085 cc: @marmbrus --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20085: [SPARK-22739][Catalyst][WIP] Additional Expressio...
GitHub user bdrillard opened a pull request: https://github.com/apache/spark/pull/20085 [SPARK-22739][Catalyst][WIP] Additional Expression Support for Objects ## What changes were proposed in this pull request? This PR is a work-in-progress adding additional `Expression` support for object types. It intends to provide necessary expressions to support custom encoders (see discussion in [Spark-Avro](https://github.com/databricks/spark-avro/pull/217#issuecomment-342856719)). This is an initial review, looking for feedback concerning a few questions and guidance concerning best unit-testing practices for new `Expression` classes in Catalyst. You can merge this pull request into a Git repository by running: $ git pull https://github.com/bdrillard/spark spark_expressions Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20085.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20085 commit b1973842c7e3fd3e5b8fb0190368c86446b29003 Author: ALeksander Eskilson <alek.eskilson@...> Date: 2017-12-11T19:14:36Z adding new expressions commit 74fdb9b1079f2cf60616855278ffc27c0a380b8e Author: ALeksander Eskilson <alek.eskilson@...> Date: 2017-12-26T16:52:04Z adding test case for initialize object generalization commit 135712f9072b56cbe857c6da64a342481bf00318 Author: ALeksander Eskilson <alek.eskilson@...> Date: 2017-12-26T17:28:05Z fixup --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/20010#discussion_r158724861 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -158,11 +213,8 @@ object TypeCoercion { findTightestCommonType(t1, t2) .orElse(findWiderTypeForDecimal(t1, t2)) .orElse(stringPromotion(t1, t2)) - .orElse((t1, t2) match { -case (ArrayType(et1, containsNull1), ArrayType(et2, containsNull2)) => - findWiderTypeForTwo(et1, et2).map(ArrayType(_, containsNull1 || containsNull2)) -case _ => None - }) + .orElse(findWiderTypeForTwoComplex(t1, t2, findWiderTypeForTwo)) --- End diff -- It should not. `findWiderTypeForTwoComplex` will only be called as we operate over "complex" types (i.e., arrays, maps, structs), and will only recurse (calling `findWiderTypeForTwo`) over the child point-types of a complex type, so we ensure the recursive computation gets narrower as it recurses, until eventually terminating at the leaf level of the schema. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/20010#discussion_r158728293 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -158,11 +169,6 @@ object TypeCoercion { findTightestCommonType(t1, t2) .orElse(findWiderTypeForDecimal(t1, t2)) .orElse(stringPromotion(t1, t2)) - .orElse((t1, t2) match { -case (ArrayType(et1, containsNull1), ArrayType(et2, containsNull2)) => - findWiderTypeForTwo(et1, et2).map(ArrayType(_, containsNull1 || containsNull2)) -case _ => None - }) --- End diff -- Yes, this PR should be ready for a Jenkins build. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/20010#discussion_r158081192 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -158,11 +169,6 @@ object TypeCoercion { findTightestCommonType(t1, t2) .orElse(findWiderTypeForDecimal(t1, t2)) .orElse(stringPromotion(t1, t2)) - .orElse((t1, t2) match { -case (ArrayType(et1, containsNull1), ArrayType(et2, containsNull2)) => - findWiderTypeForTwo(et1, et2).map(ArrayType(_, containsNull1 || containsNull2)) -case _ => None - }) --- End diff -- @gczsjdy I've taken a shot at implementing your suggestion with `findWiderTypeForTwoComplex`, which takes as an argument a `widerTypeFunc`, describing which widening behavior to apply to point types (should they permit promotion to string or not). Because `ArrayType` instances that would require widening the type could be nested in `StructType` and `MapType`, I think it's necessary to have more case matching than would be in `findWiderTypeForArray`, hence `findWiderTypeForTwoComplex`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19518: [SPARK-18016][SQL][CATALYST] Code Generation: Constant P...
Github user bdrillard commented on the issue: https://github.com/apache/spark/pull/19518 This PR was addressed by #19811, closing this one. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19518: [SPARK-18016][SQL][CATALYST] Code Generation: Con...
Github user bdrillard closed the pull request at: https://github.com/apache/spark/pull/19518 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/20010#discussion_r157901873 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -158,11 +169,6 @@ object TypeCoercion { findTightestCommonType(t1, t2) .orElse(findWiderTypeForDecimal(t1, t2)) .orElse(stringPromotion(t1, t2)) - .orElse((t1, t2) match { -case (ArrayType(et1, containsNull1), ArrayType(et2, containsNull2)) => - findWiderTypeForTwo(et1, et2).map(ArrayType(_, containsNull1 || containsNull2)) -case _ => None - }) --- End diff -- Sure, I think that's possible. In order to handle instances with and without string promotion, I think it may be necessary to add a boolean parameter, and then to handle the instances where the pointType/keyType and valueType may result in `None`, see https://github.com/apache/spark/pull/20010/files#diff-383a8cdd0a9c58cae68e0a79295520a3R105 To support the minor change in function signature for `findTightestCommonType`, I have to do some refactoring in the tests. Let me know if you think there's a cleaner implementation, but this seems to help localize like concerns into `findTightestCommonType`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/20010#discussion_r157792439 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -182,12 +188,6 @@ object TypeCoercion { t2: DataType): Option[DataType] = { findTightestCommonType(t1, t2) .orElse(findWiderTypeForDecimal(t1, t2)) - .orElse((t1, t2) match { -case (ArrayType(et1, containsNull1), ArrayType(et2, containsNull2)) => - findWiderTypeWithoutStringPromotionForTwo(et1, et2) -.map(ArrayType(_, containsNull1 || containsNull2)) -case _ => None - }) --- End diff -- Same comment here, as above. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/20010#discussion_r157792361 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -158,11 +169,6 @@ object TypeCoercion { findTightestCommonType(t1, t2) .orElse(findWiderTypeForDecimal(t1, t2)) .orElse(stringPromotion(t1, t2)) - .orElse((t1, t2) match { -case (ArrayType(et1, containsNull1), ArrayType(et2, containsNull2)) => - findWiderTypeForTwo(et1, et2).map(ArrayType(_, containsNull1 || containsNull2)) -case _ => None - }) --- End diff -- I think if we now check for ArrayTypes (including MapTypes) in `findTightestCommonType`, the match here becomes redundant. @mgaido91, @gczsjdy, does this thinking make sense to you both? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/20010#discussion_r157792706 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala --- @@ -389,6 +389,25 @@ class TypeCoercionSuite extends AnalysisTest { widenTest(StringType, MapType(IntegerType, StringType, true), None) widenTest(ArrayType(IntegerType), StructType(Seq()), None) +widenTest( + ArrayType(StringType, containsNull=true), + ArrayType(StringType, containsNull=false), + Some(ArrayType(StringType, containsNull=true))) +widenTest( + MapType(StringType, StringType, valueContainsNull=true), + MapType(StringType, StringType, valueContainsNull=false), + Some(MapType(StringType, StringType, valueContainsNull=true))) + +widenTest( + StructType(StructField("a", ArrayType(StringType, containsNull=true)) :: Nil), + StructType(StructField("a", ArrayType(StringType, containsNull=false)) :: Nil), + Some(StructType(StructField("a", ArrayType(StringType, containsNull=true)) :: Nil))) +widenTest( + StructType(StructField("a", MapType(StringType, StringType, valueContainsNull=true)) :: Nil), + StructType(StructField("a", MapType(StringType, StringType, valueContainsNull=false)) :: Nil), + Some(StructType( +StructField("a", MapType(StringType, StringType, valueContainsNull=true)) :: Nil))) --- End diff -- Here's a test for nested structures where an explicit match case against ArrayType/MapType is necessary due to the difference in nullability between the two structures. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/20010#discussion_r157787536 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -99,6 +99,17 @@ object TypeCoercion { case (_: TimestampType, _: DateType) | (_: DateType, _: TimestampType) => Some(TimestampType) +case (t1 @ ArrayType(pointType1, nullable1), t2 @ ArrayType(pointType2, nullable2)) +if t1.sameType(t2) => + val dataType = findTightestCommonType(pointType1, pointType2).get --- End diff -- @gczsjdy `sameType` will ignore the nullability of the two types, only checking if the `DataType` is the same. We would expect then that the first case of `findTightestCommonType` would pass the types on through, but during its equality check, it also checks the type nullability, which, for ArrayType and MapType, may differ between t1 and t2: ``` case (t1, t2) if t1 == t2 => Some(t1) ``` That means we can establish that two StructFields of ArrayType/MapType are the same DataType, but if they have different nullability, then the above case match won't match them, nor will any other case in the match set. So in order to find the tightest common type where nullabilities of the point-types may differ, we'll need to recurse. See a case like: ``` widenTest( StructType(StructField("a", ArrayType(StringType, containsNull=true)) :: Nil), StructType(StructField("a", ArrayType(StringType, containsNull=false)) :: Nil), Some(StructType(StructField("a", ArrayType(StringType, containsNull=true)) :: Nil))) ``` At the moment, since we have no case to match ArrayType/MapType where the nullabilities may differ, this case would fail. I can add this test explicitly to the suite. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/20010#discussion_r157778903 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala --- @@ -99,6 +99,17 @@ object TypeCoercion { case (_: TimestampType, _: DateType) | (_: DateType, _: TimestampType) => Some(TimestampType) +case (t1 @ ArrayType(pointType1, nullable1), t2 @ ArrayType(pointType2, nullable2)) +if t1.sameType(t2) => + val dataType = findTightestCommonType(pointType1, pointType2).get --- End diff -- @mgaido91, @gczsjdy, sure, I'll add a pair of test cases for nested complex structures for `ArrayType` and `MapType` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20010: [SPARK-22826][SQL] findWiderTypeForTwo Fails over...
GitHub user bdrillard opened a pull request: https://github.com/apache/spark/pull/20010 [SPARK-22826][SQL] findWiderTypeForTwo Fails over StructField of Array ## What changes were proposed in this pull request? [SPARK-22826](https://issues.apache.org/jira/browse/SPARK-22826) Attempting to find the tightest common type over a struct holding fields of `ArrayType` or `MapType` will fail if the types of those struct fields are not exactly equal. See the JIRA for an example. ## How was this patch tested? This patch adds a test case for `ArrayType` and `MapType` for finding tightest common types. You can merge this pull request into a Git repository by running: $ git pull https://github.com/bdrillard/spark SPARK-22826 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20010.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20010 commit 1ac4b0be7926d24b5cbee54e0a8b56381adda462 Author: ALeksander Eskilson <alek.eskil...@cerner.com> Date: 2017-12-18T21:09:18Z [SPARK-22826] adding support for tighest type coercion over array/map types in a structfield --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19811: [SPARK-18016][SQL] Code Generation: Constant Pool Limit ...
Github user bdrillard commented on the issue: https://github.com/apache/spark/pull/19811 As some context, I had initially found array initializations necessary because the number of `init` methods created to do line-by-line var initializations for large test cases was still triggering constant pool errors, even after having compacted the data into arrays. A loop allowed reduction of the number of expressions needed to initialize that array state, but in order to ensure that single loops could initialize whole groups of variables, it became necessary to add additional state to hold the matching init codes and the length of the array. I think @mgaido91's work in SPARK-6 obviates that original issue with the way it would re-distribute the init method calls. Perhaps another benefit, removing the requirement that state be initialized in loops would allow us to also compact more complicated state than previously could have been initialized in loops, like the [`UnsafeRowWriter`](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala#L78), which can appear as many times as struct columns appear in the dataset. Since their initialization is dependent on varying arguments, no single loop could initialize all of them, but inline-statements could, allowing us to potentially compact them (or any other prevalent non-simply assigned object type). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19811: [SPARK-18016][SQL] Code Generation: Constant Pool...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/19811#discussion_r153534679 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -177,11 +190,67 @@ class CodegenContext { * the list of default imports available. * Also, generic type arguments are accepted but ignored. * @param variableName Name of the field. - * @param initCode The statement(s) to put into the init() method to initialize this field. + * @param codeFunctions Function includes statement(s) to put into the init() method to + * initialize this field. An argument is the name of the mutable state variable * If left blank, the field will be default-initialized. + * @param inline whether the declaration and initialization code may be inlined rather than + * compacted. If true, the name is not changed + * @return the name of the mutable state variable, which is either the original name if the + * variable is inlined to the outer class, or an array access if the variable is to be + * stored in an array of variables of the same type and initialization. + * primitive type variables will be inlined into outer class when the total number of + * mutable variables is less than `CodeGenerator.OUTER_CLASS_VARIABLES_THRESHOLD` + * the max size of an array for compaction is given by + * `CodeGenerator.MUTABLESTATEARRAY_SIZE_LIMIT`. 
*/ - def addMutableState(javaType: String, variableName: String, initCode: String = ""): Unit = { -mutableStates += ((javaType, variableName, initCode)) + def addMutableState( + javaType: String, + variableName: String, + codeFunctions: String => String = _ => "", + inline: Boolean = false): String = { +val varName = if (!inline) freshName(variableName) else variableName +val initCode = codeFunctions(varName) + +if (inline || +// want to put a primitive type variable at outerClass for performance +isPrimitiveType(javaType) && + (mutableStates.length < CodeGenerator.OUTER_CLASS_VARIABLES_THRESHOLD) || +// identify multi-dimensional array or no simply-assigned object +!isPrimitiveType(javaType) && + (javaType.contains("[][]") || + !initCode.matches("(^[\\w_]+\\d+\\s*=\\s*null;|" --- End diff -- @mgaido91, could you describe what you mean by "Isn't it enough that the init code used is always the same?" There are definitely some [complicated init codes](https://github.com/apache/spark/pull/19811/files/006b2fdad62fe64b623235314105c7b6f4849b5d#diff-39298b470865a4cbc67398a4ea11e767R122) used throughout the codebase where, I think as @kiszk was saying, the initcode makes use of a previously defined variable. Really it would be nice if we had a way of _knowing_ whether an initialization was simple (assigned to a default for primitives, or null or the 0-parameter constructor for objects). Maybe we could define an abstract `InitCode` holding a single `code` field and then extend that with `Simple` and `NonSimple` case classes, then we could pattern match on the additional type information rather than trying to regex match the code itself. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19518: [SPARK-18016][SQL][CATALYST] Code Generation: Constant P...
Github user bdrillard commented on the issue: https://github.com/apache/spark/pull/19518 Thanks for giving this the attention to shepherd it on through. I haven't had the time to do the additional coding work necessary to properly benchmark it in the last few weeks. @kiszk, if there are any questions regarding my earlier implementation as you make/review the second PR, I'm happy to make clarifications and would be able to respond to those in writing quickly. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19518: [SPARK-18016][SQL][CATALYST] Code Generation: Constant P...
Github user bdrillard commented on the issue: https://github.com/apache/spark/pull/19518 @kiszk Ah, thanks for the link back to that discussion. I'll make modifications to the trials for better data. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19518: [SPARK-18016][SQL][CATALYST] Code Generation: Constant P...
Github user bdrillard commented on the issue: https://github.com/apache/spark/pull/19518 @kiszk You are correct that the current implementation compacts all mutable state (where the state does not have to be _explicitly_ inlined). To your last question, I'd attempted some analysis of the JVM bytecode of array versus inlined state initialized either through method calls or in loops. I'd posted the experiment and results: https://github.com/bdrillard/bytecode-poc If Spark has its own benchmarking tools, I'd be happy to use those to compare Catalyst-generated classes further. To the general question of _when_ we compact state, I think some kind of threshold still makes sense. It would be best to ensure that the typical code path (for typical Dataset schemas) remains unaffected by the changes (as was the aim when generating nested classes in #18075). I've found that setting a global threshold for when to compact mutable state can be hard. Some state _has_ to be inlined (state that uses parameterized constructors that can't be easily initialized with loops, like the `BufferHolder` and `UnsafeRowWriter`). I've found situations where, due to code generator flow, we began by inlining an amount of state that _could have been_ compacted, then started compacting state after a set threshold, but then began inlining state again that _could not be_ compacted, forcing us over the constant pool limit. It's difficult to tell when a certain piece of state will be referenced frequently or infrequently. For example, we do know some pieces of primitive mutable state, like global booleans that are part of conditional checks, are initialized globally, assigned once in one method, and then referenced only once in a separate caller method. These are excellent candidates for compaction, since they proliferate very quickly and are, in a sense, "only used once" (declared, initialized, re-assigned in a method, accessed in another method, never used again).
Other pieces of state, like row objects, and JavaBean objects, will be accessed a number of times relative to how many fields they have, which isn't necessarily easy info to retrieve during code generation (we'd have to reflect or do inspection of the initialization code to know how many fields such an object has). But these items are probably still good candidates for compaction in general because of how many of a given type there could be. I'm inclined to use a threshold against the name/types of the state, rather than a global threshold. Since `freshName` is always monotonically increasing from 1 for a given variable prefix, we could know when a threshold for state of that type was reached, and when we could begin compacting that type of state, independently/concurrently with the other types of state. Such a scheme would allow us to ensure the usual flow of code-generation remains as it is now, with no state-compaction for typical operations, and then with state-compaction in the more extreme cases that would threaten to blow the Constant Pool limit. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18075: [SPARK-18016][SQL][CATALYST] Code Generation: Constant P...
Github user bdrillard commented on the issue: https://github.com/apache/spark/pull/18075 The second part that follows this merged PR is up as #19518. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16648: [SPARK-18016][SQL][CATALYST] Code Generation: Constant P...
Github user bdrillard commented on the issue: https://github.com/apache/spark/pull/16648 @kiszk please see #19518 for part 2 of this original PR, and thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19518: [SPARK-18016][SQL][CATALYST] Code Generation: Con...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/19518#discussion_r145213781 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -801,7 +908,10 @@ class CodegenContext { addNewFunction(name, code) } - foldFunctions(functions.map(name => s"$name(${arguments.map(_._2).mkString(", ")})")) + val exprs = transformFunctions(functions.map(name => +s"$name(${arguments.map(_._2).mkString(", ")})")) + + splitExpressions(exprs, funcName, arguments) --- End diff -- Changes I made here to `splitExpressions` were to handle instances where the split code method references were still over 64kb. It would seem this problem is addressed by @mgaido91 in #19480, and that implementation is much more thorough, so if that PR gets merged, I'd prefer to rebase against that. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19518: [SPARK-18016][SQL][CATALYST] Code Generation: Con...
GitHub user bdrillard opened a pull request: https://github.com/apache/spark/pull/19518 [SPARK-18016][SQL][CATALYST] Code Generation: Constant Pool Limit - State Compaction ## What changes were proposed in this pull request? This PR is the part two follow-up to #18075, meant to address [SPARK-18016](https://issues.apache.org/jira/browse/SPARK-18016), Constant Pool limit exceptions. Part 1 implemented `NestedClass` code splitting, in which excess code was split off into nested private sub-classes of the `OuterClass`. In Part 2 we address excess mutable state, in which the number of inlined variables declared at the top of the `OuterClass` can also exceed the constant pool limit. Here, we modify the `addMutableState` function in the `CodeGenerator` to check if the declared state can be easily compacted into an array and initialized in loops rather than inlined and initialized with its own line of code. We identify four types of state that can be compacted: * Primitive state (ints, booleans, etc) * Object state of like-type without any initial assignment * Object state of like-type initialized to `null` * Object state of like-type initialized to the type's base (no-argument) constructor With mutable state compaction, at the top of the class we generate array declarations like: ``` private Object[] references; private UnsafeRow result; private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder holder; private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter rowWriter; ...
private boolean[] mutableStateArray1 = new boolean[12507]; private InternalRow[] mutableStateArray4 = new InternalRow[5268]; private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter[] mutableStateArray5 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter[7663]; private java.lang.String[] mutableStateArray2 = new java.lang.String[12477]; private int[] mutableStateArray = new int[42509]; private java.lang.Object[] mutableStateArray6 = new java.lang.Object[30]; private boolean[] mutableStateArray3 = new boolean[10536]; ``` and these arrays are initialized in loops as: ``` private void init_3485() { for (int i = 0; i < mutableStateArray3.length; i++) { mutableStateArray3[i] = false; } } ``` For compacted mutable state, `addMutableState` returns an array accessor value, which is then referenced in the subsequent generated code. **Note**: some state cannot be easily compacted (at least not without deeper changes to the code generation), as some state value names are taken for granted at the global level during code generation (see `CatalystToExternalMap` in `Objects` as an example). For this state, we provide an `inline` hint to the function call, which indicates that the state should be inlined to the `OuterClass`. Still, the state we can easily compact manages to reduce the Constant Pool to a tractable size for the wide/deeply nested schemas I was able to test against. ## How was this patch tested? Tested against several complex schema types; also added a test case generating 40,000 string columns and creating the `UnsafeProjection`.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/bdrillard/spark state_compaction Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19518.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19518 commit 081bc5de6ee55e00ff58c4abddc347f77c29d4aa Author: ALeksander Eskilson <alek.eskil...@cerner.com> Date: 2017-10-17T14:06:12Z adding state compaction commit e7046c3d3bb528f18b3183d81e8bc26720a8baf7 Author: ALeksander Eskilson <alek.eskil...@cerner.com> Date: 2017-10-17T16:54:54Z adding inline changes --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19480: [SPARK-22226][SQL] splitExpression can create too many m...
Github user bdrillard commented on the issue: https://github.com/apache/spark/pull/19480 @mgaido91 It is possible to increase the heap allocated during testing if that seems like a desirable thing to do (I did so for #18075, but the current default is already 4GB), see [1] and [2] depending on which SQL module the test is in. [1] https://github.com/apache/spark/blob/master/sql/core/pom.xml#L198 [2] https://github.com/apache/spark/blob/master/sql/catalyst/pom.xml#L137 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19480: [SPARK-22226][SQL] splitExpression can create too...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/19480#discussion_r144564741 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -277,13 +292,25 @@ class CodegenContext { funcName: String, funcCode: String, inlineToOuterClass: Boolean = false): String = { +val newFunction = addNewFunctionInternal(funcName, funcCode, inlineToOuterClass) +newFunction match { + case NewFunction(functionName, None, None) => functionName + case NewFunction(functionName, Some(_), Some(subclassInstance)) => +subclassInstance + "." + functionName +} + } + + private[this] def addNewFunctionInternal( + funcName: String, + funcCode: String, + inlineToOuterClass: Boolean): NewFunction = { // The number of named constants that can exist in the class is limited by the Constant Pool // limit, 65,536. We cannot know how many constants will be inserted for a class, so we use a -// threshold of 1600k bytes to determine when a function should be inlined to a private, nested +// threshold of 1000k bytes to determine when a function should be inlined to a private, nested // sub-class. val (className, classInstance) = if (inlineToOuterClass) { outerClassName -> "" -} else if (currClassSize > 160) { +} else if (currClassSize > 100) { --- End diff -- @gatorsmile it's a byte threshold, similar to the 1024 byte threshold set in `splitExpressions`. We can't know exactly how much code will contribute to the constant pool, that is, there's no easy static analysis we can perform on a block of code to say "this code will contribute `n` entries to the constant pool", we only know the size of the code is strongly correlated to entries in the constant pool. We're trying to keep the number of generated classes as low as possible while also grouping enough of the code to avoid the constant pool error. 
In short, I tested different types of schemas with many columns to find what the value could be set to empirically. There's no particular harm in setting the value lower as is done here if it helps us avoid a known constant pool error case. Doing so would effectively reduce the number of expressions each nested class holds, and so also increase the number of nested classes in total. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16648: [SPARK-18016][SQL][CATALYST] Code Generation: Constant P...
Github user bdrillard commented on the issue: https://github.com/apache/spark/pull/16648 I'm blocking out time to prepare the part 2 PR for this issue starting today over this week, regarding compaction of excess primitive state. cc: @kiszk --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16648: [SPARK-18016][SQL][CATALYST] Code Generation: Constant P...
Github user bdrillard commented on the issue: https://github.com/apache/spark/pull/16648 Thanks @kiszk, I'll work on preparing a PR for the second half of this issue. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18377: [SPARK-18016][SQL][CATALYST][BRANCH-2.2] Code Generation...
Github user bdrillard commented on the issue: https://github.com/apache/spark/pull/18377 @cloud-fan @rxin @sameeragarwal What can we say is the status of these backports? Including this PR, #18354, and #18579, it was decided to revert the changes until the 2.2.0 release was finished. Will all the backports remain reverted? Or can we find a way to get these merged? It seems some people would find backports useful (see [SPARK-18016](https://issues.apache.org/jira/browse/SPARK-18016) jira discussion), but naturally I'd want to make sure the patches fit your testing requirements/release schedule. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18579: [SPARK-18016][SQL][followup] merge declareAddedFunctions...
Github user bdrillard commented on the issue: https://github.com/apache/spark/pull/18579 Thanks everyone! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18377: [SPARK-18016][SQL][CATALYST][BRANCH-2.2] Code Generation...
Github user bdrillard commented on the issue: https://github.com/apache/spark/pull/18377 @cloud-fan Sure! That makes perfect sense to me. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18377: [SPARK-18016][SQL][CATALYST][BRANCH-2.2] Code Generation...
Github user bdrillard commented on the issue: https://github.com/apache/spark/pull/18377 @sameeragarwal @cloud-fan If I understand correctly what you mean by affect/regress, this patch would induce no change at all on the code generated for any class below the 1600k limit. For such classes smaller than the limit, all functions are inlined to the [`OuterClass`](https://github.com/apache/spark/pull/18377/files#diff-8bcc5aea39c73d4bf38aef6f6951d42cR226), which the [`declareAddedFunctions`](https://github.com/apache/spark/pull/18377/files#diff-8bcc5aea39c73d4bf38aef6f6951d42cR320) method now references. Structurally, there would be no change to classes with code generated below the threshold. I don't know if this explanation suffices to address your concerns, but I'd say the patch is pretty passive in that code generation is only affected for cases over the 1600k threshold. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18354: [SPARK-18016][SQL][CATALYST][BRANCH-2.1] Code Generation...
Github user bdrillard commented on the issue: https://github.com/apache/spark/pull/18354 @dongjoon-hyun I've prepared the PR for the branch-2.2 backport, #18377. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18377: [SPARK-18016][SQL][CATALYST][BRANCH-2.2] Code Gen...
GitHub user bdrillard opened a pull request: https://github.com/apache/spark/pull/18377 [SPARK-18016][SQL][CATALYST][BRANCH-2.2] Code Generation: Constant Pool Limit - Class Splitting ## What changes were proposed in this pull request? This is a backport patch for Spark 2.2.x of the class splitting feature over excess generated code as was merged in #18075. ## How was this patch tested? The same test provided in #18075 is included in this patch. You can merge this pull request into a Git repository by running: $ git pull https://github.com/bdrillard/spark class_splitting_2.2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/18377.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #18377 commit 850af2be606bcb54af3f1fc97eedf49c397fb024 Author: ALeksander Eskilson <alek.eskil...@cerner.com> Date: 2017-06-20T20:30:16Z class_splitting_2.2 removing instances of 'this' commit 8998ee5dc7119e7329d334d25de8f5d9b381aa9e Author: ALeksander Eskilson <alek.eskil...@cerner.com> Date: 2017-06-21T15:01:15Z class_splitting_2.2 adding class splitting functions --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18354: [SPARK-18016][SQL][Catalyst] Backport of Class Splitting
Github user bdrillard commented on the issue: https://github.com/apache/spark/pull/18354 @dongjoon-hyun Great, thanks for the heads up. I'll prepare a branch-2.2 backport PR and circle back to this PR with the link as confirmation. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18354: [SPARK-18016][SQL][Catalyst] Backport of Class Splitting
Github user bdrillard commented on the issue: https://github.com/apache/spark/pull/18354 @dongjoon-hyun I had not yet. I hadn't realized master was already on a 2.3.0-SNAPSHOT. I can also write a backport for branch-2.2. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18354: [SPARK-18016][SQL][Catalyst] Backport of Class Splitting
Github user bdrillard commented on the issue: https://github.com/apache/spark/pull/18354 @maor121 Yeah, I should be able to backpatch to 2.0.x as well in a separate PR. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18354: Class splitting 2.1
GitHub user bdrillard opened a pull request: https://github.com/apache/spark/pull/18354 Class splitting 2.1 ## What changes were proposed in this pull request? This is a backport patch for Spark 2.1.x of the class splitting feature over excess generated code as was merged in #18075. ## How was this patch tested? The same test provided in #18075 is included in this patch. You can merge this pull request into a Git repository by running: $ git pull https://github.com/bdrillard/spark class_splitting_2.1 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/18354.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #18354 commit 52b654aaad8e6435c9ca26e63224fb1b86b090db Author: ALeksander Eskilson <alek.eskil...@cerner.com> Date: 2017-06-19T21:25:33Z class_splitting_2.1 creating backpatch of class_splitting_only commit e2f8fd2ecfb4bd299776363f03660b3b9c9e10c2 Author: ALeksander Eskilson <alek.eskil...@cerner.com> Date: 2017-06-19T21:32:52Z class_splitting_2.1 adding doc for class_splitting functions --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18075: [SPARK-18016][SQL][CATALYST] Code Generation: Con...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/18075#discussion_r122017129 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratedProjectionSuite.scala --- @@ -83,6 +83,58 @@ class GeneratedProjectionSuite extends SparkFunSuite { assert(result === row2) } + test("SPARK-18016: generated projections on wider table requiring class-splitting") { +val N = 4000 +val wideRow1 = new GenericInternalRow((1 to N).toArray[Any]) +val schema1 = StructType((1 to N).map(i => StructField("", IntegerType))) +val wideRow2 = new GenericInternalRow( + (1 to N).map(i => UTF8String.fromString(i.toString)).toArray[Any]) +val schema2 = StructType((1 to N).map(i => StructField("", StringType))) +val joined = new JoinedRow(wideRow1, wideRow2) +val joinedSchema = StructType(schema1 ++ schema2) +val nested = new JoinedRow(InternalRow(joined, joined), joined) +val nestedSchema = StructType( + Seq(StructField("", joinedSchema), StructField("", joinedSchema)) ++ joinedSchema) + +// test generated UnsafeProjection +val unsafeProj = UnsafeProjection.create(nestedSchema) +val unsafe: UnsafeRow = unsafeProj(nested) +(0 until N).foreach { i => + val s = UTF8String.fromString((i + 1).toString) --- End diff -- Fixed. See the above comment. Creating the data with `0 until N` cleans up the indexing on `i`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18075: [SPARK-18016][SQL][CATALYST] Code Generation: Con...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/18075#discussion_r122016978 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GeneratedProjectionSuite.scala --- @@ -83,6 +83,58 @@ class GeneratedProjectionSuite extends SparkFunSuite { assert(result === row2) } + test("SPARK-18016: generated projections on wider table requiring class-splitting") { +val N = 4000 +val wideRow1 = new GenericInternalRow((1 to N).toArray[Any]) --- End diff -- I've cleaned up the test you comment on, and the one above it, since they both have the same structure, just different values for N: https://github.com/apache/spark/pull/18075/commits/678b4ad770cbc891864f73d9b93c51b1ab79f6a5#diff-a14107cf4a4c41671bba24a82f6042d9R36 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18075: [SPARK-18016][SQL][CATALYST] Code Generation: Con...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/18075#discussion_r122016799 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -233,10 +222,124 @@ class CodegenContext { // The collection of sub-expression result resetting methods that need to be called on each row. val subexprFunctions = mutable.ArrayBuffer.empty[String] - def declareAddedFunctions(): String = { -addedFunctions.map { case (funcName, funcCode) => funcCode }.mkString("\n") + /** + * Holds the class and instance names to be generated. `OuterClass` is a placeholder standing for + * whichever class is generated as the outermost class and which will contain any nested + * sub-classes. All other classes and instance names in this list will represent private, nested + * sub-classes. + */ + private val classes: mutable.ListBuffer[(String, String)] = +mutable.ListBuffer[(String, String)]("OuterClass" -> null) + + // A map holding the current size in bytes of each class to be generated. + private val classSize: mutable.Map[String, Int] = +mutable.Map[String, Int]("OuterClass" -> 0) + + // Nested maps holding function names and their code belonging to each class. + private val classFunctions: mutable.Map[String, mutable.Map[String, String]] = +mutable.Map("OuterClass" -> mutable.Map.empty[String, String]) + + // Returns the size of the most recently added class. + private def currClassSize(): Int = classSize(classes.head._1) + + // Returns the class name and instance name for the most recently added class. + private def currClass(): (String, String) = classes.head + + // Adds a new class. Requires the class' name, and its instance name. 
+ private def addClass(className: String, classInstance: String): Unit = { +classes.prepend(className -> classInstance) +classSize += className -> 0 +classFunctions += className -> mutable.Map.empty[String, String] } + /** + * Adds a function to the generated class. If the code for the `OuterClass` grows too large, the + * function will be inlined into a new private, nested class, and a class-qualified name for the + * function will be returned. Otherwise, the function will be inined to the `OuterClass` the + * simple `funcName` will be returned. + * + * @param funcName the class-unqualified name of the function + * @param funcCode the body of the function + * @param inlineToOuterClass whether the given code must be inlined to the `OuterClass`. This + * can be necessary when a function is declared outside of the context + * it is eventually referenced and a returned qualified function name + * cannot otherwise be accessed. + * @return the name of the function, qualified by class if it will be inlined to a private, + * nested sub-class + */ + def addNewFunction( + funcName: String, + funcCode: String, + inlineToOuterClass: Boolean = false): String = { +// The number of named constants that can exist in the class is limited by the Constant Pool +// limit, 65,536. We cannot know how many constants will be inserted for a class, so we use a +// threshold of 1600k bytes to determine when a function should be inlined to a private, nested +// sub-class. 
+val (className, classInstance) = if (inlineToOuterClass) { + "OuterClass" -> "" +} else if (currClassSize > 160) { + val className = freshName("NestedClass") + val classInstance = freshName("nestedClassInstance") + + addClass(className, classInstance) + + className -> classInstance +} else { + currClass() +} + +classSize(className) += funcCode.length +classFunctions(className) += funcName -> funcCode + +if (className.equals("OuterClass")) { + funcName +} else { + + s"$classInstance.$funcName" +} + } + + /** + * Instantiates all nested, private sub-classes as objects to the `OuterClass` + */ + private[sql] def initNestedClasses(): String = { +// Nested, private sub-classes have no mutable state (though they do reference the outer class' +// mutable state), so we declare and initialize them inline to the OuterClass. +classes.map { --- End diff -- Fixed: https://github.com/apache/spark/pull/18075/commits/678b4ad770cbc8918
[GitHub] spark pull request #18075: [SPARK-18016][SQL][CATALYST] Code Generation: Con...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/18075#discussion_r122016650 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -629,7 +730,9 @@ class CodegenContext { /** * Splits the generated code of expressions into multiple functions, because function has - * 64kb code size limit in JVM + * 64kb code size limit in JVM. If the class the function is to be inlined to would grow beyond --- End diff -- I think this is a grammatically correct and, hopefully, clearer form of that same docstring: https://github.com/apache/spark/pull/18075/commits/678b4ad770cbc891864f73d9b93c51b1ab79f6a5#diff-8bcc5aea39c73d4bf38aef6f6951d42cR727 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18075: [SPARK-18016][SQL][CATALYST] Code Generation: Con...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/18075#discussion_r122016475 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -233,10 +222,124 @@ class CodegenContext { // The collection of sub-expression result resetting methods that need to be called on each row. val subexprFunctions = mutable.ArrayBuffer.empty[String] - def declareAddedFunctions(): String = { -addedFunctions.map { case (funcName, funcCode) => funcCode }.mkString("\n") + /** + * Holds the class and instance names to be generated. `OuterClass` is a placeholder standing for + * whichever class is generated as the outermost class and which will contain any nested + * sub-classes. All other classes and instance names in this list will represent private, nested + * sub-classes. + */ + private val classes: mutable.ListBuffer[(String, String)] = +mutable.ListBuffer[(String, String)]("OuterClass" -> null) + + // A map holding the current size in bytes of each class to be generated. + private val classSize: mutable.Map[String, Int] = +mutable.Map[String, Int]("OuterClass" -> 0) --- End diff -- Fixed: https://github.com/apache/spark/pull/18075/commits/678b4ad770cbc891864f73d9b93c51b1ab79f6a5#diff-8bcc5aea39c73d4bf38aef6f6951d42cR225 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18075: [SPARK-18016][SQL][CATALYST] Code Generation: Con...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/18075#discussion_r122016536 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -233,10 +222,124 @@ class CodegenContext { // The collection of sub-expression result resetting methods that need to be called on each row. val subexprFunctions = mutable.ArrayBuffer.empty[String] - def declareAddedFunctions(): String = { -addedFunctions.map { case (funcName, funcCode) => funcCode }.mkString("\n") + /** + * Holds the class and instance names to be generated. `OuterClass` is a placeholder standing for + * whichever class is generated as the outermost class and which will contain any nested + * sub-classes. All other classes and instance names in this list will represent private, nested + * sub-classes. + */ + private val classes: mutable.ListBuffer[(String, String)] = +mutable.ListBuffer[(String, String)]("OuterClass" -> null) + + // A map holding the current size in bytes of each class to be generated. + private val classSize: mutable.Map[String, Int] = +mutable.Map[String, Int]("OuterClass" -> 0) + + // Nested maps holding function names and their code belonging to each class. + private val classFunctions: mutable.Map[String, mutable.Map[String, String]] = +mutable.Map("OuterClass" -> mutable.Map.empty[String, String]) + + // Returns the size of the most recently added class. + private def currClassSize(): Int = classSize(classes.head._1) + + // Returns the class name and instance name for the most recently added class. + private def currClass(): (String, String) = classes.head + + // Adds a new class. Requires the class' name, and its instance name. 
+ private def addClass(className: String, classInstance: String): Unit = { +classes.prepend(className -> classInstance) +classSize += className -> 0 +classFunctions += className -> mutable.Map.empty[String, String] } + /** + * Adds a function to the generated class. If the code for the `OuterClass` grows too large, the + * function will be inlined into a new private, nested class, and a class-qualified name for the + * function will be returned. Otherwise, the function will be inined to the `OuterClass` the + * simple `funcName` will be returned. + * + * @param funcName the class-unqualified name of the function + * @param funcCode the body of the function + * @param inlineToOuterClass whether the given code must be inlined to the `OuterClass`. This + * can be necessary when a function is declared outside of the context + * it is eventually referenced and a returned qualified function name + * cannot otherwise be accessed. + * @return the name of the function, qualified by class if it will be inlined to a private, + * nested sub-class + */ + def addNewFunction( + funcName: String, + funcCode: String, + inlineToOuterClass: Boolean = false): String = { +// The number of named constants that can exist in the class is limited by the Constant Pool +// limit, 65,536. We cannot know how many constants will be inserted for a class, so we use a +// threshold of 1600k bytes to determine when a function should be inlined to a private, nested +// sub-class. 
+val (className, classInstance) = if (inlineToOuterClass) { + "OuterClass" -> "" +} else if (currClassSize > 160) { + val className = freshName("NestedClass") + val classInstance = freshName("nestedClassInstance") + + addClass(className, classInstance) + + className -> classInstance +} else { + currClass() +} + +classSize(className) += funcCode.length +classFunctions(className) += funcName -> funcCode + +if (className.equals("OuterClass")) { --- End diff -- Fixed: https://github.com/apache/spark/pull/18075/commits/678b4ad770cbc891864f73d9b93c51b1ab79f6a5#diff-8bcc5aea39c73d4bf38aef6f6951d42cR296 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18075: [SPARK-18016][SQL][CATALYST] Code Generation: Con...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/18075#discussion_r121989572 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala --- @@ -93,7 +93,7 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport { } val nextBatch = ctx.freshName("nextBatch") --- End diff -- I suppose it depends on which implementation we think is cleaner. The freshName generated by the caller is typically used twice, once in the call to `addNewFunction`, but also immediately in the function code as the method name. If we use a name hint, we'd have to do a string `replace` inside `addNewFunction` to update the placeholder method name with the freshname. So it would seem either we keep ``` val nextBatch = ctx.freshName("nextBatch") val nextBatchFuncName = ctx.addNewFunction(nextBatch, s""" |private void $nextBatch() throws java.io.IOException { | long getBatchStart = System.nanoTime(); | if ($input.hasNext()) { |$batch = ($columnarBatchClz)$input.next(); |$numOutputRows.add($batch.numRows()); |$idx = 0; |${columnAssigns.mkString("", "\n", "\n")} | } | $scanTimeTotalNs += System.nanoTime() - getBatchStart; |}""".stripMargin) ``` or we have ``` val nextBatchHint = "nextBatch" val nextBatch = ctx.addNewFunction(nextBatchHint, s""" |private void $nextBatchHint() throws java.io.IOException { | long getBatchStart = System.nanoTime(); | if ($input.hasNext()) { |$batch = ($columnarBatchClz)$input.next(); |$numOutputRows.add($batch.numRows()); |$idx = 0; |${columnAssigns.mkString("", "\n", "\n")} | } | $scanTimeTotalNs += System.nanoTime() - getBatchStart; |}""".stripMargin) ``` where `addNewFunction` would do the proper replacement over the code for the method with a freshname generated from "nextBatch" as a name hint. 
Or in every instance, we just duplicate the string hint without creating a variable for it in both the `addNewFunction` call and the method name: ``` val nextBatch = ctx.addNewFunction("nextBatch", s""" |private void nextBatch() throws java.io.IOException { | long getBatchStart = System.nanoTime(); | if ($input.hasNext()) { |$batch = ($columnarBatchClz)$input.next(); |$numOutputRows.add($batch.numRows()); |$idx = 0; |${columnAssigns.mkString("", "\n", "\n")} | } | $scanTimeTotalNs += System.nanoTime() - getBatchStart; |}""".stripMargin) ``` Which would you prefer? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18075: [SPARK-18016][SQL][CATALYST] Code Generation: Con...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/18075#discussion_r121975114 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -233,10 +222,124 @@ class CodegenContext { // The collection of sub-expression result resetting methods that need to be called on each row. val subexprFunctions = mutable.ArrayBuffer.empty[String] - def declareAddedFunctions(): String = { -addedFunctions.map { case (funcName, funcCode) => funcCode }.mkString("\n") + /** + * Holds the class and instance names to be generated. `OuterClass` is a placeholder standing for + * whichever class is generated as the outermost class and which will contain any nested + * sub-classes. All other classes and instance names in this list will represent private, nested + * sub-classes. + */ + private val classes: mutable.ListBuffer[(String, String)] = +mutable.ListBuffer[(String, String)]("OuterClass" -> null) + + // A map holding the current size in bytes of each class to be generated. + private val classSize: mutable.Map[String, Int] = +mutable.Map[String, Int]("OuterClass" -> 0) + + // Nested maps holding function names and their code belonging to each class. + private val classFunctions: mutable.Map[String, mutable.Map[String, String]] = +mutable.Map("OuterClass" -> mutable.Map.empty[String, String]) + + // Returns the size of the most recently added class. + private def currClassSize(): Int = classSize(classes.head._1) + + // Returns the class name and instance name for the most recently added class. + private def currClass(): (String, String) = classes.head + + // Adds a new class. Requires the class' name, and its instance name. 
+ private def addClass(className: String, classInstance: String): Unit = { +classes.prepend(className -> classInstance) +classSize += className -> 0 +classFunctions += className -> mutable.Map.empty[String, String] } + /** + * Adds a function to the generated class. If the code for the `OuterClass` grows too large, the + * function will be inlined into a new private, nested class, and a class-qualified name for the + * function will be returned. Otherwise, the function will be inined to the `OuterClass` the + * simple `funcName` will be returned. + * + * @param funcName the class-unqualified name of the function + * @param funcCode the body of the function + * @param inlineToOuterClass whether the given code must be inlined to the `OuterClass`. This --- End diff -- Yes, see the portion of `doConsume` in the Limit class where the `stopEarly` function is registered, https://github.com/apache/spark/pull/18075/files#diff-379cccace8699ca00b76ff5631222adeR73 In this section of code, the registration of the function is separate from the caller code, so unlike other changes in this patch, we have no way of informing the caller code what the potentially class-qualified name of the function would be if it were inlined to a nested class. Instead, the caller code for the function, which is generated some point later, makes a hard assumption that `stopEarly` will be visible globally, that is, in the outer class. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18075: [SPARK-18016][SQL][CATALYST] Code Generation: Con...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/18075#discussion_r121968338 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -233,10 +222,124 @@ class CodegenContext { // The collection of sub-expression result resetting methods that need to be called on each row. val subexprFunctions = mutable.ArrayBuffer.empty[String] - def declareAddedFunctions(): String = { -addedFunctions.map { case (funcName, funcCode) => funcCode }.mkString("\n") + /** + * Holds the class and instance names to be generated. `OuterClass` is a placeholder standing for + * whichever class is generated as the outermost class and which will contain any nested + * sub-classes. All other classes and instance names in this list will represent private, nested + * sub-classes. + */ + private val classes: mutable.ListBuffer[(String, String)] = +mutable.ListBuffer[(String, String)]("OuterClass" -> null) + + // A map holding the current size in bytes of each class to be generated. + private val classSize: mutable.Map[String, Int] = +mutable.Map[String, Int]("OuterClass" -> 0) + + // Nested maps holding function names and their code belonging to each class. + private val classFunctions: mutable.Map[String, mutable.Map[String, String]] = --- End diff -- I had originally thought so, but it turns out that there's at least one instance where the code for a given function name is updated during the code-generation process. The generated [`stopEarly`](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala#L75) function can actually be inserted twice, each time returning a different `stopEarly` variable. The result would be two functions with the same signature in the class, causing a compile error.
So we need to use a map to make sure the implementation gets _updated_ for a given function when necessary. Note also that the old implementation of [`addedFunctions`](https://github.com/apache/spark/pull/18075/files#diff-8bcc5aea39c73d4bf38aef6f6951d42cL208) was also a map. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16648: [SPARK-18016][SQL][CATALYST] Code Generation: Constant P...
Github user bdrillard commented on the issue: https://github.com/apache/spark/pull/16648 @cloud-fan Good question, and I think we can resolve it by using different values for `N` in the [testcase](https://github.com/apache/spark/pull/18075/files#diff-a14107cf4a4c41671bba24a82f6042d9R87) I have in the other PR (which will translate to a number of string columns deeper in the test). At `N = 4000`, we have a threshold where the amount of local state + global state would trigger a `JaninoRuntimeException` on the constant pool. #18075 can fix that issue at `N = 4000` by beginning to inline functions to nested classes, thus reducing the number of items counting toward the constant pool, but we note that #18075 does nothing to address global state. We should also note that #18075 does slightly more than just putting member variables into nested classes. While it is true that with #18075 a significant amount of local state that would otherwise be inlined to the Outer Class is instead inlined to nested classes, the patch reduces the size of the constant pool even further, since additional items that get inlined to nested classes also count toward the limit (e.g. field references, method references, variable types, method types, etc.; see [Java class file, The Constant Pool](https://en.wikipedia.org/wiki/Java_class_file#The_constant_pool)). The second feature (included here, but not in #18075) is precisely as you describe: it takes simply declared fields that would be inlined globally and compacts them into like-typed and like-declared arrays. However, if we set `N = 8000` (even assuming the patch in #18075), we can trigger yet another `JaninoRuntimeException`, this time because the amount of global state (plus any local state that was inlined to the Outer Class and not any single subclass) is sufficiently great to cause the exception.
However, if we include both mutable state compaction and class splitting, as in the method included in this PR #16648, we can set `N` to a value greater than 10,000 (the test still succeeded at `N = 12000`, but at 16,000 my machine began to thrash). Conversely, if at `N = 8000` we include only mutable state compaction and exclude class splitting, there are instances where we actually end up with very little global state, but the amount of local state and functions inlined to the Outer Class is still sufficient to exceed the constant pool limit. This can occur if we have a great number of primitive columns, like `N = 8000` integer columns. Looking at both #18075 and this pull request, I think the takeaway is that even if all we do for the moment is split excess code among nested classes, we can still make a significant gain in the number of columns a Dataset can hold, which gives #18075 merit on its own. If we want to increase that limit even further, though, we'll have to address the proliferation of global state as well, perhaps by opening a follow-up PR that focuses on it more closely, maybe using the compaction strategy I've attempted here in #16648, or by exploring another method. Thoughts on this? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18075: [SPARK-18016][SQL][CATALYST] Code Generation: Constant P...
Github user bdrillard commented on the issue: https://github.com/apache/spark/pull/18075 The earlier failure occurred because the [`stopEarly()`](https://github.com/bdrillard/spark/blob/7fe5e4a84d4d8e71e2e63e6794e4ba13ac2e003f/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala#L73) function could be registered to the `OuterClass` twice. Using a [Map of functions](https://github.com/apache/spark/pull/18075/files#diff-8bcc5aea39c73d4bf38aef6f6951d42cR239) that holds each function's code keyed by its name fixes the issue: whenever a function with the same name is added more than once, the new code replaces the older value. The tests pass after the change. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18075: [SPARK-18016][SQL][CATALYST] Code Generation: Constant P...
Github user bdrillard commented on the issue: https://github.com/apache/spark/pull/18075 @ueshin As for the remaining `this` reference in `objects.scala`, see https://github.com/apache/spark/pull/18075/commits/493113ce2e1271039701be35b2603271282111df#diff-e436c96ea839dfe446837ab2a3531f93L984; as for the need for an additional nested-class declaration in `GenerateColumnAccessor.scala`, see https://github.com/apache/spark/pull/18075/commits/493113ce2e1271039701be35b2603271282111df#diff-58a69e526de8182bcb4c840a8cb29e2dR225 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18075: [SPARK-18016][SQL][CATALYST] Code Generation: Con...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/18075#discussion_r119180019 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala --- @@ -190,15 +190,15 @@ case class Stack(children: Seq[Expression]) extends Generator { if (index < values.length) values(index) else Literal(null, dataTypes(col)) } val eval = CreateStruct(fields).genCode(ctx) - s"${eval.code}\nthis.$rowData[$row] = ${eval.value};" + s"${eval.code}\n$rowData[$row] = ${eval.value};" }) // Create the collection. val wrapperClass = classOf[mutable.WrappedArray[_]].getName ctx.addMutableState( s"$wrapperClass", ev.value, - s"this.${ev.value} = $wrapperClass$$.MODULE$$.make(this.$rowData);") + s"this.${ev.value} = $wrapperClass$$.MODULE$$.make($rowData);") --- End diff -- Good catch, fixed: https://github.com/apache/spark/pull/18075/commits/493113ce2e1271039701be35b2603271282111df#diff-16493d6958b6daaf4a24dd7b780ba4bcL201 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18075: [SPARK-18016][SQL][CATALYST] Code Generation: Con...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/18075#discussion_r119179925 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -233,10 +222,128 @@ class CodegenContext { // The collection of sub-expression result resetting methods that need to be called on each row. val subexprFunctions = mutable.ArrayBuffer.empty[String] - def declareAddedFunctions(): String = { -addedFunctions.map { case (funcName, funcCode) => funcCode }.mkString("\n") + /** + * Holds the class and instance names to be generated. `OuterClass` is a placeholder standing for + * whichever class is generated as the outermost class and which will contain any nested + * sub-classes. All other classes and instance names in this list will represent private, nested + * sub-classes. + */ + private val classes: mutable.ListBuffer[(String, String)] = +mutable.ListBuffer[(String, String)]("OuterClass" -> null) + + // A map holding the current size in bytes of each class to be generated. + private val classSize: mutable.Map[String, Int] = +mutable.Map[String, Int]("OuterClass" -> 0) + + // A map holding lists of functions belonging to their class. + private val classFunctions: mutable.Map[String, mutable.ListBuffer[String]] = +mutable.Map("OuterClass" -> mutable.ListBuffer.empty[String]) + + // Returns the size of the most recently added class. + private def currClassSize(): Int = classSize(classes.head._1) + + // Returns the class name and instance name for the most recently added class. + private def currClass(): (String, String) = classes.head + + // Adds a new class. Requires the class' name, and its instance name. + private def addClass(className: String, classInstance: String): Unit = { +classes.prepend(Tuple2(className, classInstance)) +classSize += className -> 0 +classFunctions += className -> mutable.ListBuffer.empty[String] + } + + /** + * Adds a function to the generated class. 
If the code for the `OuterClass` grows too large, the + * function will be inlined into a new private, nested class, and a class-qualified name for the + * function will be returned. Otherwise, the function will be inined to the `OuterClass` the + * simple `funcName` will be returned. + * + * @param funcName the class-unqualified name of the function + * @param funcCode the body of the function + * @param inlineToOuterClass whether the given code must be inlined to the `OuterClass`. This + * can be necessary when a function is declared outside of the context + * it is eventually referenced and a returned qualified function name + * cannot otherwise be accessed. + * @return the name of the function, qualified by class if it will be inlined to a private, + * nested sub-class + */ + def addNewFunction( +funcName: String, +funcCode: String, +inlineToOuterClass: Boolean = false): String = { +// The number of named constants that can exist in the class is limited by the Constant Pool +// limit, 65,536. We cannot know how many constants will be inserted for a class, so we use a +// threshold of 1600k bytes to determine when a function should be inlined to a private, nested +// sub-class. +val classInfo = if (inlineToOuterClass) { + "OuterClass" -> "" +} else if (currClassSize > 160) { + val className = freshName("NestedClass") + val classInstance = freshName("nestedClassInstance") + + addClass(className, classInstance) + + className -> classInstance +} else { + currClass() +} +val name = classInfo._1 + +classSize.update(name, classSize(name) + funcCode.length) +classFunctions(name).append(funcCode) --- End diff -- I suppose that's more concise/readable given the underlying mutable data structures: https://github.com/apache/spark/pull/18075/commits/493113ce2e1271039701be35b2603271282111df#diff-8bcc5aea39c73d4bf38aef6f6951d42cL292 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18075: [SPARK-18016][SQL][CATALYST] Code Generation: Con...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/18075#discussion_r119179700 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -233,10 +222,128 @@ class CodegenContext { // The collection of sub-expression result resetting methods that need to be called on each row. val subexprFunctions = mutable.ArrayBuffer.empty[String] - def declareAddedFunctions(): String = { -addedFunctions.map { case (funcName, funcCode) => funcCode }.mkString("\n") + /** + * Holds the class and instance names to be generated. `OuterClass` is a placeholder standing for + * whichever class is generated as the outermost class and which will contain any nested + * sub-classes. All other classes and instance names in this list will represent private, nested + * sub-classes. + */ + private val classes: mutable.ListBuffer[(String, String)] = +mutable.ListBuffer[(String, String)]("OuterClass" -> null) + + // A map holding the current size in bytes of each class to be generated. + private val classSize: mutable.Map[String, Int] = +mutable.Map[String, Int]("OuterClass" -> 0) + + // A map holding lists of functions belonging to their class. + private val classFunctions: mutable.Map[String, mutable.ListBuffer[String]] = +mutable.Map("OuterClass" -> mutable.ListBuffer.empty[String]) + + // Returns the size of the most recently added class. + private def currClassSize(): Int = classSize(classes.head._1) + + // Returns the class name and instance name for the most recently added class. + private def currClass(): (String, String) = classes.head + + // Adds a new class. Requires the class' name, and its instance name. + private def addClass(className: String, classInstance: String): Unit = { +classes.prepend(Tuple2(className, classInstance)) +classSize += className -> 0 +classFunctions += className -> mutable.ListBuffer.empty[String] + } + + /** + * Adds a function to the generated class. 
If the code for the `OuterClass` grows too large, the + * function will be inlined into a new private, nested class, and a class-qualified name for the + * function will be returned. Otherwise, the function will be inined to the `OuterClass` the + * simple `funcName` will be returned. + * + * @param funcName the class-unqualified name of the function + * @param funcCode the body of the function + * @param inlineToOuterClass whether the given code must be inlined to the `OuterClass`. This + * can be necessary when a function is declared outside of the context + * it is eventually referenced and a returned qualified function name + * cannot otherwise be accessed. + * @return the name of the function, qualified by class if it will be inlined to a private, + * nested sub-class + */ + def addNewFunction( +funcName: String, +funcCode: String, +inlineToOuterClass: Boolean = false): String = { +// The number of named constants that can exist in the class is limited by the Constant Pool +// limit, 65,536. We cannot know how many constants will be inserted for a class, so we use a +// threshold of 1600k bytes to determine when a function should be inlined to a private, nested +// sub-class. +val classInfo = if (inlineToOuterClass) { --- End diff -- Fixed (with some refactoring based on those variables being available in the scope earlier): https://github.com/apache/spark/pull/18075/commits/493113ce2e1271039701be35b2603271282111df#diff-8bcc5aea39c73d4bf38aef6f6951d42cL278 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18075: [SPARK-18016][SQL][CATALYST] Code Generation: Con...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/18075#discussion_r119179568 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -233,10 +222,128 @@ class CodegenContext { // The collection of sub-expression result resetting methods that need to be called on each row. val subexprFunctions = mutable.ArrayBuffer.empty[String] - def declareAddedFunctions(): String = { -addedFunctions.map { case (funcName, funcCode) => funcCode }.mkString("\n") + /** + * Holds the class and instance names to be generated. `OuterClass` is a placeholder standing for + * whichever class is generated as the outermost class and which will contain any nested + * sub-classes. All other classes and instance names in this list will represent private, nested + * sub-classes. + */ + private val classes: mutable.ListBuffer[(String, String)] = +mutable.ListBuffer[(String, String)]("OuterClass" -> null) + + // A map holding the current size in bytes of each class to be generated. + private val classSize: mutable.Map[String, Int] = +mutable.Map[String, Int]("OuterClass" -> 0) + + // A map holding lists of functions belonging to their class. + private val classFunctions: mutable.Map[String, mutable.ListBuffer[String]] = +mutable.Map("OuterClass" -> mutable.ListBuffer.empty[String]) + + // Returns the size of the most recently added class. + private def currClassSize(): Int = classSize(classes.head._1) + + // Returns the class name and instance name for the most recently added class. + private def currClass(): (String, String) = classes.head + + // Adds a new class. Requires the class' name, and its instance name. + private def addClass(className: String, classInstance: String): Unit = { +classes.prepend(Tuple2(className, classInstance)) +classSize += className -> 0 +classFunctions += className -> mutable.ListBuffer.empty[String] + } + + /** + * Adds a function to the generated class. 
If the code for the `OuterClass` grows too large, the + * function will be inlined into a new private, nested class, and a class-qualified name for the + * function will be returned. Otherwise, the function will be inined to the `OuterClass` the + * simple `funcName` will be returned. + * + * @param funcName the class-unqualified name of the function + * @param funcCode the body of the function + * @param inlineToOuterClass whether the given code must be inlined to the `OuterClass`. This + * can be necessary when a function is declared outside of the context + * it is eventually referenced and a returned qualified function name + * cannot otherwise be accessed. + * @return the name of the function, qualified by class if it will be inlined to a private, + * nested sub-class + */ + def addNewFunction( +funcName: String, +funcCode: String, +inlineToOuterClass: Boolean = false): String = { --- End diff -- Fixed: https://github.com/apache/spark/pull/18075/commits/493113ce2e1271039701be35b2603271282111df#diff-8bcc5aea39c73d4bf38aef6f6951d42cL271 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18075: [SPARK-18016][SQL][CATALYST] Code Generation: Con...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/18075#discussion_r119179501 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -233,10 +222,128 @@ class CodegenContext { // The collection of sub-expression result resetting methods that need to be called on each row. val subexprFunctions = mutable.ArrayBuffer.empty[String] - def declareAddedFunctions(): String = { -addedFunctions.map { case (funcName, funcCode) => funcCode }.mkString("\n") + /** + * Holds the class and instance names to be generated. `OuterClass` is a placeholder standing for + * whichever class is generated as the outermost class and which will contain any nested + * sub-classes. All other classes and instance names in this list will represent private, nested + * sub-classes. + */ + private val classes: mutable.ListBuffer[(String, String)] = +mutable.ListBuffer[(String, String)]("OuterClass" -> null) + + // A map holding the current size in bytes of each class to be generated. + private val classSize: mutable.Map[String, Int] = +mutable.Map[String, Int]("OuterClass" -> 0) + + // A map holding lists of functions belonging to their class. + private val classFunctions: mutable.Map[String, mutable.ListBuffer[String]] = +mutable.Map("OuterClass" -> mutable.ListBuffer.empty[String]) + + // Returns the size of the most recently added class. + private def currClassSize(): Int = classSize(classes.head._1) + + // Returns the class name and instance name for the most recently added class. + private def currClass(): (String, String) = classes.head + + // Adds a new class. Requires the class' name, and its instance name. 
+ private def addClass(className: String, classInstance: String): Unit = { +classes.prepend(Tuple2(className, classInstance)) --- End diff -- Fixed: https://github.com/apache/spark/pull/18075/commits/493113ce2e1271039701be35b2603271282111df#diff-8bcc5aea39c73d4bf38aef6f6951d42cL250 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18075: [SPARK-18016][SQL][CATALYST] Code Generation: Con...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/18075#discussion_r118497931 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -233,10 +223,124 @@ class CodegenContext { // The collection of sub-expression result resetting methods that need to be called on each row. val subexprFunctions = mutable.ArrayBuffer.empty[String] - def declareAddedFunctions(): String = { -addedFunctions.map { case (funcName, funcCode) => funcCode }.mkString("\n") + /** + * Holds the class and instance names to be generated. `OuterClass` is a placeholder standing for + * whichever class is generated as the outermost class and which will contain any nested + * sub-classes. All other classes and instance names in this list will represent private, nested + * sub-classes. + */ + private val classes: mutable.ListBuffer[(String, String)] = +mutable.ListBuffer[(String, String)](("OuterClass", null)) + + // A map holding the current size in bytes of each class to be generated. + private val classSize: mutable.Map[String, Int] = +mutable.Map[String, Int](("OuterClass", 0)) + + // A map holding lists of functions belonging to their class. + private val classFunctions: mutable.Map[String, mutable.ListBuffer[String]] = +mutable.Map(("OuterClass", mutable.ListBuffer.empty[String])) + + // Returns the size of the most recently added class. + private def currClassSize(): Int = classSize(classes.head._1) + + // Returns the class name and instance name for the most recently added class. + private def currClass(): (String, String) = classes.head + + // Adds a new class. Requires the class' name, and its instance name. + private def addClass(className: String, classInstance: String): Unit = { +classes.prepend(Tuple2(className, classInstance)) +classSize += className -> 0 +classFunctions += className -> mutable.ListBuffer.empty[String] + } + + /** + * Adds a function to the generated class. 
If the code for the `OuterClass` grows too large, the + * function will be inlined into a new private, nested class, and a class-qualified name for the + * function will be returned. Otherwise, the function will be inined to the `OuterClass` the + * simple `funcName` will be returned. + * + * @param funcName the class-unqualified name of the function + * @param funcCode the body of the function + * @return the name of the function, qualified by class if it will be inlined to a private, + * nested sub-class + */ + def addNewFunction( +funcName: String, +funcCode: String, +inlineToOuterClass: Boolean = false): String = { +// The number of named constants that can exist in the class is limited by the Constant Pool +// limit, 65,536. We cannot know how many constants will be inserted for a class, so we use a +// threshold of 1600k bytes to determine when a function should be inlined to a private, nested +// sub-class. +val classInfo = if (inlineToOuterClass) { + ("OuterClass", "") +} else if (currClassSize > 160) { + val className = freshName("NestedClass") + val classInstance = freshName("nestedClassInstance") + + addClass(className, classInstance) + + className -> classInstance +} else { + currClass() +} +val name = classInfo._1 + +classSize.update(name, classSize(name) + funcCode.length) +classFunctions.update(name, classFunctions(name) += funcCode) --- End diff -- Here's a commit with that change if you think it checks out: https://github.com/apache/spark/pull/18075/commits/c225f3ad3b5183be6c637633b0ebffc765be9532#diff-8bcc5aea39c73d4bf38aef6f6951d42cL290 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18075: [SPARK-18016][SQL][CATALYST] Code Generation: Con...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/18075#discussion_r118345866 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -233,10 +223,124 @@ class CodegenContext { // The collection of sub-expression result resetting methods that need to be called on each row. val subexprFunctions = mutable.ArrayBuffer.empty[String] - def declareAddedFunctions(): String = { -addedFunctions.map { case (funcName, funcCode) => funcCode }.mkString("\n") + /** + * Holds the class and instance names to be generated. `OuterClass` is a placeholder standing for + * whichever class is generated as the outermost class and which will contain any nested + * sub-classes. All other classes and instance names in this list will represent private, nested + * sub-classes. + */ + private val classes: mutable.ListBuffer[(String, String)] = +mutable.ListBuffer[(String, String)](("OuterClass", null)) + + // A map holding the current size in bytes of each class to be generated. + private val classSize: mutable.Map[String, Int] = +mutable.Map[String, Int](("OuterClass", 0)) + + // A map holding lists of functions belonging to their class. + private val classFunctions: mutable.Map[String, mutable.ListBuffer[String]] = +mutable.Map(("OuterClass", mutable.ListBuffer.empty[String])) + + // Returns the size of the most recently added class. + private def currClassSize(): Int = classSize(classes.head._1) + + // Returns the class name and instance name for the most recently added class. + private def currClass(): (String, String) = classes.head + + // Adds a new class. Requires the class' name, and its instance name. + private def addClass(className: String, classInstance: String): Unit = { +classes.prepend(Tuple2(className, classInstance)) +classSize += className -> 0 +classFunctions += className -> mutable.ListBuffer.empty[String] + } + + /** + * Adds a function to the generated class. 
If the code for the `OuterClass` grows too large, the + * function will be inlined into a new private, nested class, and a class-qualified name for the + * function will be returned. Otherwise, the function will be inined to the `OuterClass` the + * simple `funcName` will be returned. + * + * @param funcName the class-unqualified name of the function + * @param funcCode the body of the function + * @return the name of the function, qualified by class if it will be inlined to a private, + * nested sub-class + */ + def addNewFunction( +funcName: String, +funcCode: String, +inlineToOuterClass: Boolean = false): String = { +// The number of named constants that can exist in the class is limited by the Constant Pool +// limit, 65,536. We cannot know how many constants will be inserted for a class, so we use a +// threshold of 1600k bytes to determine when a function should be inlined to a private, nested +// sub-class. +val classInfo = if (inlineToOuterClass) { + ("OuterClass", "") +} else if (currClassSize > 160) { + val className = freshName("NestedClass") + val classInstance = freshName("nestedClassInstance") + + addClass(className, classInstance) + + className -> classInstance +} else { + currClass() +} +val name = classInfo._1 + +classSize.update(name, classSize(name) + funcCode.length) +classFunctions.update(name, classFunctions(name) += funcCode) --- End diff -- Yeah, good point, since we're using a mutable buffer, we can update the referenced object directly even if it's contained inside the map. Since `+=` explicitly returns the reference to the modified buffer, it would probably be most straightforward to use `classFunctions(name).append(funcCode)` since `append` has `Unit` return type, and we don't need any results from appending the code to the class's buffer of function code. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well.
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18075: [SPARK-18016][SQL][CATALYST] Code Generation: Con...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/18075#discussion_r118336805 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -233,10 +223,124 @@ class CodegenContext { // The collection of sub-expression result resetting methods that need to be called on each row. val subexprFunctions = mutable.ArrayBuffer.empty[String] - def declareAddedFunctions(): String = { -addedFunctions.map { case (funcName, funcCode) => funcCode }.mkString("\n") + /** + * Holds the class and instance names to be generated. `OuterClass` is a placeholder standing for + * whichever class is generated as the outermost class and which will contain any nested + * sub-classes. All other classes and instance names in this list will represent private, nested + * sub-classes. + */ + private val classes: mutable.ListBuffer[(String, String)] = +mutable.ListBuffer[(String, String)](("OuterClass", null)) + + // A map holding the current size in bytes of each class to be generated. + private val classSize: mutable.Map[String, Int] = +mutable.Map[String, Int](("OuterClass", 0)) + + // A map holding lists of functions belonging to their class. + private val classFunctions: mutable.Map[String, mutable.ListBuffer[String]] = +mutable.Map(("OuterClass", mutable.ListBuffer.empty[String])) + + // Returns the size of the most recently added class. + private def currClassSize(): Int = classSize(classes.head._1) + + // Returns the class name and instance name for the most recently added class. + private def currClass(): (String, String) = classes.head + + // Adds a new class. Requires the class' name, and its instance name. + private def addClass(className: String, classInstance: String): Unit = { +classes.prepend(Tuple2(className, classInstance)) +classSize += className -> 0 +classFunctions += className -> mutable.ListBuffer.empty[String] + } + + /** + * Adds a function to the generated class. 
If the code for the `OuterClass` grows too large, the + * function will be inlined into a new private, nested class, and a class-qualified name for the + * function will be returned. Otherwise, the function will be inined to the `OuterClass` the + * simple `funcName` will be returned. + * + * @param funcName the class-unqualified name of the function + * @param funcCode the body of the function + * @return the name of the function, qualified by class if it will be inlined to a private, + * nested sub-class + */ + def addNewFunction( +funcName: String, +funcCode: String, +inlineToOuterClass: Boolean = false): String = { +// The number of named constants that can exist in the class is limited by the Constant Pool +// limit, 65,536. We cannot know how many constants will be inserted for a class, so we use a +// threshold of 1600k bytes to determine when a function should be inlined to a private, nested +// sub-class. +val classInfo = if (inlineToOuterClass) { + ("OuterClass", "") +} else if (currClassSize > 160) { + val className = freshName("NestedClass") + val classInstance = freshName("nestedClassInstance") + + addClass(className, classInstance) + + className -> classInstance +} else { + currClass() +} +val name = classInfo._1 + +classSize.update(name, classSize(name) + funcCode.length) +classFunctions.update(name, classFunctions(name) += funcCode) --- End diff -- Ah, sure. The story seems pretty much the same, we still want the append operation that also returns the reference to the modified buffer, and that's given by `+=`. Also, it doesn't look like `+` is defined as an operation on a `ListBuffer[A]` and an element of type `A` (see [ListBuffer](https://www.scala-lang.org/api/2.11.8/index.html#scala.collection.mutable.ListBuffer)). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18075: [SPARK-18016][SQL][CATALYST] Code Generation: Con...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/18075#discussion_r118323003 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -233,10 +223,124 @@ class CodegenContext { // The collection of sub-expression result resetting methods that need to be called on each row. val subexprFunctions = mutable.ArrayBuffer.empty[String] - def declareAddedFunctions(): String = { -addedFunctions.map { case (funcName, funcCode) => funcCode }.mkString("\n") + /** + * Holds the class and instance names to be generated. `OuterClass` is a placeholder standing for + * whichever class is generated as the outermost class and which will contain any nested + * sub-classes. All other classes and instance names in this list will represent private, nested + * sub-classes. + */ + private val classes: mutable.ListBuffer[(String, String)] = +mutable.ListBuffer[(String, String)](("OuterClass", null)) + + // A map holding the current size in bytes of each class to be generated. + private val classSize: mutable.Map[String, Int] = +mutable.Map[String, Int](("OuterClass", 0)) + + // A map holding lists of functions belonging to their class. + private val classFunctions: mutable.Map[String, mutable.ListBuffer[String]] = +mutable.Map(("OuterClass", mutable.ListBuffer.empty[String])) + + // Returns the size of the most recently added class. + private def currClassSize(): Int = classSize(classes.head._1) + + // Returns the class name and instance name for the most recently added class. + private def currClass(): (String, String) = classes.head + + // Adds a new class. Requires the class' name, and its instance name. + private def addClass(className: String, classInstance: String): Unit = { +classes.prepend(Tuple2(className, classInstance)) +classSize += className -> 0 +classFunctions += className -> mutable.ListBuffer.empty[String] + } + + /** + * Adds a function to the generated class. 
If the code for the `OuterClass` grows too large, the + * function will be inlined into a new private, nested class, and a class-qualified name for the + * function will be returned. Otherwise, the function will be inined to the `OuterClass` the + * simple `funcName` will be returned. + * + * @param funcName the class-unqualified name of the function + * @param funcCode the body of the function + * @return the name of the function, qualified by class if it will be inlined to a private, + * nested sub-class + */ + def addNewFunction( +funcName: String, +funcCode: String, +inlineToOuterClass: Boolean = false): String = { +// The number of named constants that can exist in the class is limited by the Constant Pool +// limit, 65,536. We cannot know how many constants will be inserted for a class, so we use a +// threshold of 1600k bytes to determine when a function should be inlined to a private, nested +// sub-class. +val classInfo = if (inlineToOuterClass) { + ("OuterClass", "") +} else if (currClassSize > 160) { + val className = freshName("NestedClass") + val classInstance = freshName("nestedClassInstance") + + addClass(className, classInstance) + + className -> classInstance +} else { + currClass() +} +val name = classInfo._1 + +classSize.update(name, classSize(name) + funcCode.length) +classFunctions.update(name, classFunctions(name) += funcCode) --- End diff -- `+=` will be necessary since `classFunctions(name)` will return the `ListBuffer[String]` containing all functions belonging to the given class name from the `classFunctions` map, and we want to append the new funcCode to that buffer. Also, `classFunctions.update` is going to expect a `ListBuffer[String]` as its second argument; the return type of assignment `=` is just `Unit`, whereas `+=` will append the element and then return the modified buffer (which is the behavior we want).
See the API doc for [ListBuffer](https://www.scala-lang.org/api/2.11.8/index.html#scala.collection.mutable.ListBuffer@+=(x:A):ListBuffer.this.type). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --
[GitHub] spark pull request #18075: [SPARK-18016][SQL][CATALYST] Code Generation: Con...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/18075#discussion_r118302075 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala --- @@ -299,6 +297,9 @@ case class SampleExec( | } """.stripMargin.trim) + ctx.addMutableState(s"$samplerClass", sampler, --- End diff -- This change should stay the way it is. Notice that the initialization code for the `addMutableState` call on the line just below is the same code passed to the `addNewFunction` call. This means that when we create the new function, we may get back a class-qualified function name (which here we store in `initSamplerFuncName`), so the call to `addMutableState` must come after the `addNewFunction` call. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18075: [SPARK-18016][SQL][CATALYST] Code Generation: Con...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/18075#discussion_r118302508 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -233,10 +223,124 @@ class CodegenContext { // The collection of sub-expression result resetting methods that need to be called on each row. val subexprFunctions = mutable.ArrayBuffer.empty[String] - def declareAddedFunctions(): String = { -addedFunctions.map { case (funcName, funcCode) => funcCode }.mkString("\n") + /** + * Holds the class and instance names to be generated. `OuterClass` is a placeholder standing for + * whichever class is generated as the outermost class and which will contain any nested + * sub-classes. All other classes and instance names in this list will represent private, nested + * sub-classes. + */ + private val classes: mutable.ListBuffer[(String, String)] = +mutable.ListBuffer[(String, String)](("OuterClass", null)) + + // A map holding the current size in bytes of each class to be generated. + private val classSize: mutable.Map[String, Int] = +mutable.Map[String, Int](("OuterClass", 0)) + + // A map holding lists of functions belonging to their class. + private val classFunctions: mutable.Map[String, mutable.ListBuffer[String]] = +mutable.Map(("OuterClass", mutable.ListBuffer.empty[String])) + + // Returns the size of the most recently added class. + private def currClassSize(): Int = classSize(classes.head._1) + + // Returns the class name and instance name for the most recently added class. + private def currClass(): (String, String) = classes.head + + // Adds a new class. Requires the class' name, and its instance name. + private def addClass(className: String, classInstance: String): Unit = { +classes.prepend(Tuple2(className, classInstance)) +classSize += className -> 0 +classFunctions += className -> mutable.ListBuffer.empty[String] + } + + /** + * Adds a function to the generated class. 
If the code for the `OuterClass` grows too large, the + * function will be inlined into a new private, nested class, and a class-qualified name for the + * function will be returned. Otherwise, the function will be inined to the `OuterClass` the + * simple `funcName` will be returned. + * + * @param funcName the class-unqualified name of the function + * @param funcCode the body of the function + * @return the name of the function, qualified by class if it will be inlined to a private, + * nested sub-class + */ + def addNewFunction( +funcName: String, +funcCode: String, +inlineToOuterClass: Boolean = false): String = { +// The number of named constants that can exist in the class is limited by the Constant Pool +// limit, 65,536. We cannot know how many constants will be inserted for a class, so we use a +// threshold of 1600k bytes to determine when a function should be inlined to a private, nested +// sub-class. +val classInfo = if (inlineToOuterClass) { + ("OuterClass", "") +} else if (currClassSize > 160) { + val className = freshName("NestedClass") + val classInstance = freshName("nestedClassInstance") + + addClass(className, classInstance) + + className -> classInstance +} else { + currClass() +} +val name = classInfo._1 + +classSize.update(name, classSize(name) + funcCode.length) +classFunctions.update(name, classFunctions(name) += funcCode) + +if (name.equals("OuterClass")) { + funcName +} else { + val classInstance = classInfo._2 + + s"$classInstance.$funcName" +} } + /** + * Instantiates all nested, private sub-classes as objects to the `OuterClass` + */ + private[sql] def initNestedClasses(): String = { +// Nested, private sub-classes have no mutable state (though they do reference the outer class' +// mutable state), so we declare and initialize them inline ot the OuterClass --- End diff -- Fixed: https://github.com/apache/spark/pull/18075/commits/78bccda7394db04f26ff2e8c14a2e2b5d8ea57cc#diff-8bcc5aea39c73d4bf38aef6f6951d42cL306 --- If your project is 
set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file
[GitHub] spark pull request #18075: [SPARK-18016][SQL][CATALYST] Code Generation: Con...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/18075#discussion_r118274669 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -629,7 +736,9 @@ class CodegenContext { /** * Splits the generated code of expressions into multiple functions, because function has - * 64kb code size limit in JVM + * 64kb code size limit in JVM. If the class the function is to be inlined to would grow beyond + * 1600kb, a private, netsted sub-class is declared, and the function is inlined to it, because --- End diff -- Fixed: https://github.com/apache/spark/pull/18075/commits/90a907acab6a074283908c0a3784af2c7d32cbce#diff-8bcc5aea39c73d4bf38aef6f6951d42cL740 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18075: [SPARK-18016][SQL][CATALYST] Code Generation: Con...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/18075#discussion_r118274597 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -792,7 +887,18 @@ class CodegenContext { addMutableState(javaType(expr.dataType), value, s"$value = ${defaultValue(expr.dataType)};") - subexprFunctions += s"$fnName($INPUT_ROW);" + // Generate the code for this expression tree and wrap it in a function. --- End diff -- Fixed: https://github.com/apache/spark/pull/18075/commits/90a907acab6a074283908c0a3784af2c7d32cbce#diff-8bcc5aea39c73d4bf38aef6f6951d42cR872 Note: a rebase with the two commits squashed will show the `subexprFunctions` as the only changed line, which is what we want --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18075: [SPARK-18016][SQL][CATALYST] Code Generation: Con...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/18075#discussion_r118274179 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -233,10 +223,129 @@ class CodegenContext { // The collection of sub-expression result resetting methods that need to be called on each row. val subexprFunctions = mutable.ArrayBuffer.empty[String] - def declareAddedFunctions(): String = { -addedFunctions.map { case (funcName, funcCode) => funcCode }.mkString("\n") + /** + * Holds the class and instance names to be generated. `OuterClass` is a placeholder standing for + * whichever class is generated as the outermost class and which will contain any nested + * sub-classes. All other classes and instance names in this list will represent private, nested + * sub-classes. + */ + private val classes: mutable.ListBuffer[(String, String)] = +mutable.ListBuffer[(String, String)](("OuterClass", null)) + + // A map holding the current size in bytes of each class to be generated. + private val classSize: mutable.Map[String, Int] = +mutable.Map[String, Int](("OuterClass", 0)) + + // A map holding all functions and their names that will be inlined to a given class. + private val classFunctions: mutable.Map[String, mutable.Map[String, String]] = +mutable.Map(("OuterClass", mutable.Map.empty[String, String])) + + // Returns the sie of the most recently added class. + private def currClassSize(): Int = classSize(classes.head._1) + + // Returns the class name and instance name for the most recently added class. + private def currClass(): (String, String) = classes.head + + // Adds a new class. Requires the class' name, and its instance name. 
+ private def addClass(className: String, classInstance: String): Unit = { +classes.prepend(Tuple2(className, classInstance)) +classSize += className -> 0 +classFunctions += className -> mutable.Map.empty[String, String] } + /** + * Adds a function to the generated class. If the code for the `OuterClass` grows too large, the + * function will be inlined into a new private, nested class, and a class-qualified name for the + * function will be returned. Otherwise, the function will be inined to the `OuterClass` the + * simple `funcName` will be returned. + * + * @param funcName the class-unqualified name of the function + * @param funcCode the body of the function + * @return the name of the function, qualified by class if it will be inlined to a private, + * nested sub-class + */ + def addNewFunction( +funcName: String, +funcCode: String, +inlineToOuterClass: Boolean = false): String = { +// The number of named constants that can exist in the class is limited by the Constant Pool +// limit, 65,536. We cannot know how many constants will be inserted for a class, so we use a +// threshold of 1600k bytes to determine when a function should be inlined to a private, nested +// sub-class. 
+val classInfo = if (inlineToOuterClass) { + ("OuterClass", "") +} else if (currClassSize > 160) { + val className = freshName("NestedClass") + val classInstance = freshName("nestedClassInstance") + + addClass(className, classInstance) + + className -> classInstance +} else { + currClass +} +val name = classInfo._1 + +classSize.update(name, classSize(name) + funcCode.length) +classFunctions.update(name, classFunctions(name) += funcName -> funcCode) + +if (name.equals("OuterClass")) { + funcName +} else { + val classInstance = classInfo._2 + + s"$classInstance.$funcName" +} + } + + /** + * Instantiates all nested, private sub-classes as objects to the `OuterClass` + */ + private[sql] def initNestedClasses(): String = { +// Nested, private sub-classes have no mutable state (though they do reference the outer class' +// mutable state), so we declare and initialize them inline ot the OuterClass +classes.map { + case (className, classInstance) => +if (className.equals("OuterClass")) { + "" +} else { + s"private $className $classInstance = new $className();" +} +}.mkString("\n") + } + + /** + * Declares all functions that should be inlined to the `OuterClass`
[GitHub] spark pull request #18075: [SPARK-18016][SQL][CATALYST] Code Generation: Con...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/18075#discussion_r118274094 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -233,10 +223,129 @@ class CodegenContext { // The collection of sub-expression result resetting methods that need to be called on each row. val subexprFunctions = mutable.ArrayBuffer.empty[String] - def declareAddedFunctions(): String = { -addedFunctions.map { case (funcName, funcCode) => funcCode }.mkString("\n") + /** + * Holds the class and instance names to be generated. `OuterClass` is a placeholder standing for + * whichever class is generated as the outermost class and which will contain any nested + * sub-classes. All other classes and instance names in this list will represent private, nested + * sub-classes. + */ + private val classes: mutable.ListBuffer[(String, String)] = +mutable.ListBuffer[(String, String)](("OuterClass", null)) + + // A map holding the current size in bytes of each class to be generated. + private val classSize: mutable.Map[String, Int] = +mutable.Map[String, Int](("OuterClass", 0)) + + // A map holding all functions and their names that will be inlined to a given class. + private val classFunctions: mutable.Map[String, mutable.Map[String, String]] = +mutable.Map(("OuterClass", mutable.Map.empty[String, String])) + + // Returns the sie of the most recently added class. + private def currClassSize(): Int = classSize(classes.head._1) + + // Returns the class name and instance name for the most recently added class. + private def currClass(): (String, String) = classes.head + + // Adds a new class. Requires the class' name, and its instance name. 
+ private def addClass(className: String, classInstance: String): Unit = { +classes.prepend(Tuple2(className, classInstance)) +classSize += className -> 0 +classFunctions += className -> mutable.Map.empty[String, String] } + /** + * Adds a function to the generated class. If the code for the `OuterClass` grows too large, the + * function will be inlined into a new private, nested class, and a class-qualified name for the + * function will be returned. Otherwise, the function will be inined to the `OuterClass` the + * simple `funcName` will be returned. + * + * @param funcName the class-unqualified name of the function + * @param funcCode the body of the function + * @return the name of the function, qualified by class if it will be inlined to a private, + * nested sub-class + */ + def addNewFunction( +funcName: String, +funcCode: String, +inlineToOuterClass: Boolean = false): String = { +// The number of named constants that can exist in the class is limited by the Constant Pool +// limit, 65,536. We cannot know how many constants will be inserted for a class, so we use a +// threshold of 1600k bytes to determine when a function should be inlined to a private, nested +// sub-class. 
+val classInfo = if (inlineToOuterClass) { + ("OuterClass", "") +} else if (currClassSize > 160) { + val className = freshName("NestedClass") + val classInstance = freshName("nestedClassInstance") + + addClass(className, classInstance) + + className -> classInstance +} else { + currClass +} +val name = classInfo._1 + +classSize.update(name, classSize(name) + funcCode.length) +classFunctions.update(name, classFunctions(name) += funcName -> funcCode) + +if (name.equals("OuterClass")) { + funcName +} else { + val classInstance = classInfo._2 + + s"$classInstance.$funcName" +} + } + + /** + * Instantiates all nested, private sub-classes as objects to the `OuterClass` + */ + private[sql] def initNestedClasses(): String = { +// Nested, private sub-classes have no mutable state (though they do reference the outer class' +// mutable state), so we declare and initialize them inline ot the OuterClass +classes.map { + case (className, classInstance) => +if (className.equals("OuterClass")) { + "" +} else { + s"private $className $classInstance = new $className();" +} +}.mkString("\n") + } + + /** + * Declares all functions that should be inlined to the `OuterClass`
[GitHub] spark pull request #18075: [SPARK-18016][SQL][CATALYST] Code Generation: Con...
Github user bdrillard commented on a diff in the pull request: https://github.com/apache/spark/pull/18075#discussion_r118263506 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala --- @@ -792,7 +887,18 @@ class CodegenContext { addMutableState(javaType(expr.dataType), value, s"$value = ${defaultValue(expr.dataType)};") - subexprFunctions += s"$fnName($INPUT_ROW);" + // Generate the code for this expression tree and wrap it in a function. --- End diff -- In the original pull-request, since `addMutableState` returned values that were going to be referenced in `fn`, I had to declare the mutable state accessors before the code. It's less important here. The really important part for this pull-request is that the return value of `addNewFunction(fnName, fn)` is added to `subexprFunctions`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #16648: [SPARK-18016][SQL][CATALYST] Code Generation: Constant P...
Github user bdrillard commented on the issue: https://github.com/apache/spark/pull/16648 I've created the first part of a pair of PRs to help make this review easier. Please see #18075 for a PR of the first feature (class splitting of excess code into nested sub-classes). If that PR is considered acceptable, we can address the second feature. attn: @kiszk --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org