[GitHub] spark pull request #21570: [SPARK-24564][TEST] Add test suite for RecordBina...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21570#discussion_r195639885 --- Diff: sql/core/src/test/java/test/org/apache/spark/sql/execution/sort/RecordBinaryComparatorSuite.java --- @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.org.apache.spark.sql.execution.sort; + +import org.apache.spark.SparkConf; +import org.apache.spark.memory.TaskMemoryManager; +import org.apache.spark.memory.TestMemoryConsumer; +import org.apache.spark.memory.TestMemoryManager; +import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData; +import org.apache.spark.sql.catalyst.expressions.UnsafeRow; +import org.apache.spark.sql.execution.RecordBinaryComparator; +import org.apache.spark.unsafe.Platform; +import org.apache.spark.unsafe.UnsafeAlignedOffset; +import org.apache.spark.unsafe.array.LongArray; +import org.apache.spark.unsafe.memory.MemoryBlock; +import org.apache.spark.unsafe.types.UTF8String; +import org.apache.spark.util.collection.unsafe.sort.*; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Test the RecordBinaryComparator, which compares two UnsafeRows by their binary form. + */ +public class RecordBinaryComparatorSuite { + + private final TaskMemoryManager memoryManager = new TaskMemoryManager( + new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")), 0); + private final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager); + + private final int uaoSize = UnsafeAlignedOffset.getUaoSize(); + + private MemoryBlock dataPage; + private long pageCursor; + + private LongArray array; + private int pos; + + @Before + public void beforeEach() { +// Only compare between two input rows. 
+array = consumer.allocateArray(2); +pos = 0; + +dataPage = memoryManager.allocatePage(4096, consumer); +pageCursor = dataPage.getBaseOffset(); + } + + @After + public void afterEach() { +consumer.freePage(dataPage); +dataPage = null; +pageCursor = 0; + +consumer.freeArray(array); +array = null; +pos = 0; + } + + private void insertRow(UnsafeRow row) { +Object recordBase = row.getBaseObject(); +long recordOffset = row.getBaseOffset(); +int recordLength = row.getSizeInBytes(); + +Object baseObject = dataPage.getBaseObject(); +assert(pageCursor + recordLength <= dataPage.getBaseOffset() + dataPage.size()); +long recordAddress = memoryManager.encodePageNumberAndOffset(dataPage, pageCursor); +UnsafeAlignedOffset.putSize(baseObject, pageCursor, recordLength); +pageCursor += uaoSize; +Platform.copyMemory(recordBase, recordOffset, baseObject, pageCursor, recordLength); +pageCursor += recordLength; + +assert(pos < 2); +array.set(pos, recordAddress); +pos++; + } + + private int compare(int index1, int index2) { +Object baseObject = dataPage.getBaseObject(); + +long recordAddress1 = array.get(index1); +long baseOffset1 = memoryManager.getOffsetInPage(recordAddress1) + uaoSize; +int recordLength1 = UnsafeAlignedOffset.getSize(baseObject, baseOffset1 - uaoSize); + +long recordAddress2 = array.get(index2); +long baseOffset2 = memoryManager.getOffsetInPage(recordAddress2) + uaoSize; +int recordLength2 = UnsafeAlignedOffset.getSize(baseObject, baseOffset2 - uaoSize); + +return binaryComparator.compare(baseObject, baseOffset1, recordLength1, baseObject, +baseOffset2, recordLength2); + } + + private final RecordComparator binaryComparator = new RecordBinaryComparator(); + + // Compute the most compact size for UnsafeRow's backing data. + private int compute
[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/20636 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/20636 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/20636 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/20636 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/20636 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21537: [SPARK-24505][SQL] Convert strings in codegen to ...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21537#discussion_r195356119 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala --- @@ -805,43 +811,43 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String private[this] def castToStringCode(from: DataType, ctx: CodegenContext): CastFunction = { from match { case BinaryType => -(c, evPrim, evNull) => s"$evPrim = UTF8String.fromBytes($c);" +(c, evPrim, evNull) => code"$evPrim = UTF8String.fromBytes($c);" case DateType => -(c, evPrim, evNull) => s"""$evPrim = UTF8String.fromString( +(c, evPrim, evNull) => code"""$evPrim = UTF8String.fromString( org.apache.spark.sql.catalyst.util.DateTimeUtils.dateToString($c));""" case TimestampType => -val tz = ctx.addReferenceObj("timeZone", timeZone) -(c, evPrim, evNull) => s"""$evPrim = UTF8String.fromString( +val tz = JavaCode.global(ctx.addReferenceObj("timeZone", timeZone), timeZone.getClass) +(c, evPrim, evNull) => code"""$evPrim = UTF8String.fromString( org.apache.spark.sql.catalyst.util.DateTimeUtils.timestampToString($c, $tz));""" case ArrayType(et, _) => (c, evPrim, evNull) => { - val buffer = ctx.freshName("buffer") - val bufferClass = classOf[UTF8StringBuilder].getName + val buffer = ctx.freshVariable("buffer", classOf[UTF8StringBuilder]) + val bufferClass = JavaCode.className(classOf[UTF8StringBuilder]) --- End diff -- It is fine with me to address this in another PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21537: [SPARK-24505][SQL] Convert strings in codegen to ...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21537#discussion_r195331403 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala --- @@ -805,43 +811,43 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String private[this] def castToStringCode(from: DataType, ctx: CodegenContext): CastFunction = { from match { case BinaryType => -(c, evPrim, evNull) => s"$evPrim = UTF8String.fromBytes($c);" +(c, evPrim, evNull) => code"$evPrim = UTF8String.fromBytes($c);" case DateType => -(c, evPrim, evNull) => s"""$evPrim = UTF8String.fromString( +(c, evPrim, evNull) => code"""$evPrim = UTF8String.fromString( org.apache.spark.sql.catalyst.util.DateTimeUtils.dateToString($c));""" case TimestampType => -val tz = ctx.addReferenceObj("timeZone", timeZone) -(c, evPrim, evNull) => s"""$evPrim = UTF8String.fromString( +val tz = JavaCode.global(ctx.addReferenceObj("timeZone", timeZone), timeZone.getClass) +(c, evPrim, evNull) => code"""$evPrim = UTF8String.fromString( org.apache.spark.sql.catalyst.util.DateTimeUtils.timestampToString($c, $tz));""" case ArrayType(et, _) => (c, evPrim, evNull) => { - val buffer = ctx.freshName("buffer") - val bufferClass = classOf[UTF8StringBuilder].getName + val buffer = ctx.freshVariable("buffer", classOf[UTF8StringBuilder]) + val bufferClass = JavaCode.className(classOf[UTF8StringBuilder]) --- End diff -- Now, each variable defined by `freshVariable` has a type. We can get the type or its class name from the variable (e.g. `buffer`). Therefore, it looks redundant to declare the class name for each variable again (e.g. `bufferClass`). Do we have an API to get the type of a variable, or should we define an API to get the class name? I ask because this pattern is very common. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/20636 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19222: [SPARK-10399][SPARK-23879][CORE][SQL] Introduce multiple...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/19222 ping @rednaxelafx --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21542: [WIP][SPARK-24529][Build] Add spotbugs into maven...
GitHub user kiszk opened a pull request: https://github.com/apache/spark/pull/21542 [WIP][SPARK-24529][Build] Add spotbugs into maven build process ## What changes were proposed in this pull request? This PR enables a Java bytecode check tool, [spotbugs](https://spotbugs.github.io/), to avoid possible integer overflow at multiplication. When a problem is detected, the build process is stopped. Due to a limitation of the tool, some other checks will also be enabled. This check is enabled at the `compile` phase. Thus, `mvn compile` or `mvn package` launches this check. ## How was this patch tested? Existing UTs You can merge this pull request into a Git repository by running: $ git pull https://github.com/kiszk/spark SPARK-24529 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21542.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21542 commit 3a356ada17a5cad00cb49fea20fb473a9e8392d1 Author: Kazuaki Ishizaki Date: 2018-06-12T17:42:48Z add spotbugs into pom.xml --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21505: [SPARK-24457][SQL] Improving performance of stringToTime...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21505 LGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/20636 When I checked the callers of `BufferHolder.grow()`, some of them call `ByteArrayMethods.roundNumberOfBytesToNearestWord()` and others do not (i.e. they implicitly assume the size is word-aligned). Would it be better to call `ByteArrayMethods.roundNumberOfBytesToNearestWord()` inside `BufferHolder.grow()` instead of at each caller, to guarantee word alignment? Then, we could check whether `UnsafeRow.getSizeInBytes()` is a multiple of 8 in `BufferHolderSparkSubmitSuite`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
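A simplified, standalone Scala sketch of the suggestion above (illustrative only: the real `BufferHolder` is Java and has more bookkeeping, and the rounding helper below merely mirrors what `ByteArrayMethods.roundNumberOfBytesToNearestWord()` does). Moving the rounding into `grow()` enforces the word-alignment invariant in one place instead of at every call site.
```scala
object AlignedGrowSketch {
  // Round up to the next multiple of 8, as roundNumberOfBytesToNearestWord does.
  private def roundToWord(numBytes: Int): Int = {
    val remainder = numBytes & 0x07
    numBytes + ((8 - remainder) & 0x07)
  }

  // Simplified stand-in for BufferHolder.grow(): the returned buffer length is
  // always word-aligned, so callers no longer need to round the size themselves.
  def grow(buffer: Array[Byte], cursor: Int, neededSize: Int): Array[Byte] = {
    val requiredLength = roundToWord(cursor + neededSize)
    if (requiredLength <= buffer.length) {
      buffer
    } else {
      val newBuffer = new Array[Byte](requiredLength)
      System.arraycopy(buffer, 0, newBuffer, 0, cursor)
      newBuffer
    }
  }
}
```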
[GitHub] spark pull request #21481: [SPARK-24452][SQL][Core] Avoid possible overflow ...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21481#discussion_r194561368 --- Diff: core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java --- @@ -703,7 +703,7 @@ public boolean append(Object kbase, long koff, int klen, Object vbase, long voff // must be stored in the same memory page. // (8 byte key length) (key) (value) (8 byte pointer to next value) int uaoSize = UnsafeAlignedOffset.getUaoSize(); - final long recordLength = (2 * uaoSize) + klen + vlen + 8; + final long recordLength = (2L * uaoSize) + (long)klen + (long)vlen + 8L; --- End diff -- You are right. It was too conservative. `(2L * uaoSize) + klen + vlen + 8` can generate `LMUL` or `LADD` as follows:
```
LDC 2
ILOAD 9
I2L
LMUL
ILOAD 4
I2L
LADD
ILOAD 8
I2L
LADD
LDC 8
LADD
LSTORE 10
```
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
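To make the failure mode concrete, here is a minimal Scala sketch (the operand values are made up): with plain `Int` operands the arithmetic wraps around before the result is widened to a `Long`, which is exactly what the `2L` literal prevents.
```scala
// Illustrative only: 32-bit arithmetic overflows before the widening to Long.
val uaoSize = 4
val klen = Int.MaxValue - 16   // hypothetical, unrealistically large key length
val vlen = 32

val wrapped: Long = (2 * uaoSize) + klen + vlen + 8   // Int math first: wraps to -2147483617
val correct: Long = (2L * uaoSize) + klen + vlen + 8  // Long math throughout: 2147483679
```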
[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/20636 Sure, I will update this --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21505: [SPARK-24457][SQL] Improving performance of strin...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21505#discussion_r194365155 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala --- @@ -111,6 +113,23 @@ object DateTimeUtils { computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone) } + private val threadLocalComputedCalendarsMap = +new ThreadLocal[mutable.Map[TimeZone, Calendar]] { --- End diff -- +1 for map approach --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21505: [SPARK-24457][SQL] Improving performance of strin...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21505#discussion_r194299485 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala --- @@ -111,6 +113,23 @@ object DateTimeUtils { computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone) } + private val threadLocalComputedCalendarsMap = +new ThreadLocal[mutable.Map[TimeZone, Calendar]] { --- End diff -- Yes, it should work functionally if we check the given time zone every time. Do you know the typical access pattern for time zones? If there is temporal locality regarding the time zone, we do not have to use a `mutable.Map`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21520: [SPARK-24505][SQL] Forbidding string interpolation in Co...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21520 For 1, I agree with you that it is not good to introduce many APIs at first. On the other hand, it would be good to prepare only a few frequently used APIs rather than many. It makes the code simpler and cleaner. I think that `inline"${classOf[...].getName}"` and `inline"${CodeGenerator.javaType(...)}"` are frequently used. `inline"${CodeGenerator.boxedType(...)}"` may be prepared for consistency with `inline"${CodeGenerator.javaType(...)}"`. For 2, new APIs in `ctx` can co-exist with the old APIs. Thus, introducing the new APIs can co-exist with your new proposal. WDYT? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21481: [SPARK-24452][SQL][Core] Avoid possible overflow in int ...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21481 Thank you for your comment. I will create another PR for integrating findBugs/SpotBugs into maven. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21505: [SPARK-24457][SQL] Improving performance of strin...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21505#discussion_r194268734 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala --- @@ -111,6 +113,23 @@ object DateTimeUtils { computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone) } + private val threadLocalComputedCalendarsMap = +new ThreadLocal[mutable.Map[TimeZone, Calendar]] { --- End diff -- Usually, only the default time zone is used. However, `Cast` expressions involving dates can be called with a time zone other than the default. For correctness, I think it is necessary to support multiple time zones. Caching a calendar for the default time zone and creating a new instance for other time zones would also work correctly. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
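A minimal Scala sketch of that last suggestion (illustrative only; the names are made up, and it assumes the JVM's default time zone does not change at runtime): keep a thread-local `Calendar` for the common default-time-zone case and create a fresh instance for any other time zone.
```scala
import java.util.{Calendar, TimeZone}

object CalendarCacheSketch {
  private val defaultTimeZone = TimeZone.getDefault  // assumed stable for the lifetime of the JVM

  // One cached Calendar per thread for the common case.
  private val threadLocalDefaultCalendar = new ThreadLocal[Calendar] {
    override def initialValue(): Calendar = Calendar.getInstance(defaultTimeZone)
  }

  def getCalendar(timeZone: TimeZone): Calendar = {
    if (timeZone.getID == defaultTimeZone.getID) {
      val c = threadLocalDefaultCalendar.get()
      c.clear()                         // reset any state left over from the previous use
      c
    } else {
      Calendar.getInstance(timeZone)    // uncommon case: allocate a new instance
    }
  }
}
```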
[GitHub] spark issue #21481: [SPARK-24452][SQL][Core] Avoid possible overflow in int ...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21481 Since I found a plug-in for Maven, I will also include a patch to add findBugs/SpotBugs into the Maven build in this PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21510: [SPARK-24490][WebUI] Use WebUI.addStaticHandler in web U...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21510 LGTM with one minor comment. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21510: [SPARK-24490][WebUI] Use WebUI.addStaticHandler i...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21510#discussion_r194266907 --- Diff: core/src/main/scala/org/apache/spark/ui/WebUI.scala --- @@ -101,12 +101,12 @@ private[spark] abstract class WebUI( } /** - * Add a handler for static content. + * Adds a handler for static content. --- End diff -- In this file, `s` is not added. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21520: [SPARK-24505][SQL] Forbidding string interpolation in Co...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21520 Thank you for all the work updating so many places. It is very hard to split this into several pieces. Now, we are seeing several typical patterns across all of the changes, in particular wrapping of the original code. Would it be possible to make the changes simpler by introducing several APIs? 1) We are seeing the `inline` prefix many times, with a few typical patterns:
```
inline"${classOf[...].getName}"
inline"${CodeGenerator.javaType(...)}"
inline"${CodeGenerator.boxedType(...)}"
```
Can we introduce new APIs to avoid the repetition of adding `inline`, for example `JavaCode.className(Class[_]): JavaCode` for the first case? 2) We are seeing many `JavaCode.global()` or `JavaCode.variable()` calls when we create a new variable. Would it be possible to make them simpler? For example, we may introduce these APIs:
```
ctx.addMutableState(..., Class[_])
ctx.freshName(..., DataType)
ctx.freshNameIsNull(...)
```
The first one calls `JavaCode.global()` inside the method. The second one calls `JavaCode.variable()`. WDYT? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
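A purely hypothetical Scala sketch of proposal 1) above (no such helper exists in Spark's codegen API; the object name and the plain `String` return type are placeholders for illustration): build the class-name fragment in one helper instead of repeating the `inline` interpolation at every call site.
```scala
// Hypothetical helper, illustrating the proposal only; this is not Spark code.
object JavaCodeSketch {
  // Call sites would write JavaCodeSketch.className(classOf[UTF8StringBuilder])
  // instead of repeating inline"${classOf[UTF8StringBuilder].getName}".
  def className(cls: Class[_]): String = cls.getName
}
```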
[GitHub] spark issue #21481: [SPARK-24452][SQL][Core] Avoid possible overflow in int ...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21481 Since it is Java bytecode analysis, it is available for Scala code, too. In my quick test, findBugs overlooked a possible overflow. On the other hand, findBugs found another redundant null check. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21258: [SPARK-23933][SQL] Add map_from_arrays function
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21258 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21481: [SPARK-24452][SQL][Core] Avoid possible overflow in int ...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21481 @JoshRosen @cloud-fan Here is an update. I have just applied `findBugs` to `OffHeapColumnVector.java` and `UnsafeArrayData.java`. In `OffHeapColumnVector.java`, most of the possible overflows are detected, but not all. In `UnsafeArrayData.java`, two possible overflows are detected, at lines 86 and 456. I had overlooked line 86. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21258: [SPARK-23933][SQL] Add map_from_arrays function
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21258 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21258: [SPARK-23933][SQL] Add map_from_arrays function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21258#discussion_r193942183 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala --- @@ -236,6 +236,76 @@ case class CreateMap(children: Seq[Expression]) extends Expression { override def prettyName: String = "map" } +/** + * Returns a catalyst Map containing the two arrays in children expressions as keys and values. + */ +@ExpressionDescription( + usage = """ +_FUNC_(keys, values) - Creates a map with a pair of the given key/value arrays. All elements + in keys should not be null""", + examples = """ +Examples: + > SELECT _FUNC_([1.0, 3.0], ['2', '4']); + {1.0:"2",3.0:"4"} + """, since = "2.4.0") +case class MapFromArrays(left: Expression, right: Expression) +extends BinaryExpression with ExpectsInputTypes { + + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, ArrayType) + + override def dataType: DataType = { +MapType( + keyType = left.dataType.asInstanceOf[ArrayType].elementType, + valueType = right.dataType.asInstanceOf[ArrayType].elementType, + valueContainsNull = right.dataType.asInstanceOf[ArrayType].containsNull) + } + + override def nullSafeEval(keyArray: Any, valueArray: Any): Any = { +val keyArrayData = keyArray.asInstanceOf[ArrayData] +val valueArrayData = valueArray.asInstanceOf[ArrayData] +if (keyArrayData.numElements != valueArrayData.numElements) { + throw new RuntimeException("The given two arrays should have the same length") +} +val leftArrayType = left.dataType.asInstanceOf[ArrayType] +if (leftArrayType.containsNull) { + var i = 0 + while (i < keyArrayData.numElements) { +if (keyArrayData.isNullAt(i)) { + throw new RuntimeException("Cannot use null as map key!") +} +i += 1 + } +} +new ArrayBasedMapData(keyArrayData.copy(), valueArrayData.copy()) + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +nullSafeCodeGen(ctx, ev, (keyArrayData, valueArrayData) => { + val arrayBasedMapData = classOf[ArrayBasedMapData].getName + val leftArrayType = left.dataType.asInstanceOf[ArrayType] + val keyArrayElemNullCheck = if (!leftArrayType.containsNull) "" else { +val i = ctx.freshName("i") +s""" + |for (int $i = 0; $i < $keyArrayData.numElements(); $i++) { + | if ($keyArrayData.isNullAt($i)) { + |throw new RuntimeException("Cannot use null as map key!"); + | } + |} + """.stripMargin + } + s""" + |if ($keyArrayData.numElements() != $valueArrayData.numElements()) { + | throw new RuntimeException("The given two arrays should have the same length"); + |} + |$keyArrayElemNullCheck + |${ev.value} = new $arrayBasedMapData($keyArrayData.copy(), $valueArrayData.copy()); + """.stripMargin +}) + } + + override def prettyName: String = "create_map_from_arrays" --- End diff -- Oh, good catch --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21258: [SPARK-23933][SQL] Add map_from_arrays function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21258#discussion_r193928013 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala --- @@ -235,6 +235,86 @@ case class CreateMap(children: Seq[Expression]) extends Expression { override def prettyName: String = "map" } +/** + * Returns a catalyst Map containing the two arrays in children expressions as keys and values. + */ +@ExpressionDescription( + usage = """ +_FUNC_(keys, values) - Creates a map with a pair of the given key/value arrays. All elements + in keys should not be null""", + examples = """ +Examples: + > SELECT _FUNC_([1.0, 3.0], ['2', '4']); + {1.0:"2",3.0:"4"} + """, since = "2.4.0") +case class CreateMapFromArrays(left: Expression, right: Expression) +extends BinaryExpression with ExpectsInputTypes { + + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, ArrayType) + + override def checkInputDataTypes(): TypeCheckResult = { --- End diff -- sure, done --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21258: [SPARK-23933][SQL] Add map_from_arrays function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21258#discussion_r193927995 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala --- @@ -235,6 +235,86 @@ case class CreateMap(children: Seq[Expression]) extends Expression { override def prettyName: String = "map" } +/** + * Returns a catalyst Map containing the two arrays in children expressions as keys and values. + */ +@ExpressionDescription( + usage = """ +_FUNC_(keys, values) - Creates a map with a pair of the given key/value arrays. All elements + in keys should not be null""", + examples = """ +Examples: + > SELECT _FUNC_([1.0, 3.0], ['2', '4']); + {1.0:"2",3.0:"4"} + """, since = "2.4.0") +case class CreateMapFromArrays(left: Expression, right: Expression) +extends BinaryExpression with ExpectsInputTypes { + + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, ArrayType) + + override def checkInputDataTypes(): TypeCheckResult = { +(left.dataType, right.dataType) match { + case (ArrayType(_, _), ArrayType(_, _)) => +TypeCheckResult.TypeCheckSuccess + case _ => +TypeCheckResult.TypeCheckFailure("The given two arguments should be an array") +} + } + + override def dataType: DataType = { +MapType( + keyType = left.dataType.asInstanceOf[ArrayType].elementType, + valueType = right.dataType.asInstanceOf[ArrayType].elementType, + valueContainsNull = right.dataType.asInstanceOf[ArrayType].containsNull) + } + + override def nullable: Boolean = left.nullable || right.nullable --- End diff -- good catch, thanks --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21258: [SPARK-23933][SQL] Add map_from_arrays function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21258#discussion_r193927508 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala --- @@ -235,6 +235,86 @@ case class CreateMap(children: Seq[Expression]) extends Expression { override def prettyName: String = "map" } +/** + * Returns a catalyst Map containing the two arrays in children expressions as keys and values. + */ +@ExpressionDescription( + usage = """ +_FUNC_(keys, values) - Creates a map with a pair of the given key/value arrays. All elements + in keys should not be null""", + examples = """ +Examples: + > SELECT _FUNC_([1.0, 3.0], ['2', '4']); + {1.0:"2",3.0:"4"} + """, since = "2.4.0") +case class CreateMapFromArrays(left: Expression, right: Expression) +extends BinaryExpression with ExpectsInputTypes { + + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, ArrayType) + + override def checkInputDataTypes(): TypeCheckResult = { +(left.dataType, right.dataType) match { + case (ArrayType(_, _), ArrayType(_, _)) => +TypeCheckResult.TypeCheckSuccess + case _ => +TypeCheckResult.TypeCheckFailure("The given two arguments should be an array") +} + } + + override def dataType: DataType = { +MapType( + keyType = left.dataType.asInstanceOf[ArrayType].elementType, + valueType = right.dataType.asInstanceOf[ArrayType].elementType, + valueContainsNull = right.dataType.asInstanceOf[ArrayType].containsNull) + } + + override def nullable: Boolean = left.nullable || right.nullable + + override def nullSafeEval(keyArray: Any, valueArray: Any): Any = { +val keyArrayData = keyArray.asInstanceOf[ArrayData] +val valueArrayData = valueArray.asInstanceOf[ArrayData] +if (keyArrayData.numElements != valueArrayData.numElements) { + throw new RuntimeException("The given two arrays should have the same length") +} +val leftArrayType = left.dataType.asInstanceOf[ArrayType] +if (leftArrayType.containsNull) { + if (keyArrayData.toArray(leftArrayType.elementType).contains(null)) { +throw new RuntimeException("Cannot use null as map key!") + } +} +new ArrayBasedMapData(keyArrayData.copy(), valueArrayData.copy()) + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +nullSafeCodeGen(ctx, ev, (keyArrayData, valueArrayData) => { + val arrayBasedMapData = classOf[ArrayBasedMapData].getName + val leftArrayType = left.dataType.asInstanceOf[ArrayType] + val keyArrayElemNullCheck = if (!leftArrayType.containsNull) "" else { +val leftArrayTypeTerm = ctx.addReferenceObj("leftArrayType", leftArrayType.elementType) +val array = ctx.freshName("array") +val i = ctx.freshName("i") +s""" + |Object[] $array = $keyArrayData.toObjectArray($leftArrayTypeTerm); + |for (int $i = 0; $i < $array.length; $i++) { + | if ($array[$i] == null) { + |throw new RuntimeException("Cannot use null as map key!"); + | } + |} --- End diff -- Got it. An array has been evaluated. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21507: Branch 1.6
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21507 @deepaksonu Would it be possible to close this PR? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21109#discussion_r193762830 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/InMemoryUnsafeRowQueue.scala --- @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import java.util.ConcurrentModificationException + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.memory.TaskMemoryManager +import org.apache.spark.serializer.SerializerManager +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer +import org.apache.spark.storage.BlockManager + +/** + * An append-only array for [[UnsafeRow]]s that strictly keeps content in an in-memory array + * until [[numRowsInMemoryBufferThreshold]] is reached post which it will switch to a mode which + * would flush to disk after [[numRowsSpillThreshold]] is met (or before if there is + * excessive memory consumption). Setting these threshold involves following trade-offs: + * + * - If [[numRowsInMemoryBufferThreshold]] is too high, the in-memory array may occupy more memory + * than is available, resulting in OOM. + * - If [[numRowsSpillThreshold]] is too low, data will be spilled frequently and lead to + * excessive disk writes. This may lead to a performance regression compared to the normal case + * of using an [[ArrayBuffer]] or [[Array]]. + */ +private[sql] class InMemoryUnsafeRowQueue( +taskMemoryManager: TaskMemoryManager, +blockManager: BlockManager, +serializerManager: SerializerManager, +taskContext: TaskContext, +initialSize: Int, +pageSizeBytes: Long, +numRowsInMemoryBufferThreshold: Int, +numRowsSpillThreshold: Int) + extends ExternalAppendOnlyUnsafeRowArray(taskMemoryManager, + blockManager, + serializerManager, + taskContext, + initialSize, + pageSizeBytes, + numRowsInMemoryBufferThreshold, + numRowsSpillThreshold) { + + def this(numRowsInMemoryBufferThreshold: Int, numRowsSpillThreshold: Int) { +this( + TaskContext.get().taskMemoryManager(), + SparkEnv.get.blockManager, + SparkEnv.get.serializerManager, + TaskContext.get(), + 1024, + SparkEnv.get.memoryManager.pageSizeBytes, + numRowsInMemoryBufferThreshold, + numRowsSpillThreshold) + } + + private val initialSizeOfInMemoryBuffer = +Math.min(DefaultInitialSizeOfInMemoryBuffer, numRowsInMemoryBufferThreshold) + + private val inMemoryQueue = if (initialSizeOfInMemoryBuffer > 0) { +new mutable.Queue[UnsafeRow]() + } else { +null + } + +// private var spillableArray: UnsafeExternalSorter = _ --- End diff -- nit: Is this comment necessary? 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21505: [SPARK-24457][SQL] Improving performance of stringToTime...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21505 We would appreciate it if you could post the performance numbers before and after this PR. It would be good to use the `Benchmark` class. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
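For reference, here is a rough Scala sketch of the kind of micro-benchmark being asked for, using Spark's internal `Benchmark` utility (package locations and method signatures are assumed from the 2.x source tree, and the input string is an arbitrary example; adjust to the actual classes):
```scala
import java.util.TimeZone

import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Benchmark

object StringToTimestampBenchmark {
  def main(args: Array[String]): Unit = {
    val iters = 1000000
    val input = UTF8String.fromString("2018-06-11 12:34:56.789")
    val tz = TimeZone.getTimeZone("UTC")

    val benchmark = new Benchmark("stringToTimestamp", iters)
    benchmark.addCase("parse a constant timestamp string") { _ =>
      var i = 0
      while (i < iters) {
        DateTimeUtils.stringToTimestamp(input, tz)   // the code path this PR optimizes
        i += 1
      }
    }
    benchmark.run()   // prints per-case timings; run on both branches to compare
  }
}
```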
[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/20636 cc @cloud-fan --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21061 Let me think about the implementation to keep the order. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21505: [SPARK-24457][SQL] Improving performance of strin...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21505#discussion_r193678440 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala --- @@ -111,6 +113,24 @@ object DateTimeUtils { computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone) } + private val threadLocalComputedCalendarsMap = +new ThreadLocal[mutable.Map[TimeZone, (Calendar, Long)]] { + override def initialValue(): mutable.Map[TimeZone, (Calendar, Long)] = { +mutable.Map[TimeZone, (Calendar, Long)]() + } +} + + def getCalendar(timeZone: TimeZone): Calendar = { +val (c, timeInMillis) = threadLocalComputedCalendarsMap.get() + .getOrElseUpdate(timeZone, { +val c = Calendar.getInstance(timeZone) +(c, c.getTimeInMillis) + }) +c.clear() +c.setTimeInMillis(timeInMillis) --- End diff -- I agree with @viirya 's comment. Do we need to set the value of `System.currentTimeMillis()`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21258: [SPARK-23933][SQL] Add map_from_arrays function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21258#discussion_r192949349 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala --- @@ -235,6 +235,86 @@ case class CreateMap(children: Seq[Expression]) extends Expression { override def prettyName: String = "map" } +/** + * Returns a catalyst Map containing the two arrays in children expressions as keys and values. + */ +@ExpressionDescription( + usage = """ +_FUNC_(keys, values) - Creates a map with a pair of the given key/value arrays. All elements + in keys should not be null""", + examples = """ +Examples: + > SELECT _FUNC_([1.0, 3.0], ['2', '4']); + {1.0:"2",3.0:"4"} + """, since = "2.4.0") +case class CreateMapFromArrays(left: Expression, right: Expression) +extends BinaryExpression with ExpectsInputTypes { + + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, ArrayType) + + override def checkInputDataTypes(): TypeCheckResult = { +(left.dataType, right.dataType) match { + case (ArrayType(_, _), ArrayType(_, _)) => +TypeCheckResult.TypeCheckSuccess + case _ => +TypeCheckResult.TypeCheckFailure("The given two arguments should be an array") +} + } + + override def dataType: DataType = { +MapType( + keyType = left.dataType.asInstanceOf[ArrayType].elementType, + valueType = right.dataType.asInstanceOf[ArrayType].elementType, + valueContainsNull = right.dataType.asInstanceOf[ArrayType].containsNull) + } + + override def nullable: Boolean = left.nullable || right.nullable + + override def nullSafeEval(keyArray: Any, valueArray: Any): Any = { +val keyArrayData = keyArray.asInstanceOf[ArrayData] +val valueArrayData = valueArray.asInstanceOf[ArrayData] +if (keyArrayData.numElements != valueArrayData.numElements) { + throw new RuntimeException("The given two arrays should have the same length") +} +val leftArrayType = left.dataType.asInstanceOf[ArrayType] +if (leftArrayType.containsNull) { + if (keyArrayData.toArray(leftArrayType.elementType).contains(null)) { +throw new RuntimeException("Cannot use null as map key!") + } +} +new ArrayBasedMapData(keyArrayData.copy(), valueArrayData.copy()) + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +nullSafeCodeGen(ctx, ev, (keyArrayData, valueArrayData) => { + val arrayBasedMapData = classOf[ArrayBasedMapData].getName + val leftArrayType = left.dataType.asInstanceOf[ArrayType] + val keyArrayElemNullCheck = if (!leftArrayType.containsNull) "" else { +val leftArrayTypeTerm = ctx.addReferenceObj("leftArrayType", leftArrayType.elementType) +val array = ctx.freshName("array") +val i = ctx.freshName("i") +s""" + |Object[] $array = $keyArrayData.toObjectArray($leftArrayTypeTerm); + |for (int $i = 0; $i < $array.length; $i++) { + | if ($array[$i] == null) { + |throw new RuntimeException("Cannot use null as map key!"); + | } + |} --- End diff -- This code should work if we evaluate each element to make `isNullAt()` valid. I think that my mistake is not to currently evaluate each element in `keyArrayData` and `valueArrayData.` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21481: [SPARK-24452][SQL][Core] Avoid possible overflow in int ...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21481 Good questions. For 2, at first I found one of these issues when I looked at a file. Then, I ran the `grep` command with the patterns `long .*=.*\*` and `long .*=.*\+` over the `.java` files and picked the matches up manually. It is labor-intensive. For 3, here is my thought. [`SpotBugs`](https://spotbugs.github.io/) may be a good candidate to check it. SpotBugs is the successor of [`findBugs`](https://findbugs.sourceforge.net/). When I ran `FindBugs` before, I found some problems regarding possible overflow and then made a PR that was integrated. On the other hand, these issues may not have been detected at that time. I will look at SpotBugs after my presentation at SAIS is finished :) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21481: [SPARK-24452][SQL][Core] Avoid possible overflow in int ...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21481 cc @cloud-fan --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21258: [SPARK-23933][SQL] Add map_from_arrays function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21258#discussion_r192551411 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala --- @@ -235,6 +235,86 @@ case class CreateMap(children: Seq[Expression]) extends Expression { override def prettyName: String = "map" } +/** + * Returns a catalyst Map containing the two arrays in children expressions as keys and values. + */ +@ExpressionDescription( + usage = """ +_FUNC_(keys, values) - Creates a map with a pair of the given key/value arrays. All elements + in keys should not be null""", + examples = """ +Examples: + > SELECT _FUNC_([1.0, 3.0], ['2', '4']); + {1.0:"2",3.0:"4"} + """, since = "2.4.0") +case class CreateMapFromArrays(left: Expression, right: Expression) +extends BinaryExpression with ExpectsInputTypes { + + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, ArrayType) + + override def checkInputDataTypes(): TypeCheckResult = { +(left.dataType, right.dataType) match { + case (ArrayType(_, _), ArrayType(_, _)) => +TypeCheckResult.TypeCheckSuccess + case _ => +TypeCheckResult.TypeCheckFailure("The given two arguments should be an array") +} + } + + override def dataType: DataType = { +MapType( + keyType = left.dataType.asInstanceOf[ArrayType].elementType, + valueType = right.dataType.asInstanceOf[ArrayType].elementType, + valueContainsNull = right.dataType.asInstanceOf[ArrayType].containsNull) + } + + override def nullable: Boolean = left.nullable || right.nullable + + override def nullSafeEval(keyArray: Any, valueArray: Any): Any = { +val keyArrayData = keyArray.asInstanceOf[ArrayData] +val valueArrayData = valueArray.asInstanceOf[ArrayData] +if (keyArrayData.numElements != valueArrayData.numElements) { + throw new RuntimeException("The given two arrays should have the same length") +} +val leftArrayType = left.dataType.asInstanceOf[ArrayType] +if (leftArrayType.containsNull) { + if (keyArrayData.toArray(leftArrayType.elementType).contains(null)) { +throw new RuntimeException("Cannot use null as map key!") + } +} +new ArrayBasedMapData(keyArrayData.copy(), valueArrayData.copy()) + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +nullSafeCodeGen(ctx, ev, (keyArrayData, valueArrayData) => { + val arrayBasedMapData = classOf[ArrayBasedMapData].getName + val leftArrayType = left.dataType.asInstanceOf[ArrayType] + val keyArrayElemNullCheck = if (!leftArrayType.containsNull) "" else { +val leftArrayTypeTerm = ctx.addReferenceObj("leftArrayType", leftArrayType.elementType) +val array = ctx.freshName("array") +val i = ctx.freshName("i") +s""" + |Object[] $array = $keyArrayData.toObjectArray($leftArrayTypeTerm); + |for (int $i = 0; $i < $array.length; $i++) { + | if ($array[$i] == null) { + |throw new RuntimeException("Cannot use null as map key!"); + | } + |} --- End diff -- However, I realized we have to evaluate each element as `CreateMap` does. I think that we have to update eval and codegen. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21258: [SPARK-23933][SQL] Add map_from_arrays function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21258#discussion_r192548230 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala --- @@ -235,6 +235,86 @@ case class CreateMap(children: Seq[Expression]) extends Expression { override def prettyName: String = "map" } +/** + * Returns a catalyst Map containing the two arrays in children expressions as keys and values. + */ +@ExpressionDescription( + usage = """ +_FUNC_(keys, values) - Creates a map with a pair of the given key/value arrays. All elements + in keys should not be null""", + examples = """ +Examples: + > SELECT _FUNC_([1.0, 3.0], ['2', '4']); + {1.0:"2",3.0:"4"} + """, since = "2.4.0") +case class CreateMapFromArrays(left: Expression, right: Expression) +extends BinaryExpression with ExpectsInputTypes { + + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, ArrayType) + + override def checkInputDataTypes(): TypeCheckResult = { +(left.dataType, right.dataType) match { + case (ArrayType(_, _), ArrayType(_, _)) => +TypeCheckResult.TypeCheckSuccess + case _ => +TypeCheckResult.TypeCheckFailure("The given two arguments should be an array") +} + } + + override def dataType: DataType = { +MapType( + keyType = left.dataType.asInstanceOf[ArrayType].elementType, + valueType = right.dataType.asInstanceOf[ArrayType].elementType, + valueContainsNull = right.dataType.asInstanceOf[ArrayType].containsNull) + } + + override def nullable: Boolean = left.nullable || right.nullable + + override def nullSafeEval(keyArray: Any, valueArray: Any): Any = { +val keyArrayData = keyArray.asInstanceOf[ArrayData] +val valueArrayData = valueArray.asInstanceOf[ArrayData] +if (keyArrayData.numElements != valueArrayData.numElements) { + throw new RuntimeException("The given two arrays should have the same length") +} +val leftArrayType = left.dataType.asInstanceOf[ArrayType] +if (leftArrayType.containsNull) { + if (keyArrayData.toArray(leftArrayType.elementType).contains(null)) { +throw new RuntimeException("Cannot use null as map key!") + } +} +new ArrayBasedMapData(keyArrayData.copy(), valueArrayData.copy()) + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +nullSafeCodeGen(ctx, ev, (keyArrayData, valueArrayData) => { + val arrayBasedMapData = classOf[ArrayBasedMapData].getName + val leftArrayType = left.dataType.asInstanceOf[ArrayType] + val keyArrayElemNullCheck = if (!leftArrayType.containsNull) "" else { +val leftArrayTypeTerm = ctx.addReferenceObj("leftArrayType", leftArrayType.elementType) +val array = ctx.freshName("array") +val i = ctx.freshName("i") +s""" + |Object[] $array = $keyArrayData.toObjectArray($leftArrayTypeTerm); + |for (int $i = 0; $i < $array.length; $i++) { + | if ($array[$i] == null) { + |throw new RuntimeException("Cannot use null as map key!"); + | } + |} --- End diff -- good catch, thanks --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21258: [SPARK-23933][SQL] Add map_from_arrays function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21258#discussion_r192548103 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala --- @@ -235,6 +235,86 @@ case class CreateMap(children: Seq[Expression]) extends Expression { override def prettyName: String = "map" } +/** + * Returns a catalyst Map containing the two arrays in children expressions as keys and values. + */ +@ExpressionDescription( + usage = """ +_FUNC_(keys, values) - Creates a map with a pair of the given key/value arrays. All elements + in keys should not be null""", + examples = """ +Examples: + > SELECT _FUNC_([1.0, 3.0], ['2', '4']); + {1.0:"2",3.0:"4"} + """, since = "2.4.0") +case class CreateMapFromArrays(left: Expression, right: Expression) --- End diff -- In existing convention, `"map" -> "CreateMap"`. How about `"map_from_arrays" -> ???`? I am neutral on `MapFromArrays` or `CreateMapFromArrays`. WDYT? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21258: [SPARK-23933][SQL] Add map_from_arrays function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21258#discussion_r192547906 --- Diff: python/pyspark/sql/functions.py --- @@ -1798,6 +1798,22 @@ def create_map(*cols): return Column(jc) +@ignore_unicode_prefix +@since(2.4) +def create_map_from_arrays(col1, col2): --- End diff -- Sure --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r192546226 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -2189,3 +2189,302 @@ case class ArrayRemove(left: Expression, right: Expression) override def prettyName: String = "array_remove" } + +object ArraySetLike { + private val MAX_ARRAY_LENGTH: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH + + def toArrayDataInt(hs: OpenHashSet[Int]): ArrayData = { +val array = new Array[Int](hs.size) +var pos = hs.nextPos(0) +var i = 0 +while (pos != OpenHashSet.INVALID_POS) { + array(i) = hs.getValue(pos) + pos = hs.nextPos(pos + 1) + i += 1 +} + +if (useGenericArrayData(LongType.defaultSize, array.length)) { + new GenericArrayData(array) +} else { + UnsafeArrayData.fromPrimitiveArray(array) +} + } + + def toArrayDataLong(hs: OpenHashSet[Long]): ArrayData = { +val array = new Array[Long](hs.size) +var pos = hs.nextPos(0) +var i = 0 +while (pos != OpenHashSet.INVALID_POS) { + array(i) = hs.getValue(pos) + pos = hs.nextPos(pos + 1) + i += 1 +} + +if (useGenericArrayData(LongType.defaultSize, array.length)) { + new GenericArrayData(array) +} else { + UnsafeArrayData.fromPrimitiveArray(array) +} + } + + def useGenericArrayData(elementSize: Int, length: Int): Boolean = { --- End diff -- Although I tried it, I stopped reusing. This is because `UnsafeArrayData.fromPrimitiveArray()` also uses variables (e.g. `headerInBytes` and `valueRegionInBytes`) calculated in this method. I think that there is no typical way to return multiple values from a function. Thus, we can move this to `UnsafeArrayData`. But, it is not easy to reuse it. WDYT? ``` private static UnsafeArrayData fromPrimitiveArray( Object arr, int offset, int length, int elementSize) { final long headerInBytes = calculateHeaderPortionInBytes(length); final long valueRegionInBytes = elementSize * length; final long totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8; if (totalSizeInLongs > Integer.MAX_VALUE / 8) { throw new UnsupportedOperationException("Cannot convert this array to unsafe format as " + "it's too big."); } final long[] data = new long[(int)totalSizeInLongs]; Platform.putLong(data, Platform.LONG_ARRAY_OFFSET, length); Platform.copyMemory(arr, offset, data, Platform.LONG_ARRAY_OFFSET + headerInBytes, valueRegionInBytes); UnsafeArrayData result = new UnsafeArrayData(); result.pointTo(data, Platform.LONG_ARRAY_OFFSET, (int)totalSizeInLongs * 8); return result; } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
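Since the comment above notes there is no typical way to return multiple values, one hedged option on the Scala side would be a small helper that returns the shared sizes as a tuple, so the `UnsafeArrayData`-vs-`GenericArrayData` decision and the copy are driven by the same numbers. This is only a sketch; `unsafeArraySizes` is a hypothetical name and is not part of Spark.

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData

// Hypothetical helper: bundle the sizes fromPrimitiveArray() computes internally,
// returning (headerInBytes, valueRegionInBytes, fitsUnsafeFormat).
def unsafeArraySizes(length: Int, elementSize: Int): (Long, Long, Boolean) = {
  val headerInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(length)
  val valueRegionInBytes = elementSize.toLong * length
  val totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) / 8
  (headerInBytes, valueRegionInBytes, totalSizeInLongs <= Integer.MAX_VALUE / 8)
}
```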
[GitHub] spark issue #21481: [SPARK-24452][SQL][Core] Avoid possible overflow in int ...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21481 cc @ueshin @hvanhovell --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21481: [SPARK-24452][SQL][Core] Avoid possible overflow ...
GitHub user kiszk opened a pull request: https://github.com/apache/spark/pull/21481 [SPARK-24452][SQL][Core] Avoid possible overflow in int add or multiply ## What changes were proposed in this pull request? This PR fixes possible overflow in int add or multiply. The following assignments may overflow on the right-hand side, so the assigned value may be negative. ``` long = int * int long = int + int ``` To avoid this problem, this PR casts an int operand to long on the right-hand side. ## How was this patch tested? Existing UTs. You can merge this pull request into a Git repository by running: $ git pull https://github.com/kiszk/spark SPARK-24452 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21481.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21481 commit 324fd5ccb73c8017f5537031db21b687ac1ca27a Author: Kazuaki Ishizaki Date: 2018-06-01T20:22:34Z initial commit --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
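A minimal illustration of the pattern described above (the values are made up, not taken from Spark): the multiplication happens in 32-bit arithmetic and overflows before the result is widened to `long`, so the fix is to widen one operand first.

```scala
// Illustration only: 300000 * 8192 = 2457600000, which does not fit in an Int.
val numRecords: Int = 300000
val recordSize: Int = 8192

val wrong: Long = numRecords * recordSize          // Int overflow, then widened: negative value
val right: Long = numRecords.toLong * recordSize   // widened first: 2457600000L
```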
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r192490355 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -1882,3 +1882,311 @@ case class ArrayRepeat(left: Expression, right: Expression) } } + +object ArraySetLike { + val kindUnion = 1 + + private val MAX_ARRAY_LENGTH: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH + + def toArrayDataInt(hs: OpenHashSet[Int]): ArrayData = { +val array = new Array[Int](hs.size) +var pos = hs.nextPos(0) +var i = 0 +while (pos != OpenHashSet.INVALID_POS) { + array(i) = hs.getValue(pos) + pos = hs.nextPos(pos + 1) + i += 1 +} + +val numBytes = 4L * array.length +val unsafeArraySizeInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(array.length) + + org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes) +// Since UnsafeArrayData.fromPrimitiveArray() uses long[], max elements * 8 bytes can be used +if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) { + UnsafeArrayData.fromPrimitiveArray(array) +} else { + new GenericArrayData(array) +} + } + + def toArrayDataLong(hs: OpenHashSet[Long]): ArrayData = { +val array = new Array[Long](hs.size) +var pos = hs.nextPos(0) +var i = 0 +while (pos != OpenHashSet.INVALID_POS) { + array(i) = hs.getValue(pos) + pos = hs.nextPos(pos + 1) + i += 1 +} + +val numBytes = 8L * array.length +val unsafeArraySizeInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(array.length) + + org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes) +// Since UnsafeArrayData.fromPrimitiveArray() uses long[], max elements * 8 bytes can be used +if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) { --- End diff -- Ah, I misunderstood. To accept `Integer.MAX_VALUE * 8` looks a future plan. Anyway, I will use the same calculation in `UnsafeArrayData.fromPrimitiveArray()`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r192340073 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -1882,3 +1882,311 @@ case class ArrayRepeat(left: Expression, right: Expression) } } + +object ArraySetLike { + val kindUnion = 1 + + private val MAX_ARRAY_LENGTH: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH + + def toArrayDataInt(hs: OpenHashSet[Int]): ArrayData = { +val array = new Array[Int](hs.size) +var pos = hs.nextPos(0) +var i = 0 +while (pos != OpenHashSet.INVALID_POS) { + array(i) = hs.getValue(pos) + pos = hs.nextPos(pos + 1) + i += 1 +} + +val numBytes = 4L * array.length +val unsafeArraySizeInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(array.length) + + org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes) +// Since UnsafeArrayData.fromPrimitiveArray() uses long[], max elements * 8 bytes can be used +if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) { + UnsafeArrayData.fromPrimitiveArray(array) +} else { + new GenericArrayData(array) +} + } + + def toArrayDataLong(hs: OpenHashSet[Long]): ArrayData = { +val array = new Array[Long](hs.size) +var pos = hs.nextPos(0) +var i = 0 +while (pos != OpenHashSet.INVALID_POS) { + array(i) = hs.getValue(pos) + pos = hs.nextPos(pos + 1) + i += 1 +} + +val numBytes = 8L * array.length +val unsafeArraySizeInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(array.length) + + org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes) +// Since UnsafeArrayData.fromPrimitiveArray() uses long[], max elements * 8 bytes can be used +if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) { + UnsafeArrayData.fromPrimitiveArray(array) +} else { + new GenericArrayData(array) +} + } + + def arrayUnion( + array1: ArrayData, + array2: ArrayData, + et: DataType, + ordering: Ordering[Any]): ArrayData = { +if (ordering == null) { + new GenericArrayData(array1.toObjectArray(et).union(array2.toObjectArray(et)) +.distinct.asInstanceOf[Array[Any]]) +} else { + val length = math.min(array1.numElements().toLong + array2.numElements().toLong, +ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) + val array = new Array[Any](length.toInt) + var pos = 0 + var hasNull = false + Seq(array1, array2).foreach(_.foreach(et, (_, v) => { +var found = false +if (v == null) { + if (hasNull) { +found = true + } else { +hasNull = true + } +} else { + var j = 0 + while (!found && j < pos) { +val va = array(j) +if (va != null && ordering.equiv(va, v)) { + found = true +} +j = j + 1 + } +} +if (!found) { + if (pos > MAX_ARRAY_LENGTH) { +throw new RuntimeException(s"Unsuccessful try to union arrays with $pos" + + s" elements due to exceeding the array size limit $MAX_ARRAY_LENGTH.") + } + array(pos) = v + pos = pos + 1 +} + })) + new GenericArrayData(array.slice(0, pos)) +} + } +} + +abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast { + def typeId: Int + + override def dataType: DataType = left.dataType + + override def checkInputDataTypes(): TypeCheckResult = { +val typeCheckResult = super.checkInputDataTypes() +if (typeCheckResult.isSuccess) { + TypeUtils.checkForOrderingExpr(dataType.asInstanceOf[ArrayType].elementType, +s"function $prettyName") +} else { + typeCheckResult +} + } + + private def cn = left.dataType.asInstanceOf[ArrayType].containsNull || +right.dataType.asInstanceOf[ArrayType].containsNull + + @transient private lazy val ordering: 
Ordering[Any] = +TypeUtils.getInterpretedOrdering(elementType) + + @transient private lazy val elementTypeSupportEquals = elementType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } + + def intEval(ary: ArrayData, hs2: OpenHashSet[Int]): Open
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r192336364 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -1882,3 +1882,311 @@ case class ArrayRepeat(left: Expression, right: Expression) } } + +object ArraySetLike { + val kindUnion = 1 + + private val MAX_ARRAY_LENGTH: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH + + def toArrayDataInt(hs: OpenHashSet[Int]): ArrayData = { +val array = new Array[Int](hs.size) +var pos = hs.nextPos(0) +var i = 0 +while (pos != OpenHashSet.INVALID_POS) { + array(i) = hs.getValue(pos) + pos = hs.nextPos(pos + 1) + i += 1 +} + +val numBytes = 4L * array.length +val unsafeArraySizeInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(array.length) + + org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes) +// Since UnsafeArrayData.fromPrimitiveArray() uses long[], max elements * 8 bytes can be used +if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) { + UnsafeArrayData.fromPrimitiveArray(array) +} else { + new GenericArrayData(array) +} + } + + def toArrayDataLong(hs: OpenHashSet[Long]): ArrayData = { +val array = new Array[Long](hs.size) +var pos = hs.nextPos(0) +var i = 0 +while (pos != OpenHashSet.INVALID_POS) { + array(i) = hs.getValue(pos) + pos = hs.nextPos(pos + 1) + i += 1 +} + +val numBytes = 8L * array.length +val unsafeArraySizeInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(array.length) + + org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes) +// Since UnsafeArrayData.fromPrimitiveArray() uses long[], max elements * 8 bytes can be used +if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) { + UnsafeArrayData.fromPrimitiveArray(array) +} else { + new GenericArrayData(array) +} + } + + def arrayUnion( + array1: ArrayData, + array2: ArrayData, + et: DataType, + ordering: Ordering[Any]): ArrayData = { +if (ordering == null) { + new GenericArrayData(array1.toObjectArray(et).union(array2.toObjectArray(et)) +.distinct.asInstanceOf[Array[Any]]) +} else { + val length = math.min(array1.numElements().toLong + array2.numElements().toLong, +ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) + val array = new Array[Any](length.toInt) + var pos = 0 + var hasNull = false + Seq(array1, array2).foreach(_.foreach(et, (_, v) => { +var found = false +if (v == null) { + if (hasNull) { +found = true + } else { +hasNull = true + } +} else { + var j = 0 + while (!found && j < pos) { +val va = array(j) +if (va != null && ordering.equiv(va, v)) { + found = true +} +j = j + 1 + } +} +if (!found) { + if (pos > MAX_ARRAY_LENGTH) { +throw new RuntimeException(s"Unsuccessful try to union arrays with $pos" + + s" elements due to exceeding the array size limit $MAX_ARRAY_LENGTH.") + } + array(pos) = v + pos = pos + 1 +} + })) + new GenericArrayData(array.slice(0, pos)) +} + } +} + +abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast { + def typeId: Int + + override def dataType: DataType = left.dataType + + override def checkInputDataTypes(): TypeCheckResult = { +val typeCheckResult = super.checkInputDataTypes() +if (typeCheckResult.isSuccess) { + TypeUtils.checkForOrderingExpr(dataType.asInstanceOf[ArrayType].elementType, +s"function $prettyName") +} else { + typeCheckResult +} + } + + private def cn = left.dataType.asInstanceOf[ArrayType].containsNull || +right.dataType.asInstanceOf[ArrayType].containsNull + + @transient private lazy val ordering: 
Ordering[Any] = +TypeUtils.getInterpretedOrdering(elementType) + + @transient private lazy val elementTypeSupportEquals = elementType match { +case BinaryType => false +case _: AtomicType => true +case _ => false + } + + def intEval(ary: ArrayData, hs2: OpenHashSet[Int]): Open
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r192331296 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -1882,3 +1882,311 @@ case class ArrayRepeat(left: Expression, right: Expression) } } + +object ArraySetLike { + val kindUnion = 1 + + private val MAX_ARRAY_LENGTH: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH + + def toArrayDataInt(hs: OpenHashSet[Int]): ArrayData = { +val array = new Array[Int](hs.size) +var pos = hs.nextPos(0) +var i = 0 +while (pos != OpenHashSet.INVALID_POS) { + array(i) = hs.getValue(pos) + pos = hs.nextPos(pos + 1) + i += 1 +} + +val numBytes = 4L * array.length +val unsafeArraySizeInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(array.length) + + org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes) +// Since UnsafeArrayData.fromPrimitiveArray() uses long[], max elements * 8 bytes can be used +if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) { + UnsafeArrayData.fromPrimitiveArray(array) +} else { + new GenericArrayData(array) +} + } + + def toArrayDataLong(hs: OpenHashSet[Long]): ArrayData = { +val array = new Array[Long](hs.size) +var pos = hs.nextPos(0) +var i = 0 +while (pos != OpenHashSet.INVALID_POS) { + array(i) = hs.getValue(pos) + pos = hs.nextPos(pos + 1) + i += 1 +} + +val numBytes = 8L * array.length +val unsafeArraySizeInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(array.length) + + org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes) +// Since UnsafeArrayData.fromPrimitiveArray() uses long[], max elements * 8 bytes can be used +if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) { --- End diff -- As I wrote a comment, since `UnsafeArrayData.fromPrimitiveArray()` uses `long[]`, this method can accept up to `Integer.MAX_VALUE * 8` (8 means `sizeof(long)`) as total byte size. Of course, conservatively, we limit the length by up to `Integer.MAX_VALUE`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
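A back-of-the-envelope restatement of the limit discussed above (illustrative only, not Spark code): the unsafe layout is copied into a `long[]`, so the ceiling on header plus value bytes is the maximum array length times eight bytes per slot.

```scala
// Illustrative arithmetic for the total byte-size ceiling.
val maxLongSlots: Long = Integer.MAX_VALUE.toLong   // theoretical max long[] length
val maxUnsafeBytes: Long = maxLongSlots * 8L        // 17179869176 bytes, about 16 GiB
```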
[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21061#discussion_r192330635 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -1882,3 +1882,311 @@ case class ArrayRepeat(left: Expression, right: Expression) } } + +object ArraySetLike { + val kindUnion = 1 + + private val MAX_ARRAY_LENGTH: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH + + def toArrayDataInt(hs: OpenHashSet[Int]): ArrayData = { +val array = new Array[Int](hs.size) +var pos = hs.nextPos(0) +var i = 0 +while (pos != OpenHashSet.INVALID_POS) { + array(i) = hs.getValue(pos) + pos = hs.nextPos(pos + 1) + i += 1 +} + +val numBytes = 4L * array.length +val unsafeArraySizeInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(array.length) + + org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes) +// Since UnsafeArrayData.fromPrimitiveArray() uses long[], max elements * 8 bytes can be used +if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) { + UnsafeArrayData.fromPrimitiveArray(array) +} else { + new GenericArrayData(array) +} + } + + def toArrayDataLong(hs: OpenHashSet[Long]): ArrayData = { +val array = new Array[Long](hs.size) +var pos = hs.nextPos(0) +var i = 0 +while (pos != OpenHashSet.INVALID_POS) { + array(i) = hs.getValue(pos) + pos = hs.nextPos(pos + 1) + i += 1 +} + +val numBytes = 8L * array.length +val unsafeArraySizeInBytes = UnsafeArrayData.calculateHeaderPortionInBytes(array.length) + + org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes) +// Since UnsafeArrayData.fromPrimitiveArray() uses long[], max elements * 8 bytes can be used +if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) { --- End diff -- `8` means of `sizeof(long)` in Java primitive. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21061 ping @ueshin --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21443: [SPARK-24369][SQL] Correct handling for multiple distinc...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21443 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21045: [SPARK-23931][SQL] Adds zip function to sparksql
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21045#discussion_r191260626 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -127,6 +127,165 @@ case class MapKeys(child: Expression) override def prettyName: String = "map_keys" } +@ExpressionDescription( + usage = """_FUNC_(a1, a2, ...) - Returns a merged array containing in the N-th position the + N-th value of each array given.""", + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(2, 3, 4)); +[[1, 2], [2, 3], [3, 4]] + > SELECT _FUNC_(array(1, 2), array(2, 3), array(3, 4)); +[[1, 2, 3], [2, 3, 4]] + """, + since = "2.4.0") +case class Zip(children: Seq[Expression]) extends Expression with ExpectsInputTypes { + + override def inputTypes: Seq[AbstractDataType] = Seq.fill(children.length)(ArrayType) + + override def dataType: DataType = ArrayType(mountSchema) + + override def nullable: Boolean = children.forall(_.nullable) + + private lazy val arrayTypes = children.map(_.dataType.asInstanceOf[ArrayType]) + + private lazy val arrayElementTypes = arrayTypes.map(_.elementType) + + def mountSchema: StructType = { +val fields = children.zip(arrayElementTypes).zipWithIndex.map { + case ((expr: NamedExpression, elementType), _) => +StructField(expr.name, elementType, nullable = true) + case ((_, elementType), idx) => +StructField(s"$idx", elementType, nullable = true) +} +StructType(fields) + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +val numberOfArrays: Int = children.length +val genericArrayData = classOf[GenericArrayData].getName +val genericInternalRow = classOf[GenericInternalRow].getName +val arrVals = ctx.freshName("arrVals") +val arrCardinality = ctx.freshName("arrCardinality") +val biggestCardinality = ctx.freshName("biggestCardinality") +val storedArrTypes = ctx.freshName("storedArrTypes") +val returnNull = ctx.freshName("returnNull") +val evals = children.map(_.genCode(ctx)) + +val inputs = evals.zipWithIndex.map { case (eval, index) => + s""" +|${eval.code} +|if (!${eval.isNull}) { +| $arrVals[$index] = ${eval.value}; +| $arrCardinality[$index] = ${eval.value}.numElements(); +|} else { +| $arrVals[$index] = null; +| $arrCardinality[$index] = 0; +| $returnNull[0] = true; +|} +|$storedArrTypes[$index] = "${arrayElementTypes(index)}"; --- End diff -- In simple cases, since we know only a data type of all children before execution, we may not need to use `$storedArrTypes`. However, I may miss something. Would it be possible to show an example test case that requires to pick the correct `getValue` by using `$storedArrTypes`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21045: [SPARK-23931][SQL] Adds zip function to sparksql
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21045#discussion_r191132972 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -127,6 +127,176 @@ case class MapKeys(child: Expression) override def prettyName: String = "map_keys" } +@ExpressionDescription( + usage = """_FUNC_(a1, a2, ...) - Returns a merged array containing in the N-th position the + N-th value of each array given.""", + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(2, 3, 4)); +[[1, 2], [2, 3], [3, 4]] + > SELECT _FUNC_(array(1, 2), array(2, 3), array(3, 4)); +[[1, 2, 3], [2, 3, 4]] + """, + since = "2.4.0") +case class Zip(children: Seq[Expression]) extends Expression with ExpectsInputTypes { + + override def inputTypes: Seq[AbstractDataType] = Seq.fill(children.length)(ArrayType) + + override def dataType: DataType = ArrayType(mountSchema) + + override def nullable: Boolean = children.forall(_.nullable) + + private lazy val arrayTypes = children.map(_.dataType.asInstanceOf[ArrayType]) + + private lazy val arrayElementTypes = arrayTypes.map(_.elementType) + + + def mountSchema: StructType = { +val fields = arrayTypes.zipWithIndex.map { case (arr, idx) => + val fieldName = if (children(idx).isInstanceOf[NamedExpression]) { + children(idx).asInstanceOf[NamedExpression].name +} else { + s"$idx" +} + StructField(fieldName, arr.elementType, children(idx).nullable || arr.containsNull) +} +StructType(fields) + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +val numberOfArrays: Int = children.length +val genericArrayData = classOf[GenericArrayData].getName +val genericInternalRow = classOf[GenericInternalRow].getName +val arrVals = ctx.freshName("arrVals") +val arrCardinality = ctx.freshName("arrCardinality") +val biggestCardinality = ctx.freshName("biggestCardinality") +val storedArrTypes = ctx.freshName("storedArrTypes") +val returnNull = ctx.freshName("returnNull") +val evals = children.map(_.genCode(ctx)) + +val inputs = evals.zipWithIndex.map { case (eval, index) => + s""" +|${eval.code} +|if (!${eval.isNull}) { +| $arrVals[$index] = ${eval.value}; +| $arrCardinality[$index] = ${eval.value}.numElements(); +|} else { +| $arrVals[$index] = null; +| $arrCardinality[$index] = 0; +| $returnNull[0] = true; +|} +|$storedArrTypes[$index] = "${arrayElementTypes(index)}"; +|$biggestCardinality = Math.max($biggestCardinality, $arrCardinality[$index]); + """.stripMargin +} + +val inputsSplitted = ctx.splitExpressions( + expressions = inputs, + funcName = "getInputAndCardinality", + returnType = "int", + makeSplitFunction = body => +s""" + |$body + |return $biggestCardinality; +""".stripMargin, + foldFunctions = _.map(funcCall => s"$biggestCardinality = $funcCall;").mkString("\n"), + arguments = +("ArrayData[]", arrVals) :: +("int[]", arrCardinality) :: +("String[]", storedArrTypes) :: +("int", biggestCardinality) :: +("boolean[]", returnNull) :: Nil) + +val myobject = ctx.freshName("myobject") +val j = ctx.freshName("j") +val i = ctx.freshName("i") +val args = ctx.freshName("args") + +val cases = arrayElementTypes.distinct.map { elementType => + val getArrValsItem = CodeGenerator.getValue(s"$arrVals[$j]", elementType, i) + s""" +|case "${elementType}": +| $myobject[$j] = $getArrValsItem; +| break; + """.stripMargin +} + +ev.copy(s""" + |ArrayData[] $arrVals = new ArrayData[$numberOfArrays]; + |int[] $arrCardinality = new int[$numberOfArrays]; + |int $biggestCardinality = 0; + 
|String[] $storedArrTypes = new String[$numberOfArrays]; + |boolean[] $returnNull = new boolean[1]; + |$return
[GitHub] spark pull request #21045: [SPARK-23931][SQL] Adds zip function to sparksql
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21045#discussion_r191132426 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -127,6 +127,176 @@ case class MapKeys(child: Expression) override def prettyName: String = "map_keys" } +@ExpressionDescription( + usage = """_FUNC_(a1, a2, ...) - Returns a merged array containing in the N-th position the + N-th value of each array given.""", + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(2, 3, 4)); +[[1, 2], [2, 3], [3, 4]] + > SELECT _FUNC_(array(1, 2), array(2, 3), array(3, 4)); +[[1, 2, 3], [2, 3, 4]] + """, + since = "2.4.0") +case class Zip(children: Seq[Expression]) extends Expression with ExpectsInputTypes { + + override def inputTypes: Seq[AbstractDataType] = Seq.fill(children.length)(ArrayType) + + override def dataType: DataType = ArrayType(mountSchema) + + override def nullable: Boolean = children.forall(_.nullable) + + private lazy val arrayTypes = children.map(_.dataType.asInstanceOf[ArrayType]) + + private lazy val arrayElementTypes = arrayTypes.map(_.elementType) + + + def mountSchema: StructType = { +val fields = arrayTypes.zipWithIndex.map { case (arr, idx) => + val fieldName = if (children(idx).isInstanceOf[NamedExpression]) { + children(idx).asInstanceOf[NamedExpression].name +} else { + s"$idx" +} + StructField(fieldName, arr.elementType, children(idx).nullable || arr.containsNull) +} +StructType(fields) + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +val numberOfArrays: Int = children.length +val genericArrayData = classOf[GenericArrayData].getName +val genericInternalRow = classOf[GenericInternalRow].getName +val arrVals = ctx.freshName("arrVals") +val arrCardinality = ctx.freshName("arrCardinality") +val biggestCardinality = ctx.freshName("biggestCardinality") +val storedArrTypes = ctx.freshName("storedArrTypes") +val returnNull = ctx.freshName("returnNull") +val evals = children.map(_.genCode(ctx)) + +val inputs = evals.zipWithIndex.map { case (eval, index) => + s""" +|${eval.code} +|if (!${eval.isNull}) { +| $arrVals[$index] = ${eval.value}; +| $arrCardinality[$index] = ${eval.value}.numElements(); +|} else { +| $arrVals[$index] = null; +| $arrCardinality[$index] = 0; +| $returnNull[0] = true; +|} +|$storedArrTypes[$index] = "${arrayElementTypes(index)}"; +|$biggestCardinality = Math.max($biggestCardinality, $arrCardinality[$index]); + """.stripMargin +} + +val inputsSplitted = ctx.splitExpressions( + expressions = inputs, + funcName = "getInputAndCardinality", + returnType = "int", + makeSplitFunction = body => +s""" + |$body + |return $biggestCardinality; +""".stripMargin, + foldFunctions = _.map(funcCall => s"$biggestCardinality = $funcCall;").mkString("\n"), + arguments = +("ArrayData[]", arrVals) :: +("int[]", arrCardinality) :: +("String[]", storedArrTypes) :: +("int", biggestCardinality) :: +("boolean[]", returnNull) :: Nil) + +val myobject = ctx.freshName("myobject") +val j = ctx.freshName("j") +val i = ctx.freshName("i") +val args = ctx.freshName("args") + +val cases = arrayElementTypes.distinct.map { elementType => + val getArrValsItem = CodeGenerator.getValue(s"$arrVals[$j]", elementType, i) + s""" +|case "${elementType}": +| $myobject[$j] = $getArrValsItem; +| break; + """.stripMargin +} + +ev.copy(s""" + |ArrayData[] $arrVals = new ArrayData[$numberOfArrays]; + |int[] $arrCardinality = new int[$numberOfArrays]; + |int $biggestCardinality = 0; + 
|String[] $storedArrTypes = new String[$numberOfArrays]; + |boolean[] $returnNull = new boolean[1]; + |$return
[GitHub] spark pull request #21045: [SPARK-23931][SQL] Adds zip function to sparksql
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21045#discussion_r191131809 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -127,6 +127,165 @@ case class MapKeys(child: Expression) override def prettyName: String = "map_keys" } +@ExpressionDescription( + usage = """_FUNC_(a1, a2, ...) - Returns a merged array containing in the N-th position the + N-th value of each array given.""", + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(2, 3, 4)); +[[1, 2], [2, 3], [3, 4]] + > SELECT _FUNC_(array(1, 2), array(2, 3), array(3, 4)); +[[1, 2, 3], [2, 3, 4]] + """, + since = "2.4.0") +case class Zip(children: Seq[Expression]) extends Expression with ExpectsInputTypes { + + override def inputTypes: Seq[AbstractDataType] = Seq.fill(children.length)(ArrayType) + + override def dataType: DataType = ArrayType(mountSchema) + + override def nullable: Boolean = children.forall(_.nullable) + + private lazy val arrayTypes = children.map(_.dataType.asInstanceOf[ArrayType]) + + private lazy val arrayElementTypes = arrayTypes.map(_.elementType) --- End diff -- Can we have more than one `arrayElementTypes`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21045: [SPARK-23931][SQL] Adds zip function to sparksql
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21045#discussion_r191126615 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -127,6 +127,165 @@ case class MapKeys(child: Expression) override def prettyName: String = "map_keys" } +@ExpressionDescription( + usage = """_FUNC_(a1, a2, ...) - Returns a merged array containing in the N-th position the + N-th value of each array given.""", + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(2, 3, 4)); +[[1, 2], [2, 3], [3, 4]] + > SELECT _FUNC_(array(1, 2), array(2, 3), array(3, 4)); +[[1, 2, 3], [2, 3, 4]] + """, + since = "2.4.0") +case class Zip(children: Seq[Expression]) extends Expression with ExpectsInputTypes { + + override def inputTypes: Seq[AbstractDataType] = Seq.fill(children.length)(ArrayType) + + override def dataType: DataType = ArrayType(mountSchema) + + override def nullable: Boolean = children.forall(_.nullable) + + private lazy val arrayTypes = children.map(_.dataType.asInstanceOf[ArrayType]) + + private lazy val arrayElementTypes = arrayTypes.map(_.elementType) + + def mountSchema: StructType = { +val fields = children.zip(arrayElementTypes).zipWithIndex.map { + case ((expr: NamedExpression, elementType), _) => +StructField(expr.name, elementType, nullable = true) + case ((_, elementType), idx) => +StructField(s"$idx", elementType, nullable = true) +} +StructType(fields) + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +val numberOfArrays: Int = children.length +val genericArrayData = classOf[GenericArrayData].getName +val genericInternalRow = classOf[GenericInternalRow].getName +val arrVals = ctx.freshName("arrVals") +val arrCardinality = ctx.freshName("arrCardinality") +val biggestCardinality = ctx.freshName("biggestCardinality") +val storedArrTypes = ctx.freshName("storedArrTypes") +val returnNull = ctx.freshName("returnNull") +val evals = children.map(_.genCode(ctx)) + +val inputs = evals.zipWithIndex.map { case (eval, index) => + s""" +|${eval.code} +|if (!${eval.isNull}) { +| $arrVals[$index] = ${eval.value}; +| $arrCardinality[$index] = ${eval.value}.numElements(); +|} else { +| $arrVals[$index] = null; +| $arrCardinality[$index] = 0; +| $returnNull[0] = true; +|} +|$storedArrTypes[$index] = "${arrayElementTypes(index)}"; --- End diff -- Do we need `storedArrType`? Since we can know data type of all children before execution, would it be possible to check the correctness in own `checkInputDataTypes()`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
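As a follow-up to the question above, here is a minimal sketch of what an explicit static check inside the `Zip` case class could look like; it is hypothetical, not taken from the PR, and `ExpectsInputTypes` with `inputTypes` already enforces the array requirement. It only illustrates validating the children once at analysis time instead of tracking element-type strings at runtime.

```scala
// Hypothetical override inside Zip: fail analysis unless every child is an array.
override def checkInputDataTypes(): TypeCheckResult = {
  if (children.forall(_.dataType.isInstanceOf[ArrayType])) {
    TypeCheckResult.TypeCheckSuccess
  } else {
    TypeCheckResult.TypeCheckFailure(
      s"input to function $prettyName should all be arrays, but got " +
        children.map(_.dataType.simpleString).mkString(", "))
  }
}
```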
[GitHub] spark issue #21061: [SPARK-23914][SQL] Add array_union function
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21061 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21433: [SPARK-23820][CORE] Enable use of long form of callsite ...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21433 Could you please add the description for this PR? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21069: [SPARK-23920][SQL]add array_remove to remove all ...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21069#discussion_r190485891 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -1882,3 +1882,123 @@ case class ArrayRepeat(left: Expression, right: Expression) } } + +/** + * Remove all elements that equal to element from the given array + */ +@ExpressionDescription( + usage = "_FUNC_(array, element) - Remove all elements that equal to element from array.", + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3, null, 3), 3); + [1,2,null] + """, since = "2.4.0") +case class ArrayRemove(left: Expression, right: Expression) + extends BinaryExpression with ImplicitCastInputTypes { + + override def dataType: DataType = left.dataType + + override def inputTypes: Seq[AbstractDataType] = +Seq(ArrayType, left.dataType.asInstanceOf[ArrayType].elementType) + + lazy val elementType: DataType = left.dataType.asInstanceOf[ArrayType].elementType + + @transient private lazy val ordering: Ordering[Any] = +TypeUtils.getInterpretedOrdering(right.dataType) + + override def checkInputDataTypes(): TypeCheckResult = { +if (!left.dataType.isInstanceOf[ArrayType] + || left.dataType.asInstanceOf[ArrayType].elementType != right.dataType) { + TypeCheckResult.TypeCheckFailure( +"Arguments must be an array followed by a value of same type as the array members") +} else { + TypeUtils.checkForOrderingExpr(right.dataType, s"function $prettyName") +} + } + + override def nullSafeEval(arr: Any, value: Any): Any = { +val newArray = new Array[Any](arr.asInstanceOf[ArrayData].numElements()) +var pos = 0 +arr.asInstanceOf[ArrayData].foreach(right.dataType, (i, v) => + if (v == null) { +if (value != null) { --- End diff -- nit: Do we need this check since we are in `nullSafeEval`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
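For reference, a sketch of the simplification the nit points at (hypothetical, reusing the local names from the quoted code): inside `nullSafeEval` the `value` argument is guaranteed non-null, so a null element can be kept without inspecting `value`, and the documented example still yields `[1,2,null]`.

```scala
// Keep an element when it is null (it can never equal the non-null `value`)
// or when it does not compare equal to `value`.
arr.asInstanceOf[ArrayData].foreach(right.dataType, (i, v) =>
  if (v == null || !ordering.equiv(v, value)) {
    newArray(pos) = v
    pos += 1
  })
```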
[GitHub] spark issue #21419: Branch 2.2
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21419 Could you please close this PR? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21069: [SPARK-23920][SQL]add array_remove to remove all element...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21069 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21406: [Minor][Core] Cleanup unused vals in `DAGScheduler.handl...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21406 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21389: [SPARK-24204][SQL] Verify a schema in Json/Orc/ParquetFi...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21389 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21401: [SPARK-24350][SQL] Fixes ClassCastException in the "arra...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21401 LGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21401: [SPARK-24350][SQL] Fixes ClassCastException in the "arra...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21401 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21391: [SPARK-24343][SQL] Avoid shuffle for the bucketed table ...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21391 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21394: [SPARK-24329][SQL] Test for skipping multi-space lines
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21394 Retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21266: [SPARK-24206][SQL] Improve DataSource read benchmark cod...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21266 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21389: [SPARK-24204][SQL] Verify a schema in Json/Orc/ParquetFi...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21389 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21311: [SPARK-24257][SQL]LongToUnsafeRowMap calculate the new s...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21311 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21389: [SPARK-24204][SQL] Verify a schema in Json/Orc/ParquetFi...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21389 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21266: [SPARK-24206][SQL] Improve DataSource read benchmark cod...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21266 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21409: [SPARK-24365][SQL] Add Parquet write benchmark
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21409#discussion_r190231900 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteBenchmark.scala --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.datasources.parquet + +import java.io.File + +import scala.util.Try + +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.{Benchmark, Utils} + +/** + * Benchmark to measure parquet write performance. + * To run this: + * spark-submit --class --jars + */ +object ParquetWriteBenchmark { + val conf = new SparkConf() + conf.set("spark.sql.parquet.compression.codec", "snappy") + + val spark = SparkSession.builder +.master("local[1]") +.appName("parquet-write-benchmark") +.config(conf) +.getOrCreate() + + // Set default configs. Individual cases will change them if necessary. 
+ spark.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true") + + def withTempPath(f: File => Unit): Unit = { +val path = Utils.createTempDir() +path.delete() +try f(path) finally Utils.deleteRecursively(path) + } + + def withTempTable(tableNames: String*)(f: => Unit): Unit = { +try f finally tableNames.foreach(spark.catalog.dropTempView) + } + + def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = { +val (keys, values) = pairs.unzip +val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption) +(keys, values).zipped.foreach(spark.conf.set) +try f finally { + keys.zip(currentValues).foreach { +case (key, Some(value)) => spark.conf.set(key, value) +case (key, None) => spark.conf.unset(key) + } +} + } + + def runSQL(name: String, sql: String, values: Int): Unit = { +withTempTable("t1") { + spark.range(values).createOrReplaceTempView("t1") + val benchmark = new Benchmark(name, values) + benchmark.addCase("Parquet Writer") { _ => +withTempPath { dir => + spark.sql(sql).write.parquet(dir.getCanonicalPath) +} + } + benchmark.run() +} + } + + def intWriteBenchmark(values: Int): Unit = { +/* +Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz + +Output Single Int Column:Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + +Parquet Writer2536 / 2610 6.2 161.3 1.0X +*/ +runSQL("Output Single Int Column", "select cast(id as INT) as id from t1", values) + } + + def intStringWriteBenchmark(values: Int): Unit = { +/* +Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz + +Output Int and String Column:Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + +Parquet Writer4644 / 4673 2.3 442.9 1.0X +*/ +runSQL(name = "Output Int and String Column", --- End diff -- nit: Is there any reason to use named argument for this `runSQL` and not to use name argument in another `runSQL`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21405: [SPARK-24361][SQL] Polish code block manipulation...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21405#discussion_r190228283 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeBlockSuite.scala --- @@ -120,11 +120,11 @@ class CodeBlockSuite extends SparkFunSuite { |}""".stripMargin val aliasedParam = JavaCode.variable("aliased", expr.javaType) -val aliasedInputs = code.asInstanceOf[CodeBlock].blockInputs.map { - case _: SimpleExprValue => aliasedParam - case other => other + +// We want to replace all occurrences of `expr` with the variable `aliasedParam`. +val aliasedCode = code.transformExprValues { + case SimpleExprValue("1 + 1", _) => aliasedParam --- End diff -- nit: I know the current code works correctly. How about replacing `_` with `CodeGenerator.javaClass(IntegerType)`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21311: [SPARK-24257][SQL]LongToUnsafeRowMap calculate the new s...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21311 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21401: [SPARK-24350][SQL] "array_position" error fix
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21401 good catch, thanks --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21401: [SPARK-24350][SQL] "array_position" error fix
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21401 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21395: SPARK-24348 "element_at" error fix
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21395 cc @ueshin --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21106: [SPARK-23711][SQL] Add fallback generator for Uns...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21106#discussion_r189818763 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala --- @@ -24,25 +24,30 @@ import org.scalatest.Matchers import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.plans.PlanTestBase import org.apache.spark.sql.catalyst.util._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{IntegerType, LongType, _} import org.apache.spark.unsafe.array.ByteArrayMethods import org.apache.spark.unsafe.types.UTF8String -class UnsafeRowConverterSuite extends SparkFunSuite with Matchers { +class UnsafeRowConverterSuite extends SparkFunSuite with Matchers with PlanTestBase { private def roundedSize(size: Int) = ByteArrayMethods.roundNumberOfBytesToNearestWord(size) - private def testWithFactory( -name: String)( -f: UnsafeProjectionCreator => Unit): Unit = { -test(name) { - f(UnsafeProjection) - f(InterpretedUnsafeProjection) + private def testBothCodegenAndInterpreted(name: String)(f: => Unit): Unit = { +val modes = Seq(CodegenObjectFactoryMode.CODEGEN_ONLY, CodegenObjectFactoryMode.NO_CODEGEN) +for (fallbackMode <- modes) { + test(name + " with " + fallbackMode) { --- End diff -- nit: `s"$name with $fallbackMode"` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21106: [SPARK-23711][SQL] Add fallback generator for UnsafeProj...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21106 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/20636 ping @hvanhovell --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21342: [SPARK-24294] Throw SparkException when OOM in Br...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21342#discussion_r189634704 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala --- @@ -111,12 +112,18 @@ case class BroadcastExchangeExec( SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq) broadcasted } catch { + // SPARK-24294: To bypass scala bug: https://github.com/scala/bug/issues/9554, we throw + // SparkFatalException, which is a subclass of Exception. ThreadUtils.awaitResult + // will catch this exception and re-throw the wrapped fatal throwable. case oe: OutOfMemoryError => -throw new OutOfMemoryError(s"Not enough memory to build and broadcast the table to " + +throw new SparkFatalException( + new OutOfMemoryError(s"Not enough memory to build and broadcast the table to " + --- End diff -- Just curious: can we safely perform object operations (allocating an `OutOfMemoryError`, allocating and concatenating `String`s) after catching an `OutOfMemoryError`? I think we do have space, since the allocation that failed was for a large object. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21374: [SPARK-24323][SQL] Fix lint-java errors
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21374 Until now, @gatorsmile and @ueshin have fixed these when we found them. I am neutral on the policy and would like to hear their opinion. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21376: [SPARK-24250][SQL] support accessing SQLConf insi...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21376#discussion_r189465557 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -107,7 +107,20 @@ object SQLConf { * run tests in parallel. At the time this feature was implemented, this was a no-op since we * run unit tests (that does not involve SparkSession) in serial order. */ - def get: SQLConf = confGetter.get()() + def get: SQLConf = { +if (TaskContext.get != null) { + new ReadOnlySQLConf(TaskContext.get()) +} else { + if (Utils.isTesting && SparkContext.getActive.isDefined) { --- End diff -- good check!! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21361: [SPARK-24313][SQL] Fix collection operations' interprete...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21361 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21376: [SPARK-24250][SQL] support accessing SQLConf insi...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21376#discussion_r189458452

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.test.SQLTestUtils
+
+class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils {
+  import testImplicits._
+
+  protected var spark: SparkSession = null
+
+  // Create a new [[SparkSession]] running in local-cluster mode.
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark = SparkSession.builder()
+      .master("local-cluster[2,1,1024]")
+      .appName("testing")
+      .getOrCreate()
+  }
+
+  override def afterAll(): Unit = {
+    spark.stop()
+    spark = null
+  }
+
+  test("ReadonlySQLConf is correctly created at the executor side") {
--- End diff --

nit: `ReadOnlySQLConf`
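The quoted diff is cut off at the start of the test body. For context, a test of this kind would typically run a closure on the local-cluster executors and assert about the config there. The body below is only a guess at what such a test might look like, assuming the closure passed to foreach executes on the executors and that SQLConf.get returns a ReadOnlySQLConf inside a task; it is not the actual code from the PR.

// Hypothetical test body (not the PR's code), assuming the foreach closure runs
// on the executors of the local-cluster session created in beforeAll above.
test("SQLConf.get returns a read-only conf inside tasks") {
  spark.range(10).foreach { _ =>
    val conf = SQLConf.get
    // On an executor this is expected to be the read-only variant.
    assert(conf.isInstanceOf[ReadOnlySQLConf])
  }
}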
[GitHub] spark issue #21374: [SPARK-24323][SQL] Fix lint-java errors
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21374 cc @gatorsmile @ueshin
[GitHub] spark issue #21342: [SPARK-24294] Throw SparkException when OOM in Broadcast...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21342 retest this please
[GitHub] spark issue #21374: [SPARK-24323][SQL] Fix lint-java errors
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21374 retest this please
[GitHub] spark issue #21372: [SPARK-24322][BUILD] Upgrade Apache ORC to 1.4.4
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21372 retest this please
[GitHub] spark issue #21371: [SPARK-24250][SQL][FollowUp] Fix compile error and flaky...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21371 retest this please
[GitHub] spark issue #21371: [SPARK-24250][SQL][FollowUp] Fix compile error and flaky...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21371 retest this please
[GitHub] spark issue #21374: [SPARK-24323][SQL] Fix lint-java errors
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21374 @dongjoon-hyun thank you, I will kick this off later.