[GitHub] spark pull request #21570: [SPARK-24564][TEST] Add test suite for RecordBina...

2018-06-14 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21570#discussion_r195639885
  
--- Diff: 
sql/core/src/test/java/test/org/apache/spark/sql/execution/sort/RecordBinaryComparatorSuite.java
 ---
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.sql.execution.sort;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.memory.TaskMemoryManager;
+import org.apache.spark.memory.TestMemoryConsumer;
+import org.apache.spark.memory.TestMemoryManager;
+import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.execution.RecordBinaryComparator;
+import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.UnsafeAlignedOffset;
+import org.apache.spark.unsafe.array.LongArray;
+import org.apache.spark.unsafe.memory.MemoryBlock;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.apache.spark.util.collection.unsafe.sort.*;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test the RecordBinaryComparator, which compares two UnsafeRows by their 
binary form.
+ */
+public class RecordBinaryComparatorSuite {
+
+  private final TaskMemoryManager memoryManager = new TaskMemoryManager(
+  new TestMemoryManager(new 
SparkConf().set("spark.memory.offHeap.enabled", "false")), 0);
+  private final TestMemoryConsumer consumer = new 
TestMemoryConsumer(memoryManager);
+
+  private final int uaoSize = UnsafeAlignedOffset.getUaoSize();
+
+  private MemoryBlock dataPage;
+  private long pageCursor;
+
+  private LongArray array;
+  private int pos;
+
+  @Before
+  public void beforeEach() {
+// Only compare between two input rows.
+array = consumer.allocateArray(2);
+pos = 0;
+
+dataPage = memoryManager.allocatePage(4096, consumer);
+pageCursor = dataPage.getBaseOffset();
+  }
+
+  @After
+  public void afterEach() {
+consumer.freePage(dataPage);
+dataPage = null;
+pageCursor = 0;
+
+consumer.freeArray(array);
+array = null;
+pos = 0;
+  }
+
+  private void insertRow(UnsafeRow row) {
+Object recordBase = row.getBaseObject();
+long recordOffset = row.getBaseOffset();
+int recordLength = row.getSizeInBytes();
+
+Object baseObject = dataPage.getBaseObject();
+assert(pageCursor + recordLength <= dataPage.getBaseOffset() + 
dataPage.size());
+long recordAddress = memoryManager.encodePageNumberAndOffset(dataPage, 
pageCursor);
+UnsafeAlignedOffset.putSize(baseObject, pageCursor, recordLength);
+pageCursor += uaoSize;
+Platform.copyMemory(recordBase, recordOffset, baseObject, pageCursor, 
recordLength);
+pageCursor += recordLength;
+
+assert(pos < 2);
+array.set(pos, recordAddress);
+pos++;
+  }
+
+  private int compare(int index1, int index2) {
+Object baseObject = dataPage.getBaseObject();
+
+long recordAddress1 = array.get(index1);
+long baseOffset1 = memoryManager.getOffsetInPage(recordAddress1) + 
uaoSize;
+int recordLength1 = UnsafeAlignedOffset.getSize(baseObject, 
baseOffset1 - uaoSize);
+
+long recordAddress2 = array.get(index2);
+long baseOffset2 = memoryManager.getOffsetInPage(recordAddress2) + 
uaoSize;
+int recordLength2 = UnsafeAlignedOffset.getSize(baseObject, 
baseOffset2 - uaoSize);
+
+return binaryComparator.compare(baseObject, baseOffset1, 
recordLength1, baseObject,
+baseOffset2, recordLength2);
+  }
+
+  private final RecordComparator binaryComparator = new 
RecordBinaryComparator();
+
+  // Compute the most compact size for UnsafeRow's backing data.
+  private int compute

[GitHub] spark pull request #21570: [SPARK-24564][TEST] Add test suite for RecordBina...

2018-06-14 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21570#discussion_r195636719
  
--- Diff: 
sql/core/src/test/java/test/org/apache/spark/sql/execution/sort/RecordBinaryComparatorSuite.java
 ---
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.sql.execution.sort;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.memory.TaskMemoryManager;
+import org.apache.spark.memory.TestMemoryConsumer;
+import org.apache.spark.memory.TestMemoryManager;
+import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.execution.RecordBinaryComparator;
+import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.UnsafeAlignedOffset;
+import org.apache.spark.unsafe.array.LongArray;
+import org.apache.spark.unsafe.memory.MemoryBlock;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.apache.spark.util.collection.unsafe.sort.*;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test the RecordBinaryComparator, which compares two UnsafeRows by their 
binary form.
+ */
+public class RecordBinaryComparatorSuite {
+
+  private final TaskMemoryManager memoryManager = new TaskMemoryManager(
+  new TestMemoryManager(new 
SparkConf().set("spark.memory.offHeap.enabled", "false")), 0);
+  private final TestMemoryConsumer consumer = new 
TestMemoryConsumer(memoryManager);
+
+  private final int uaoSize = UnsafeAlignedOffset.getUaoSize();
+
+  private MemoryBlock dataPage;
+  private long pageCursor;
+
+  private LongArray array;
+  private int pos;
+
+  @Before
+  public void beforeEach() {
+// Only compare between two input rows.
+array = consumer.allocateArray(2);
+pos = 0;
+
+dataPage = memoryManager.allocatePage(4096, consumer);
+pageCursor = dataPage.getBaseOffset();
+  }
+
+  @After
+  public void afterEach() {
+consumer.freePage(dataPage);
+dataPage = null;
+pageCursor = 0;
+
+consumer.freeArray(array);
+array = null;
+pos = 0;
+  }
+
+  private void insertRow(UnsafeRow row) {
+Object recordBase = row.getBaseObject();
+long recordOffset = row.getBaseOffset();
+int recordLength = row.getSizeInBytes();
+
+Object baseObject = dataPage.getBaseObject();
+assert(pageCursor + recordLength <= dataPage.getBaseOffset() + 
dataPage.size());
+long recordAddress = memoryManager.encodePageNumberAndOffset(dataPage, 
pageCursor);
+UnsafeAlignedOffset.putSize(baseObject, pageCursor, recordLength);
+pageCursor += uaoSize;
+Platform.copyMemory(recordBase, recordOffset, baseObject, pageCursor, 
recordLength);
+pageCursor += recordLength;
+
+assert(pos < 2);
+array.set(pos, recordAddress);
+pos++;
+  }
+
+  private int compare(int index1, int index2) {
+Object baseObject = dataPage.getBaseObject();
+
+long recordAddress1 = array.get(index1);
+long baseOffset1 = memoryManager.getOffsetInPage(recordAddress1) + 
uaoSize;
+int recordLength1 = UnsafeAlignedOffset.getSize(baseObject, 
baseOffset1 - uaoSize);
+
+long recordAddress2 = array.get(index2);
+long baseOffset2 = memoryManager.getOffsetInPage(recordAddress2) + 
uaoSize;
+int recordLength2 = UnsafeAlignedOffset.getSize(baseObject, 
baseOffset2 - uaoSize);
+
+return binaryComparator.compare(baseObject, baseOffset1, 
recordLength1, baseObject,
+baseOffset2, recordLength2);
+  }
+
+  private final RecordComparator binaryComparator = new 
RecordBinaryComparator();
+
+  // Compute the most compact size for UnsafeRow's backing data.
+  private int compute

[GitHub] spark pull request #21570: [SPARK-24564][TEST] Add test suite for RecordBina...

2018-06-14 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21570#discussion_r195636548
  
--- Diff: 
sql/core/src/test/java/test/org/apache/spark/sql/execution/sort/RecordBinaryComparatorSuite.java
 ---
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.sql.execution.sort;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.memory.TaskMemoryManager;
+import org.apache.spark.memory.TestMemoryConsumer;
+import org.apache.spark.memory.TestMemoryManager;
+import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.execution.RecordBinaryComparator;
+import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.UnsafeAlignedOffset;
+import org.apache.spark.unsafe.array.LongArray;
+import org.apache.spark.unsafe.memory.MemoryBlock;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.apache.spark.util.collection.unsafe.sort.*;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test the RecordBinaryComparator, which compares two UnsafeRows by their 
binary form.
+ */
+public class RecordBinaryComparatorSuite {
+
+  private final TaskMemoryManager memoryManager = new TaskMemoryManager(
+  new TestMemoryManager(new 
SparkConf().set("spark.memory.offHeap.enabled", "false")), 0);
+  private final TestMemoryConsumer consumer = new 
TestMemoryConsumer(memoryManager);
+
+  private final int uaoSize = UnsafeAlignedOffset.getUaoSize();
+
+  private MemoryBlock dataPage;
+  private long pageCursor;
+
+  private LongArray array;
+  private int pos;
+
+  @Before
+  public void beforeEach() {
+// Only compare between two input rows.
+array = consumer.allocateArray(2);
+pos = 0;
+
+dataPage = memoryManager.allocatePage(4096, consumer);
+pageCursor = dataPage.getBaseOffset();
+  }
+
+  @After
+  public void afterEach() {
+consumer.freePage(dataPage);
+dataPage = null;
+pageCursor = 0;
+
+consumer.freeArray(array);
+array = null;
+pos = 0;
+  }
+
+  private void insertRow(UnsafeRow row) {
+Object recordBase = row.getBaseObject();
+long recordOffset = row.getBaseOffset();
+int recordLength = row.getSizeInBytes();
+
+Object baseObject = dataPage.getBaseObject();
+assert(pageCursor + recordLength <= dataPage.getBaseOffset() + 
dataPage.size());
+long recordAddress = memoryManager.encodePageNumberAndOffset(dataPage, 
pageCursor);
+UnsafeAlignedOffset.putSize(baseObject, pageCursor, recordLength);
+pageCursor += uaoSize;
+Platform.copyMemory(recordBase, recordOffset, baseObject, pageCursor, 
recordLength);
+pageCursor += recordLength;
+
+assert(pos < 2);
+array.set(pos, recordAddress);
+pos++;
+  }
+
+  private int compare(int index1, int index2) {
+Object baseObject = dataPage.getBaseObject();
+
+long recordAddress1 = array.get(index1);
+long baseOffset1 = memoryManager.getOffsetInPage(recordAddress1) + 
uaoSize;
+int recordLength1 = UnsafeAlignedOffset.getSize(baseObject, 
baseOffset1 - uaoSize);
+
+long recordAddress2 = array.get(index2);
+long baseOffset2 = memoryManager.getOffsetInPage(recordAddress2) + 
uaoSize;
+int recordLength2 = UnsafeAlignedOffset.getSize(baseObject, 
baseOffset2 - uaoSize);
+
+return binaryComparator.compare(baseObject, baseOffset1, 
recordLength1, baseObject,
+baseOffset2, recordLength2);
+  }
+
+  private final RecordComparator binaryComparator = new 
RecordBinaryComparator();
+
+  // Compute the most compact size for UnsafeRow's backing data.
+  private int compute

[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...

2018-06-14 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/20636
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21570: [SPARK-24564][TEST] Add test suite for RecordBina...

2018-06-14 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21570#discussion_r195634729
  
--- Diff: 
sql/core/src/test/java/test/org/apache/spark/sql/execution/sort/RecordBinaryComparatorSuite.java
 ---
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test.org.apache.spark.sql.execution.sort;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.memory.TaskMemoryManager;
+import org.apache.spark.memory.TestMemoryConsumer;
+import org.apache.spark.memory.TestMemoryManager;
+import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.execution.RecordBinaryComparator;
+import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.UnsafeAlignedOffset;
+import org.apache.spark.unsafe.array.LongArray;
+import org.apache.spark.unsafe.memory.MemoryBlock;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.apache.spark.util.collection.unsafe.sort.*;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test the RecordBinaryComparator, which compares two UnsafeRows by their 
binary form.
+ */
+public class RecordBinaryComparatorSuite {
+
+  private final TaskMemoryManager memoryManager = new TaskMemoryManager(
+  new TestMemoryManager(new 
SparkConf().set("spark.memory.offHeap.enabled", "false")), 0);
+  private final TestMemoryConsumer consumer = new 
TestMemoryConsumer(memoryManager);
+
+  private final int uaoSize = UnsafeAlignedOffset.getUaoSize();
+
+  private MemoryBlock dataPage;
+  private long pageCursor;
+
+  private LongArray array;
+  private int pos;
+
+  @Before
+  public void beforeEach() {
+// Only compare between two input rows.
+array = consumer.allocateArray(2);
+pos = 0;
+
+dataPage = memoryManager.allocatePage(4096, consumer);
+pageCursor = dataPage.getBaseOffset();
+  }
+
+  @After
+  public void afterEach() {
+consumer.freePage(dataPage);
+dataPage = null;
+pageCursor = 0;
+
+consumer.freeArray(array);
+array = null;
+pos = 0;
+  }
+
+  private void insertRow(UnsafeRow row) {
+Object recordBase = row.getBaseObject();
+long recordOffset = row.getBaseOffset();
+int recordLength = row.getSizeInBytes();
+
+Object baseObject = dataPage.getBaseObject();
+assert(pageCursor + recordLength <= dataPage.getBaseOffset() + 
dataPage.size());
+long recordAddress = memoryManager.encodePageNumberAndOffset(dataPage, 
pageCursor);
+UnsafeAlignedOffset.putSize(baseObject, pageCursor, recordLength);
+pageCursor += uaoSize;
+Platform.copyMemory(recordBase, recordOffset, baseObject, pageCursor, 
recordLength);
+pageCursor += recordLength;
+
+assert(pos < 2);
+array.set(pos, recordAddress);
+pos++;
+  }
+
+  private int compare(int index1, int index2) {
+Object baseObject = dataPage.getBaseObject();
+
+long recordAddress1 = array.get(index1);
+long baseOffset1 = memoryManager.getOffsetInPage(recordAddress1) + 
uaoSize;
+int recordLength1 = UnsafeAlignedOffset.getSize(baseObject, 
baseOffset1 - uaoSize);
+
+long recordAddress2 = array.get(index2);
+long baseOffset2 = memoryManager.getOffsetInPage(recordAddress2) + 
uaoSize;
+int recordLength2 = UnsafeAlignedOffset.getSize(baseObject, 
baseOffset2 - uaoSize);
+
+return binaryComparator.compare(baseObject, baseOffset1, 
recordLength1, baseObject,
+baseOffset2, recordLength2);
+  }
+
+  private final RecordComparator binaryComparator = new 
RecordBinaryComparator();
+
+  // Compute the most compact size for UnsafeRow's backing data.
+  private int compute

[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...

2018-06-14 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/20636
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...

2018-06-14 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/20636
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...

2018-06-14 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/20636
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...

2018-06-14 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/20636
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21537: [SPARK-24505][SQL] Convert strings in codegen to ...

2018-06-14 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21537#discussion_r195356119
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
 ---
@@ -805,43 +811,43 @@ case class Cast(child: Expression, dataType: 
DataType, timeZoneId: Option[String
   private[this] def castToStringCode(from: DataType, ctx: CodegenContext): 
CastFunction = {
 from match {
   case BinaryType =>
-(c, evPrim, evNull) => s"$evPrim = UTF8String.fromBytes($c);"
+(c, evPrim, evNull) => code"$evPrim = UTF8String.fromBytes($c);"
   case DateType =>
-(c, evPrim, evNull) => s"""$evPrim = UTF8String.fromString(
+(c, evPrim, evNull) => code"""$evPrim = UTF8String.fromString(
   
org.apache.spark.sql.catalyst.util.DateTimeUtils.dateToString($c));"""
   case TimestampType =>
-val tz = ctx.addReferenceObj("timeZone", timeZone)
-(c, evPrim, evNull) => s"""$evPrim = UTF8String.fromString(
+val tz = JavaCode.global(ctx.addReferenceObj("timeZone", 
timeZone), timeZone.getClass)
+(c, evPrim, evNull) => code"""$evPrim = UTF8String.fromString(
   
org.apache.spark.sql.catalyst.util.DateTimeUtils.timestampToString($c, $tz));"""
   case ArrayType(et, _) =>
 (c, evPrim, evNull) => {
-  val buffer = ctx.freshName("buffer")
-  val bufferClass = classOf[UTF8StringBuilder].getName
+  val buffer = ctx.freshVariable("buffer", 
classOf[UTF8StringBuilder])
+  val bufferClass = JavaCode.className(classOf[UTF8StringBuilder])
--- End diff --

It is fine with me to address this in another PR.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21537: [SPARK-24505][SQL] Convert strings in codegen to ...

2018-06-14 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21537#discussion_r195331403
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
 ---
@@ -805,43 +811,43 @@ case class Cast(child: Expression, dataType: 
DataType, timeZoneId: Option[String
   private[this] def castToStringCode(from: DataType, ctx: CodegenContext): 
CastFunction = {
 from match {
   case BinaryType =>
-(c, evPrim, evNull) => s"$evPrim = UTF8String.fromBytes($c);"
+(c, evPrim, evNull) => code"$evPrim = UTF8String.fromBytes($c);"
   case DateType =>
-(c, evPrim, evNull) => s"""$evPrim = UTF8String.fromString(
+(c, evPrim, evNull) => code"""$evPrim = UTF8String.fromString(
   
org.apache.spark.sql.catalyst.util.DateTimeUtils.dateToString($c));"""
   case TimestampType =>
-val tz = ctx.addReferenceObj("timeZone", timeZone)
-(c, evPrim, evNull) => s"""$evPrim = UTF8String.fromString(
+val tz = JavaCode.global(ctx.addReferenceObj("timeZone", 
timeZone), timeZone.getClass)
+(c, evPrim, evNull) => code"""$evPrim = UTF8String.fromString(
   
org.apache.spark.sql.catalyst.util.DateTimeUtils.timestampToString($c, $tz));"""
   case ArrayType(et, _) =>
 (c, evPrim, evNull) => {
-  val buffer = ctx.freshName("buffer")
-  val bufferClass = classOf[UTF8StringBuilder].getName
+  val buffer = ctx.freshVariable("buffer", 
classOf[UTF8StringBuilder])
+  val bufferClass = JavaCode.className(classOf[UTF8StringBuilder])
--- End diff --

Now, each variable defined by `freshVariable` has a type. We can get a type 
or its class name from the variable (e.g. `bufffer`). Therefore, it looks 
redundant to declare a name of each variable again (e.g. bufferClass).
Do we have an API to get a type of the variable or define an API to get a 
name of the class? This is because this pattern is very common.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...

2018-06-13 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/20636
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19222: [SPARK-10399][SPARK-23879][CORE][SQL] Introduce multiple...

2018-06-13 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/19222
  
ping @rednaxelafx


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21542: [WIP][SPARK-24529][Build] Add spotbugs into maven...

2018-06-12 Thread kiszk
GitHub user kiszk opened a pull request:

https://github.com/apache/spark/pull/21542

[WIP][SPARK-24529][Build] Add spotbugs into maven build process

## What changes were proposed in this pull request?

This PR enables a Java bytecode check tool 
[spotbugs](https://spotbugs.github.io/) to avoid possible integer overflow at 
multiplication. When a problem is detected, the build process is stopped.
Due to the tool limitation, some other checks will be enabled.

This check is enabled at `compile phase. Thus, `mvn compile` or `mvn 
package` launches this check.

## How was this patch tested?

Existing UTs

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kiszk/spark SPARK-24529

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21542.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21542


commit 3a356ada17a5cad00cb49fea20fb473a9e8392d1
Author: Kazuaki Ishizaki 
Date:   2018-06-12T17:42:48Z

add spotbugs into pom.xml




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21505: [SPARK-24457][SQL] Improving performance of stringToTime...

2018-06-12 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21505
  
LGTM


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...

2018-06-12 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/20636
  
When I check callers of `BufferHolder.grow()`, some of them call 
`ByteArrayMethods.roundNumberOfBytesToNearestWord()` and other do not call it 
(i.e. implicitly ensure word-aligned).

Is it better way to call 
`ByteArrayMethods.roundNumberOfBytesToNearestWord()` at `BufferHolder.grow()` 
instread of a caller to gurantee word-aligned?
Then, we can check whether `UnsafeRow.getSizeInBytes()` is a multiple of 8 
in `BufferHolderSparkSubmitSuite`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21481: [SPARK-24452][SQL][Core] Avoid possible overflow ...

2018-06-11 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21481#discussion_r194561368
  
--- Diff: 
core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java ---
@@ -703,7 +703,7 @@ public boolean append(Object kbase, long koff, int 
klen, Object vbase, long voff
   // must be stored in the same memory page.
   // (8 byte key length) (key) (value) (8 byte pointer to next value)
   int uaoSize = UnsafeAlignedOffset.getUaoSize();
-  final long recordLength = (2 * uaoSize) + klen + vlen + 8;
+  final long recordLength = (2L * uaoSize) + (long)klen + (long)vlen + 
8L;
--- End diff --

You are right. It was too conservative. ` (2L * uaoSize) + klen + vlen + 8` 
can generate `LMUL` or `LADD` as follows:
```
LDC 2
ILOAD 9
I2L
LMUL
ILOAD 4
I2L
LADD
ILOAD 8
I2L
LADD
LDC 8
LADD
LSTORE 10
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...

2018-06-11 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/20636
  
Sure, I will update this


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21505: [SPARK-24457][SQL] Improving performance of strin...

2018-06-11 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21505#discussion_r194365155
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
 ---
@@ -111,6 +113,23 @@ object DateTimeUtils {
 computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone)
   }
 
+  private val threadLocalComputedCalendarsMap =
+new ThreadLocal[mutable.Map[TimeZone, Calendar]] {
--- End diff --

+1 for map approach


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21505: [SPARK-24457][SQL] Improving performance of strin...

2018-06-10 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21505#discussion_r194299485
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
 ---
@@ -111,6 +113,23 @@ object DateTimeUtils {
 computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone)
   }
 
+  private val threadLocalComputedCalendarsMap =
+new ThreadLocal[mutable.Map[TimeZone, Calendar]] {
--- End diff --

Yes, it should work functionally if we check a given time zone every time. 
Do you know the typical access pattern of time zone? If there is temporal 
locality regarding time zone, we do not have to use `mutale.Map`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21520: [SPARK-24505][SQL] Forbidding string interpolation in Co...

2018-06-10 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21520
  
For 1, I agree with you that it is not good to introduce many APIs at 
first. On the other hand, it would be good to prepare only a few APIs that are 
frequently used, not to prepare many APIs. It make code simpler and cleaner.
I think that `inline"${classOf[...].getName}"` and 
`inline"${CodeGenerator.javaType(...)}"` are frequently used. 
`inline"${CodeGenerator.boxedType(...)}"` may be prepared for consistency with 
`inline"${CodeGenerator.javaType(...)}"`.

For 2, new APIs in `ctx` can co-exist with old APIs. Thus, to introduce new 
APIs can co-exist with your new proposal.

WDYT?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21481: [SPARK-24452][SQL][Core] Avoid possible overflow in int ...

2018-06-10 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21481
  
Thank you for your comment. I will create another PR for integrating 
findBugs/SpotBugs into maven.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21505: [SPARK-24457][SQL] Improving performance of strin...

2018-06-10 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21505#discussion_r194268734
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
 ---
@@ -111,6 +113,23 @@ object DateTimeUtils {
 computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone)
   }
 
+  private val threadLocalComputedCalendarsMap =
+new ThreadLocal[mutable.Map[TimeZone, Calendar]] {
--- End diff --

Usually, only the default time zone is used. To execute `Cast` regarding 
date is called with a timezone may use another timezone. For the correctness, I 
think that it is necessary to support multiple timezones.

To enable caching for default time zone and to create an instance for other 
time zones would also work correctly.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21481: [SPARK-24452][SQL][Core] Avoid possible overflow in int ...

2018-06-10 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21481
  
Since I found an plug-in for maven, I will also include a patch to add 
findBugs/SpotBugs into maven in this PR.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21510: [SPARK-24490][WebUI] Use WebUI.addStaticHandler in web U...

2018-06-10 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21510
  
LGTM with one minor comment.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21510: [SPARK-24490][WebUI] Use WebUI.addStaticHandler i...

2018-06-10 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21510#discussion_r194266907
  
--- Diff: core/src/main/scala/org/apache/spark/ui/WebUI.scala ---
@@ -101,12 +101,12 @@ private[spark] abstract class WebUI(
   }
 
   /**
-   * Add a handler for static content.
+   * Adds a handler for static content.
--- End diff --

In this file, `s` is not added.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21520: [SPARK-24505][SQL] Forbidding string interpolation in Co...

2018-06-10 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21520
  
Thank you for a lot of works to update many places. It is very hard to 
split it into several pieces.

Now, we are seeing several typical patterns in the all of changes, in 
paticular by wrapping the original code. Would it be possible to make changes 
simpler by introducing several APIs?

1) We are seeing many `inline` prefix with a few typical patterns.
```
inline"${classOf[...].getName}"
inline"${CodeGenerator.javaType(...)}"
inline"${CodeGenerator.boxedType(...)}"
```
Can we introduce new APIs to avoid repetations of adding `inline`, for 
example `JavaCode.className(Class[_]): JavaCode` for the first call.


2) We are seeing many `JavaCode.global()` or `JavaCode.variable()` when we 
create a new variable.
Would it be possible to make them simpler?

For example, we may introduce these APIs.

```
ctx.addMutableState(..., Class[_])
ctx.freshName(..., DataType)
ctx.freshNameIsNull(...)
```
The first one calls `JavaCode.global()` in the method. The second one calls 
`JavaCode.variable()`.

WDYT?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21481: [SPARK-24452][SQL][Core] Avoid possible overflow in int ...

2018-06-08 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21481
  
Since it is Java bytecode analysis, it is available for Scala code, too.
In my quick test, findBugs overlooked a possible overflow. On the other 
hand, findBugs found another redundant null check.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21258: [SPARK-23933][SQL] Add map_from_arrays function

2018-06-08 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21258
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21481: [SPARK-24452][SQL][Core] Avoid possible overflow in int ...

2018-06-08 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21481
  
@JoshRosen @cloud-fan 
Here is an update.

I have just apply `findBugs` to `OffHeapColumnVector.java` and 
`UnsafeArrayData.java`.
In `OffHeapColumnVector.java`, most of possible overflows are detected. 
But, not all.
In `UnsafeArrayData.java`, two possible overflows are detected. Line 86 and 
456. I overlooked Line 86.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21258: [SPARK-23933][SQL] Add map_from_arrays function

2018-06-07 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21258
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21258: [SPARK-23933][SQL] Add map_from_arrays function

2018-06-07 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21258#discussion_r193942183
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
 ---
@@ -236,6 +236,76 @@ case class CreateMap(children: Seq[Expression]) 
extends Expression {
   override def prettyName: String = "map"
 }
 
+/**
+ * Returns a catalyst Map containing the two arrays in children 
expressions as keys and values.
+ */
+@ExpressionDescription(
+  usage = """
+_FUNC_(keys, values) - Creates a map with a pair of the given 
key/value arrays. All elements
+  in keys should not be null""",
+  examples = """
+Examples:
+  > SELECT _FUNC_([1.0, 3.0], ['2', '4']);
+   {1.0:"2",3.0:"4"}
+  """, since = "2.4.0")
+case class MapFromArrays(left: Expression, right: Expression)
+extends BinaryExpression with ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, 
ArrayType)
+
+  override def dataType: DataType = {
+MapType(
+  keyType = left.dataType.asInstanceOf[ArrayType].elementType,
+  valueType = right.dataType.asInstanceOf[ArrayType].elementType,
+  valueContainsNull = 
right.dataType.asInstanceOf[ArrayType].containsNull)
+  }
+
+  override def nullSafeEval(keyArray: Any, valueArray: Any): Any = {
+val keyArrayData = keyArray.asInstanceOf[ArrayData]
+val valueArrayData = valueArray.asInstanceOf[ArrayData]
+if (keyArrayData.numElements != valueArrayData.numElements) {
+  throw new RuntimeException("The given two arrays should have the 
same length")
+}
+val leftArrayType = left.dataType.asInstanceOf[ArrayType]
+if (leftArrayType.containsNull) {
+  var i = 0
+  while (i < keyArrayData.numElements) {
+if (keyArrayData.isNullAt(i)) {
+  throw new RuntimeException("Cannot use null as map key!")
+}
+i += 1
+  }
+}
+new ArrayBasedMapData(keyArrayData.copy(), valueArrayData.copy())
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+nullSafeCodeGen(ctx, ev, (keyArrayData, valueArrayData) => {
+  val arrayBasedMapData = classOf[ArrayBasedMapData].getName
+  val leftArrayType = left.dataType.asInstanceOf[ArrayType]
+  val keyArrayElemNullCheck = if (!leftArrayType.containsNull) "" else 
{
+val i = ctx.freshName("i")
+s"""
+   |for (int $i = 0; $i < $keyArrayData.numElements(); $i++) {
+   |  if ($keyArrayData.isNullAt($i)) {
+   |throw new RuntimeException("Cannot use null as map key!");
+   |  }
+   |}
+ """.stripMargin
+  }
+  s"""
+ |if ($keyArrayData.numElements() != 
$valueArrayData.numElements()) {
+ |  throw new RuntimeException("The given two arrays should have 
the same length");
+ |}
+ |$keyArrayElemNullCheck
+ |${ev.value} = new $arrayBasedMapData($keyArrayData.copy(), 
$valueArrayData.copy());
+   """.stripMargin
+})
+  }
+
+  override def prettyName: String = "create_map_from_arrays"
--- End diff --

Oh, good catch


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21258: [SPARK-23933][SQL] Add map_from_arrays function

2018-06-07 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21258#discussion_r193928013
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
 ---
@@ -235,6 +235,86 @@ case class CreateMap(children: Seq[Expression]) 
extends Expression {
   override def prettyName: String = "map"
 }
 
+/**
+ * Returns a catalyst Map containing the two arrays in children 
expressions as keys and values.
+ */
+@ExpressionDescription(
+  usage = """
+_FUNC_(keys, values) - Creates a map with a pair of the given 
key/value arrays. All elements
+  in keys should not be null""",
+  examples = """
+Examples:
+  > SELECT _FUNC_([1.0, 3.0], ['2', '4']);
+   {1.0:"2",3.0:"4"}
+  """, since = "2.4.0")
+case class CreateMapFromArrays(left: Expression, right: Expression)
+extends BinaryExpression with ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, 
ArrayType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
--- End diff --

sure, done


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21258: [SPARK-23933][SQL] Add map_from_arrays function

2018-06-07 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21258#discussion_r193927995
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
 ---
@@ -235,6 +235,86 @@ case class CreateMap(children: Seq[Expression]) 
extends Expression {
   override def prettyName: String = "map"
 }
 
+/**
+ * Returns a catalyst Map containing the two arrays in children 
expressions as keys and values.
+ */
+@ExpressionDescription(
+  usage = """
+_FUNC_(keys, values) - Creates a map with a pair of the given 
key/value arrays. All elements
+  in keys should not be null""",
+  examples = """
+Examples:
+  > SELECT _FUNC_([1.0, 3.0], ['2', '4']);
+   {1.0:"2",3.0:"4"}
+  """, since = "2.4.0")
+case class CreateMapFromArrays(left: Expression, right: Expression)
+extends BinaryExpression with ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, 
ArrayType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+(left.dataType, right.dataType) match {
+  case (ArrayType(_, _), ArrayType(_, _)) =>
+TypeCheckResult.TypeCheckSuccess
+  case _ =>
+TypeCheckResult.TypeCheckFailure("The given two arguments should 
be an array")
+}
+  }
+
+  override def dataType: DataType = {
+MapType(
+  keyType = left.dataType.asInstanceOf[ArrayType].elementType,
+  valueType = right.dataType.asInstanceOf[ArrayType].elementType,
+  valueContainsNull = 
right.dataType.asInstanceOf[ArrayType].containsNull)
+  }
+
+  override def nullable: Boolean = left.nullable || right.nullable
--- End diff --

good catch, thanks


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21258: [SPARK-23933][SQL] Add map_from_arrays function

2018-06-07 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21258#discussion_r193927508
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
 ---
@@ -235,6 +235,86 @@ case class CreateMap(children: Seq[Expression]) 
extends Expression {
   override def prettyName: String = "map"
 }
 
+/**
+ * Returns a catalyst Map containing the two arrays in children 
expressions as keys and values.
+ */
+@ExpressionDescription(
+  usage = """
+_FUNC_(keys, values) - Creates a map with a pair of the given 
key/value arrays. All elements
+  in keys should not be null""",
+  examples = """
+Examples:
+  > SELECT _FUNC_([1.0, 3.0], ['2', '4']);
+   {1.0:"2",3.0:"4"}
+  """, since = "2.4.0")
+case class CreateMapFromArrays(left: Expression, right: Expression)
+extends BinaryExpression with ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, 
ArrayType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+(left.dataType, right.dataType) match {
+  case (ArrayType(_, _), ArrayType(_, _)) =>
+TypeCheckResult.TypeCheckSuccess
+  case _ =>
+TypeCheckResult.TypeCheckFailure("The given two arguments should 
be an array")
+}
+  }
+
+  override def dataType: DataType = {
+MapType(
+  keyType = left.dataType.asInstanceOf[ArrayType].elementType,
+  valueType = right.dataType.asInstanceOf[ArrayType].elementType,
+  valueContainsNull = 
right.dataType.asInstanceOf[ArrayType].containsNull)
+  }
+
+  override def nullable: Boolean = left.nullable || right.nullable
+
+  override def nullSafeEval(keyArray: Any, valueArray: Any): Any = {
+val keyArrayData = keyArray.asInstanceOf[ArrayData]
+val valueArrayData = valueArray.asInstanceOf[ArrayData]
+if (keyArrayData.numElements != valueArrayData.numElements) {
+  throw new RuntimeException("The given two arrays should have the 
same length")
+}
+val leftArrayType = left.dataType.asInstanceOf[ArrayType]
+if (leftArrayType.containsNull) {
+  if (keyArrayData.toArray(leftArrayType.elementType).contains(null)) {
+throw new RuntimeException("Cannot use null as map key!")
+  }
+}
+new ArrayBasedMapData(keyArrayData.copy(), valueArrayData.copy())
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+nullSafeCodeGen(ctx, ev, (keyArrayData, valueArrayData) => {
+  val arrayBasedMapData = classOf[ArrayBasedMapData].getName
+  val leftArrayType = left.dataType.asInstanceOf[ArrayType]
+  val keyArrayElemNullCheck = if (!leftArrayType.containsNull) "" else 
{
+val leftArrayTypeTerm = ctx.addReferenceObj("leftArrayType", 
leftArrayType.elementType)
+val array = ctx.freshName("array")
+val i = ctx.freshName("i")
+s"""
+   |Object[] $array = 
$keyArrayData.toObjectArray($leftArrayTypeTerm);
+   |for (int $i = 0; $i < $array.length; $i++) {
+   |  if ($array[$i] == null) {
+   |throw new RuntimeException("Cannot use null as map key!");
+   |  }
+   |}
--- End diff --

Got it. An array has been evaluated.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21507: Branch 1.6

2018-06-07 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21507
  
@deepaksonu  Would it be possible to close this PR?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21109: [SPARK-24020][SQL] Sort-merge join inner range op...

2018-06-07 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21109#discussion_r193762830
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/InMemoryUnsafeRowQueue.scala
 ---
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import java.util.ConcurrentModificationException
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.memory.TaskMemoryManager
+import org.apache.spark.serializer.SerializerManager
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import 
org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.DefaultInitialSizeOfInMemoryBuffer
+import org.apache.spark.storage.BlockManager
+
+/**
+ * An append-only array for [[UnsafeRow]]s that strictly keeps content in 
an in-memory array
+ * until [[numRowsInMemoryBufferThreshold]] is reached post which it will 
switch to a mode which
+ * would flush to disk after [[numRowsSpillThreshold]] is met (or before 
if there is
+ * excessive memory consumption). Setting these threshold involves 
following trade-offs:
+ *
+ * - If [[numRowsInMemoryBufferThreshold]] is too high, the in-memory 
array may occupy more memory
+ *   than is available, resulting in OOM.
+ * - If [[numRowsSpillThreshold]] is too low, data will be spilled 
frequently and lead to
+ *   excessive disk writes. This may lead to a performance regression 
compared to the normal case
+ *   of using an [[ArrayBuffer]] or [[Array]].
+ */
+private[sql] class InMemoryUnsafeRowQueue(
+taskMemoryManager: TaskMemoryManager,
+blockManager: BlockManager,
+serializerManager: SerializerManager,
+taskContext: TaskContext,
+initialSize: Int,
+pageSizeBytes: Long,
+numRowsInMemoryBufferThreshold: Int,
+numRowsSpillThreshold: Int)
+  extends ExternalAppendOnlyUnsafeRowArray(taskMemoryManager,
+  blockManager,
+  serializerManager,
+  taskContext,
+  initialSize,
+  pageSizeBytes,
+  numRowsInMemoryBufferThreshold,
+  numRowsSpillThreshold) {
+
+  def this(numRowsInMemoryBufferThreshold: Int, numRowsSpillThreshold: 
Int) {
+this(
+  TaskContext.get().taskMemoryManager(),
+  SparkEnv.get.blockManager,
+  SparkEnv.get.serializerManager,
+  TaskContext.get(),
+  1024,
+  SparkEnv.get.memoryManager.pageSizeBytes,
+  numRowsInMemoryBufferThreshold,
+  numRowsSpillThreshold)
+  }
+
+  private val initialSizeOfInMemoryBuffer =
+Math.min(DefaultInitialSizeOfInMemoryBuffer, 
numRowsInMemoryBufferThreshold)
+
+  private val inMemoryQueue = if (initialSizeOfInMemoryBuffer > 0) {
+new mutable.Queue[UnsafeRow]()
+  } else {
+null
+  }
+
+//  private var spillableArray: UnsafeExternalSorter = _
--- End diff --

nit: Is this comment necessary?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21505: [SPARK-24457][SQL] Improving performance of stringToTime...

2018-06-07 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21505
  
We would appreciate it if you put the performance before and after this PR?
It would be good to use `Benchmark` class.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...

2018-06-07 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/20636
  
cc @cloud-fan 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21061: [SPARK-23914][SQL] Add array_union function

2018-06-07 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21061
  
Let me think about the implementation to keep the order.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21505: [SPARK-24457][SQL] Improving performance of strin...

2018-06-07 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21505#discussion_r193678440
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
 ---
@@ -111,6 +113,24 @@ object DateTimeUtils {
 computedTimeZones.computeIfAbsent(timeZoneId, computeTimeZone)
   }
 
+  private val threadLocalComputedCalendarsMap =
+new ThreadLocal[mutable.Map[TimeZone, (Calendar, Long)]] {
+  override def initialValue(): mutable.Map[TimeZone, (Calendar, Long)] 
= {
+mutable.Map[TimeZone, (Calendar, Long)]()
+  }
+}
+
+  def getCalendar(timeZone: TimeZone): Calendar = {
+val (c, timeInMillis) = threadLocalComputedCalendarsMap.get()
+  .getOrElseUpdate(timeZone, {
+val c = Calendar.getInstance(timeZone)
+(c, c.getTimeInMillis)
+  })
+c.clear()
+c.setTimeInMillis(timeInMillis)
--- End diff --

I agree with @viirya 's comment. Do we need to set the value of 
`System.currentTimeMillis()`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21258: [SPARK-23933][SQL] Add map_from_arrays function

2018-06-04 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21258#discussion_r192949349
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
 ---
@@ -235,6 +235,86 @@ case class CreateMap(children: Seq[Expression]) 
extends Expression {
   override def prettyName: String = "map"
 }
 
+/**
+ * Returns a catalyst Map containing the two arrays in children 
expressions as keys and values.
+ */
+@ExpressionDescription(
+  usage = """
+_FUNC_(keys, values) - Creates a map with a pair of the given 
key/value arrays. All elements
+  in keys should not be null""",
+  examples = """
+Examples:
+  > SELECT _FUNC_([1.0, 3.0], ['2', '4']);
+   {1.0:"2",3.0:"4"}
+  """, since = "2.4.0")
+case class CreateMapFromArrays(left: Expression, right: Expression)
+extends BinaryExpression with ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, 
ArrayType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+(left.dataType, right.dataType) match {
+  case (ArrayType(_, _), ArrayType(_, _)) =>
+TypeCheckResult.TypeCheckSuccess
+  case _ =>
+TypeCheckResult.TypeCheckFailure("The given two arguments should 
be an array")
+}
+  }
+
+  override def dataType: DataType = {
+MapType(
+  keyType = left.dataType.asInstanceOf[ArrayType].elementType,
+  valueType = right.dataType.asInstanceOf[ArrayType].elementType,
+  valueContainsNull = 
right.dataType.asInstanceOf[ArrayType].containsNull)
+  }
+
+  override def nullable: Boolean = left.nullable || right.nullable
+
+  override def nullSafeEval(keyArray: Any, valueArray: Any): Any = {
+val keyArrayData = keyArray.asInstanceOf[ArrayData]
+val valueArrayData = valueArray.asInstanceOf[ArrayData]
+if (keyArrayData.numElements != valueArrayData.numElements) {
+  throw new RuntimeException("The given two arrays should have the 
same length")
+}
+val leftArrayType = left.dataType.asInstanceOf[ArrayType]
+if (leftArrayType.containsNull) {
+  if (keyArrayData.toArray(leftArrayType.elementType).contains(null)) {
+throw new RuntimeException("Cannot use null as map key!")
+  }
+}
+new ArrayBasedMapData(keyArrayData.copy(), valueArrayData.copy())
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+nullSafeCodeGen(ctx, ev, (keyArrayData, valueArrayData) => {
+  val arrayBasedMapData = classOf[ArrayBasedMapData].getName
+  val leftArrayType = left.dataType.asInstanceOf[ArrayType]
+  val keyArrayElemNullCheck = if (!leftArrayType.containsNull) "" else 
{
+val leftArrayTypeTerm = ctx.addReferenceObj("leftArrayType", 
leftArrayType.elementType)
+val array = ctx.freshName("array")
+val i = ctx.freshName("i")
+s"""
+   |Object[] $array = 
$keyArrayData.toObjectArray($leftArrayTypeTerm);
+   |for (int $i = 0; $i < $array.length; $i++) {
+   |  if ($array[$i] == null) {
+   |throw new RuntimeException("Cannot use null as map key!");
+   |  }
+   |}
--- End diff --

This code should work if we evaluate each element to make `isNullAt()` 
valid.

I think that my mistake is not to currently evaluate each element in 
`keyArrayData` and `valueArrayData.`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21481: [SPARK-24452][SQL][Core] Avoid possible overflow in int ...

2018-06-03 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21481
  
Good questions.

For 2, at first I found one of these issues when I looked at a file. Then, 
I ran `grep` command with `long .*=.*\*` and `long .*=.*\+` in `.java` file. 
Then, I picked them up manually. It looks labor-intensive.

For 3, here is my thought.
[`SpotBugs`](https://spotbugs.github.io/) may be a good candidate to check 
it.
SpotBug is a successor of  [`findBugs`](https://findbugs.sourceforge.net/). 
When I ran `FindBugs` before, I found some problems regarding possible overflow 
and then made a PR that was integrated. On the other hand, these issues may not 
be detected at that time.

I will look at SpotBugs after my presentation at SAIS will be finished :)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21481: [SPARK-24452][SQL][Core] Avoid possible overflow in int ...

2018-06-02 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21481
  
cc @cloud-fan 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21258: [SPARK-23933][SQL] Add map_from_arrays function

2018-06-01 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21258#discussion_r192551411
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
 ---
@@ -235,6 +235,86 @@ case class CreateMap(children: Seq[Expression]) 
extends Expression {
   override def prettyName: String = "map"
 }
 
+/**
+ * Returns a catalyst Map containing the two arrays in children 
expressions as keys and values.
+ */
+@ExpressionDescription(
+  usage = """
+_FUNC_(keys, values) - Creates a map with a pair of the given 
key/value arrays. All elements
+  in keys should not be null""",
+  examples = """
+Examples:
+  > SELECT _FUNC_([1.0, 3.0], ['2', '4']);
+   {1.0:"2",3.0:"4"}
+  """, since = "2.4.0")
+case class CreateMapFromArrays(left: Expression, right: Expression)
+extends BinaryExpression with ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, 
ArrayType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+(left.dataType, right.dataType) match {
+  case (ArrayType(_, _), ArrayType(_, _)) =>
+TypeCheckResult.TypeCheckSuccess
+  case _ =>
+TypeCheckResult.TypeCheckFailure("The given two arguments should 
be an array")
+}
+  }
+
+  override def dataType: DataType = {
+MapType(
+  keyType = left.dataType.asInstanceOf[ArrayType].elementType,
+  valueType = right.dataType.asInstanceOf[ArrayType].elementType,
+  valueContainsNull = 
right.dataType.asInstanceOf[ArrayType].containsNull)
+  }
+
+  override def nullable: Boolean = left.nullable || right.nullable
+
+  override def nullSafeEval(keyArray: Any, valueArray: Any): Any = {
+val keyArrayData = keyArray.asInstanceOf[ArrayData]
+val valueArrayData = valueArray.asInstanceOf[ArrayData]
+if (keyArrayData.numElements != valueArrayData.numElements) {
+  throw new RuntimeException("The given two arrays should have the 
same length")
+}
+val leftArrayType = left.dataType.asInstanceOf[ArrayType]
+if (leftArrayType.containsNull) {
+  if (keyArrayData.toArray(leftArrayType.elementType).contains(null)) {
+throw new RuntimeException("Cannot use null as map key!")
+  }
+}
+new ArrayBasedMapData(keyArrayData.copy(), valueArrayData.copy())
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+nullSafeCodeGen(ctx, ev, (keyArrayData, valueArrayData) => {
+  val arrayBasedMapData = classOf[ArrayBasedMapData].getName
+  val leftArrayType = left.dataType.asInstanceOf[ArrayType]
+  val keyArrayElemNullCheck = if (!leftArrayType.containsNull) "" else 
{
+val leftArrayTypeTerm = ctx.addReferenceObj("leftArrayType", 
leftArrayType.elementType)
+val array = ctx.freshName("array")
+val i = ctx.freshName("i")
+s"""
+   |Object[] $array = 
$keyArrayData.toObjectArray($leftArrayTypeTerm);
+   |for (int $i = 0; $i < $array.length; $i++) {
+   |  if ($array[$i] == null) {
+   |throw new RuntimeException("Cannot use null as map key!");
+   |  }
+   |}
--- End diff --

However, I realized we have to evaluate each element as `CreateMap` does. I 
think that we have to update eval and codegen.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21258: [SPARK-23933][SQL] Add map_from_arrays function

2018-06-01 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21258#discussion_r192548230
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
 ---
@@ -235,6 +235,86 @@ case class CreateMap(children: Seq[Expression]) 
extends Expression {
   override def prettyName: String = "map"
 }
 
+/**
+ * Returns a catalyst Map containing the two arrays in children 
expressions as keys and values.
+ */
+@ExpressionDescription(
+  usage = """
+_FUNC_(keys, values) - Creates a map with a pair of the given 
key/value arrays. All elements
+  in keys should not be null""",
+  examples = """
+Examples:
+  > SELECT _FUNC_([1.0, 3.0], ['2', '4']);
+   {1.0:"2",3.0:"4"}
+  """, since = "2.4.0")
+case class CreateMapFromArrays(left: Expression, right: Expression)
+extends BinaryExpression with ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, 
ArrayType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+(left.dataType, right.dataType) match {
+  case (ArrayType(_, _), ArrayType(_, _)) =>
+TypeCheckResult.TypeCheckSuccess
+  case _ =>
+TypeCheckResult.TypeCheckFailure("The given two arguments should 
be an array")
+}
+  }
+
+  override def dataType: DataType = {
+MapType(
+  keyType = left.dataType.asInstanceOf[ArrayType].elementType,
+  valueType = right.dataType.asInstanceOf[ArrayType].elementType,
+  valueContainsNull = 
right.dataType.asInstanceOf[ArrayType].containsNull)
+  }
+
+  override def nullable: Boolean = left.nullable || right.nullable
+
+  override def nullSafeEval(keyArray: Any, valueArray: Any): Any = {
+val keyArrayData = keyArray.asInstanceOf[ArrayData]
+val valueArrayData = valueArray.asInstanceOf[ArrayData]
+if (keyArrayData.numElements != valueArrayData.numElements) {
+  throw new RuntimeException("The given two arrays should have the 
same length")
+}
+val leftArrayType = left.dataType.asInstanceOf[ArrayType]
+if (leftArrayType.containsNull) {
+  if (keyArrayData.toArray(leftArrayType.elementType).contains(null)) {
+throw new RuntimeException("Cannot use null as map key!")
+  }
+}
+new ArrayBasedMapData(keyArrayData.copy(), valueArrayData.copy())
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+nullSafeCodeGen(ctx, ev, (keyArrayData, valueArrayData) => {
+  val arrayBasedMapData = classOf[ArrayBasedMapData].getName
+  val leftArrayType = left.dataType.asInstanceOf[ArrayType]
+  val keyArrayElemNullCheck = if (!leftArrayType.containsNull) "" else 
{
+val leftArrayTypeTerm = ctx.addReferenceObj("leftArrayType", 
leftArrayType.elementType)
+val array = ctx.freshName("array")
+val i = ctx.freshName("i")
+s"""
+   |Object[] $array = 
$keyArrayData.toObjectArray($leftArrayTypeTerm);
+   |for (int $i = 0; $i < $array.length; $i++) {
+   |  if ($array[$i] == null) {
+   |throw new RuntimeException("Cannot use null as map key!");
+   |  }
+   |}
--- End diff --

good catch, thanks


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21258: [SPARK-23933][SQL] Add map_from_arrays function

2018-06-01 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21258#discussion_r192548103
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala
 ---
@@ -235,6 +235,86 @@ case class CreateMap(children: Seq[Expression]) 
extends Expression {
   override def prettyName: String = "map"
 }
 
+/**
+ * Returns a catalyst Map containing the two arrays in children 
expressions as keys and values.
+ */
+@ExpressionDescription(
+  usage = """
+_FUNC_(keys, values) - Creates a map with a pair of the given 
key/value arrays. All elements
+  in keys should not be null""",
+  examples = """
+Examples:
+  > SELECT _FUNC_([1.0, 3.0], ['2', '4']);
+   {1.0:"2",3.0:"4"}
+  """, since = "2.4.0")
+case class CreateMapFromArrays(left: Expression, right: Expression)
--- End diff --

In existing convention, `"map" -> "CreateMap"`. How about 
`"map_from_arrays" -> ???`?
I am neutral on `MapFromArrays` or `CreateMapFromArrays`. WDYT?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21258: [SPARK-23933][SQL] Add map_from_arrays function

2018-06-01 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21258#discussion_r192547906
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -1798,6 +1798,22 @@ def create_map(*cols):
 return Column(jc)
 
 
+@ignore_unicode_prefix
+@since(2.4)
+def create_map_from_arrays(col1, col2):
--- End diff --

Sure


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function

2018-06-01 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21061#discussion_r192546226
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -2189,3 +2189,302 @@ case class ArrayRemove(left: Expression, right: 
Expression)
 
   override def prettyName: String = "array_remove"
 }
+
+object ArraySetLike {
+  private val MAX_ARRAY_LENGTH: Int = 
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
+
+  def toArrayDataInt(hs: OpenHashSet[Int]): ArrayData = {
+val array = new Array[Int](hs.size)
+var pos = hs.nextPos(0)
+var i = 0
+while (pos != OpenHashSet.INVALID_POS) {
+  array(i) = hs.getValue(pos)
+  pos = hs.nextPos(pos + 1)
+  i += 1
+}
+
+if (useGenericArrayData(LongType.defaultSize, array.length)) {
+  new GenericArrayData(array)
+} else {
+  UnsafeArrayData.fromPrimitiveArray(array)
+}
+  }
+
+  def toArrayDataLong(hs: OpenHashSet[Long]): ArrayData = {
+val array = new Array[Long](hs.size)
+var pos = hs.nextPos(0)
+var i = 0
+while (pos != OpenHashSet.INVALID_POS) {
+  array(i) = hs.getValue(pos)
+  pos = hs.nextPos(pos + 1)
+  i += 1
+}
+
+if (useGenericArrayData(LongType.defaultSize, array.length)) {
+  new GenericArrayData(array)
+} else {
+  UnsafeArrayData.fromPrimitiveArray(array)
+}
+  }
+
+  def useGenericArrayData(elementSize: Int, length: Int): Boolean = {
--- End diff --

Although I tried it, I stopped reusing. This is because 
`UnsafeArrayData.fromPrimitiveArray()` also uses variables (e.g. 
`headerInBytes` and `valueRegionInBytes`) calculated in this method.
I think that there is no typical way to return multiple values from a 
function.

Thus, we can move this to `UnsafeArrayData`. But, it is not easy to reuse 
it. WDYT?

```
  private static UnsafeArrayData fromPrimitiveArray(
   Object arr, int offset, int length, int elementSize) {
final long headerInBytes = calculateHeaderPortionInBytes(length);
final long valueRegionInBytes = elementSize * length;
final long totalSizeInLongs = (headerInBytes + valueRegionInBytes + 7) 
/ 8;
if (totalSizeInLongs > Integer.MAX_VALUE / 8) {
  throw new UnsupportedOperationException("Cannot convert this array to 
unsafe format as " +
"it's too big.");
}

final long[] data = new long[(int)totalSizeInLongs];

Platform.putLong(data, Platform.LONG_ARRAY_OFFSET, length);
Platform.copyMemory(arr, offset, data,
  Platform.LONG_ARRAY_OFFSET + headerInBytes, valueRegionInBytes);

UnsafeArrayData result = new UnsafeArrayData();
result.pointTo(data, Platform.LONG_ARRAY_OFFSET, (int)totalSizeInLongs 
* 8);
return result;
  }
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21481: [SPARK-24452][SQL][Core] Avoid possible overflow in int ...

2018-06-01 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21481
  
cc @ueshin @hvanhovell 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21481: [SPARK-24452][SQL][Core] Avoid possible overflow ...

2018-06-01 Thread kiszk
GitHub user kiszk opened a pull request:

https://github.com/apache/spark/pull/21481

[SPARK-24452][SQL][Core] Avoid possible overflow in int add or multiple

## What changes were proposed in this pull request?

This PR fixes possible overflow in int add or multiply.

The following assignments may cause overflow in right hand side. As a 
result, the result may be negative.
```
long = int * int
long = int + int
```

To avoid this problem, this PR performs cast from int to long in right hand 
side.

## How was this patch tested?

Existing UTs.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kiszk/spark SPARK-24452

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21481.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21481


commit 324fd5ccb73c8017f5537031db21b687ac1ca27a
Author: Kazuaki Ishizaki 
Date:   2018-06-01T20:22:34Z

initial commit




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function

2018-06-01 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21061#discussion_r192490355
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -1882,3 +1882,311 @@ case class ArrayRepeat(left: Expression, right: 
Expression)
   }
 
 }
+
+object ArraySetLike {
+  val kindUnion = 1
+
+  private val MAX_ARRAY_LENGTH: Int = 
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
+
+  def toArrayDataInt(hs: OpenHashSet[Int]): ArrayData = {
+val array = new Array[Int](hs.size)
+var pos = hs.nextPos(0)
+var i = 0
+while (pos != OpenHashSet.INVALID_POS) {
+  array(i) = hs.getValue(pos)
+  pos = hs.nextPos(pos + 1)
+  i += 1
+}
+
+val numBytes = 4L * array.length
+val unsafeArraySizeInBytes = 
UnsafeArrayData.calculateHeaderPortionInBytes(array.length) +
+  
org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes)
+// Since UnsafeArrayData.fromPrimitiveArray() uses long[], max 
elements * 8 bytes can be used
+if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) {
+  UnsafeArrayData.fromPrimitiveArray(array)
+} else {
+  new GenericArrayData(array)
+}
+  }
+
+  def toArrayDataLong(hs: OpenHashSet[Long]): ArrayData = {
+val array = new Array[Long](hs.size)
+var pos = hs.nextPos(0)
+var i = 0
+while (pos != OpenHashSet.INVALID_POS) {
+  array(i) = hs.getValue(pos)
+  pos = hs.nextPos(pos + 1)
+  i += 1
+}
+
+val numBytes = 8L * array.length
+val unsafeArraySizeInBytes = 
UnsafeArrayData.calculateHeaderPortionInBytes(array.length) +
+  
org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes)
+// Since UnsafeArrayData.fromPrimitiveArray() uses long[], max 
elements * 8 bytes can be used
+if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) {
--- End diff --

Ah, I misunderstood. To accept `Integer.MAX_VALUE * 8` looks a future plan.
Anyway, I will use the same calculation in 
`UnsafeArrayData.fromPrimitiveArray()`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function

2018-06-01 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21061#discussion_r192340073
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -1882,3 +1882,311 @@ case class ArrayRepeat(left: Expression, right: 
Expression)
   }
 
 }
+
+object ArraySetLike {
+  val kindUnion = 1
+
+  private val MAX_ARRAY_LENGTH: Int = 
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
+
+  def toArrayDataInt(hs: OpenHashSet[Int]): ArrayData = {
+val array = new Array[Int](hs.size)
+var pos = hs.nextPos(0)
+var i = 0
+while (pos != OpenHashSet.INVALID_POS) {
+  array(i) = hs.getValue(pos)
+  pos = hs.nextPos(pos + 1)
+  i += 1
+}
+
+val numBytes = 4L * array.length
+val unsafeArraySizeInBytes = 
UnsafeArrayData.calculateHeaderPortionInBytes(array.length) +
+  
org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes)
+// Since UnsafeArrayData.fromPrimitiveArray() uses long[], max 
elements * 8 bytes can be used
+if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) {
+  UnsafeArrayData.fromPrimitiveArray(array)
+} else {
+  new GenericArrayData(array)
+}
+  }
+
+  def toArrayDataLong(hs: OpenHashSet[Long]): ArrayData = {
+val array = new Array[Long](hs.size)
+var pos = hs.nextPos(0)
+var i = 0
+while (pos != OpenHashSet.INVALID_POS) {
+  array(i) = hs.getValue(pos)
+  pos = hs.nextPos(pos + 1)
+  i += 1
+}
+
+val numBytes = 8L * array.length
+val unsafeArraySizeInBytes = 
UnsafeArrayData.calculateHeaderPortionInBytes(array.length) +
+  
org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes)
+// Since UnsafeArrayData.fromPrimitiveArray() uses long[], max 
elements * 8 bytes can be used
+if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) {
+  UnsafeArrayData.fromPrimitiveArray(array)
+} else {
+  new GenericArrayData(array)
+}
+  }
+
+  def arrayUnion(
+  array1: ArrayData,
+  array2: ArrayData,
+  et: DataType,
+  ordering: Ordering[Any]): ArrayData = {
+if (ordering == null) {
+  new 
GenericArrayData(array1.toObjectArray(et).union(array2.toObjectArray(et))
+.distinct.asInstanceOf[Array[Any]])
+} else {
+  val length = math.min(array1.numElements().toLong + 
array2.numElements().toLong,
+ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)
+  val array = new Array[Any](length.toInt)
+  var pos = 0
+  var hasNull = false
+  Seq(array1, array2).foreach(_.foreach(et, (_, v) => {
+var found = false
+if (v == null) {
+  if (hasNull) {
+found = true
+  } else {
+hasNull = true
+  }
+} else {
+  var j = 0
+  while (!found && j < pos) {
+val va = array(j)
+if (va != null && ordering.equiv(va, v)) {
+  found = true
+}
+j = j + 1
+  }
+}
+if (!found) {
+  if (pos > MAX_ARRAY_LENGTH) {
+throw new RuntimeException(s"Unsuccessful try to union arrays 
with $pos" +
+  s" elements due to exceeding the array size limit 
$MAX_ARRAY_LENGTH.")
+  }
+  array(pos) = v
+  pos = pos + 1
+}
+  }))
+  new GenericArrayData(array.slice(0, pos))
+}
+  }
+}
+
+abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast {
+  def typeId: Int
+
+  override def dataType: DataType = left.dataType
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+val typeCheckResult = super.checkInputDataTypes()
+if (typeCheckResult.isSuccess) {
+  
TypeUtils.checkForOrderingExpr(dataType.asInstanceOf[ArrayType].elementType,
+s"function $prettyName")
+} else {
+  typeCheckResult
+}
+  }
+
+  private def cn = left.dataType.asInstanceOf[ArrayType].containsNull ||
+right.dataType.asInstanceOf[ArrayType].containsNull
+
+  @transient private lazy val ordering: Ordering[Any] =
+TypeUtils.getInterpretedOrdering(elementType)
+
+  @transient private lazy val elementTypeSupportEquals = elementType match 
{
+case BinaryType => false
+case _: AtomicType => true
+case _ => false
+  }
+
+  def intEval(ary: ArrayData, hs2: OpenHashSet[Int]): Open

[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function

2018-06-01 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21061#discussion_r192336364
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -1882,3 +1882,311 @@ case class ArrayRepeat(left: Expression, right: 
Expression)
   }
 
 }
+
+object ArraySetLike {
+  val kindUnion = 1
+
+  private val MAX_ARRAY_LENGTH: Int = 
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
+
+  def toArrayDataInt(hs: OpenHashSet[Int]): ArrayData = {
+val array = new Array[Int](hs.size)
+var pos = hs.nextPos(0)
+var i = 0
+while (pos != OpenHashSet.INVALID_POS) {
+  array(i) = hs.getValue(pos)
+  pos = hs.nextPos(pos + 1)
+  i += 1
+}
+
+val numBytes = 4L * array.length
+val unsafeArraySizeInBytes = 
UnsafeArrayData.calculateHeaderPortionInBytes(array.length) +
+  
org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes)
+// Since UnsafeArrayData.fromPrimitiveArray() uses long[], max 
elements * 8 bytes can be used
+if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) {
+  UnsafeArrayData.fromPrimitiveArray(array)
+} else {
+  new GenericArrayData(array)
+}
+  }
+
+  def toArrayDataLong(hs: OpenHashSet[Long]): ArrayData = {
+val array = new Array[Long](hs.size)
+var pos = hs.nextPos(0)
+var i = 0
+while (pos != OpenHashSet.INVALID_POS) {
+  array(i) = hs.getValue(pos)
+  pos = hs.nextPos(pos + 1)
+  i += 1
+}
+
+val numBytes = 8L * array.length
+val unsafeArraySizeInBytes = 
UnsafeArrayData.calculateHeaderPortionInBytes(array.length) +
+  
org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes)
+// Since UnsafeArrayData.fromPrimitiveArray() uses long[], max 
elements * 8 bytes can be used
+if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) {
+  UnsafeArrayData.fromPrimitiveArray(array)
+} else {
+  new GenericArrayData(array)
+}
+  }
+
+  def arrayUnion(
+  array1: ArrayData,
+  array2: ArrayData,
+  et: DataType,
+  ordering: Ordering[Any]): ArrayData = {
+if (ordering == null) {
+  new 
GenericArrayData(array1.toObjectArray(et).union(array2.toObjectArray(et))
+.distinct.asInstanceOf[Array[Any]])
+} else {
+  val length = math.min(array1.numElements().toLong + 
array2.numElements().toLong,
+ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)
+  val array = new Array[Any](length.toInt)
+  var pos = 0
+  var hasNull = false
+  Seq(array1, array2).foreach(_.foreach(et, (_, v) => {
+var found = false
+if (v == null) {
+  if (hasNull) {
+found = true
+  } else {
+hasNull = true
+  }
+} else {
+  var j = 0
+  while (!found && j < pos) {
+val va = array(j)
+if (va != null && ordering.equiv(va, v)) {
+  found = true
+}
+j = j + 1
+  }
+}
+if (!found) {
+  if (pos > MAX_ARRAY_LENGTH) {
+throw new RuntimeException(s"Unsuccessful try to union arrays 
with $pos" +
+  s" elements due to exceeding the array size limit 
$MAX_ARRAY_LENGTH.")
+  }
+  array(pos) = v
+  pos = pos + 1
+}
+  }))
+  new GenericArrayData(array.slice(0, pos))
+}
+  }
+}
+
+abstract class ArraySetLike extends BinaryArrayExpressionWithImplicitCast {
+  def typeId: Int
+
+  override def dataType: DataType = left.dataType
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+val typeCheckResult = super.checkInputDataTypes()
+if (typeCheckResult.isSuccess) {
+  
TypeUtils.checkForOrderingExpr(dataType.asInstanceOf[ArrayType].elementType,
+s"function $prettyName")
+} else {
+  typeCheckResult
+}
+  }
+
+  private def cn = left.dataType.asInstanceOf[ArrayType].containsNull ||
+right.dataType.asInstanceOf[ArrayType].containsNull
+
+  @transient private lazy val ordering: Ordering[Any] =
+TypeUtils.getInterpretedOrdering(elementType)
+
+  @transient private lazy val elementTypeSupportEquals = elementType match 
{
+case BinaryType => false
+case _: AtomicType => true
+case _ => false
+  }
+
+  def intEval(ary: ArrayData, hs2: OpenHashSet[Int]): Open

[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function

2018-06-01 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21061#discussion_r192331296
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -1882,3 +1882,311 @@ case class ArrayRepeat(left: Expression, right: 
Expression)
   }
 
 }
+
+object ArraySetLike {
+  val kindUnion = 1
+
+  private val MAX_ARRAY_LENGTH: Int = 
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
+
+  def toArrayDataInt(hs: OpenHashSet[Int]): ArrayData = {
+val array = new Array[Int](hs.size)
+var pos = hs.nextPos(0)
+var i = 0
+while (pos != OpenHashSet.INVALID_POS) {
+  array(i) = hs.getValue(pos)
+  pos = hs.nextPos(pos + 1)
+  i += 1
+}
+
+val numBytes = 4L * array.length
+val unsafeArraySizeInBytes = 
UnsafeArrayData.calculateHeaderPortionInBytes(array.length) +
+  
org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes)
+// Since UnsafeArrayData.fromPrimitiveArray() uses long[], max 
elements * 8 bytes can be used
+if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) {
+  UnsafeArrayData.fromPrimitiveArray(array)
+} else {
+  new GenericArrayData(array)
+}
+  }
+
+  def toArrayDataLong(hs: OpenHashSet[Long]): ArrayData = {
+val array = new Array[Long](hs.size)
+var pos = hs.nextPos(0)
+var i = 0
+while (pos != OpenHashSet.INVALID_POS) {
+  array(i) = hs.getValue(pos)
+  pos = hs.nextPos(pos + 1)
+  i += 1
+}
+
+val numBytes = 8L * array.length
+val unsafeArraySizeInBytes = 
UnsafeArrayData.calculateHeaderPortionInBytes(array.length) +
+  
org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes)
+// Since UnsafeArrayData.fromPrimitiveArray() uses long[], max 
elements * 8 bytes can be used
+if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) {
--- End diff --

As I wrote a comment, since `UnsafeArrayData.fromPrimitiveArray()` uses 
`long[]`, this method can accept up to `Integer.MAX_VALUE * 8` (8 means 
`sizeof(long)`) as total byte size.
Of course, conservatively, we limit the length by up to `Integer.MAX_VALUE`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21061: [SPARK-23914][SQL] Add array_union function

2018-06-01 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21061#discussion_r192330635
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -1882,3 +1882,311 @@ case class ArrayRepeat(left: Expression, right: 
Expression)
   }
 
 }
+
+object ArraySetLike {
+  val kindUnion = 1
+
+  private val MAX_ARRAY_LENGTH: Int = 
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
+
+  def toArrayDataInt(hs: OpenHashSet[Int]): ArrayData = {
+val array = new Array[Int](hs.size)
+var pos = hs.nextPos(0)
+var i = 0
+while (pos != OpenHashSet.INVALID_POS) {
+  array(i) = hs.getValue(pos)
+  pos = hs.nextPos(pos + 1)
+  i += 1
+}
+
+val numBytes = 4L * array.length
+val unsafeArraySizeInBytes = 
UnsafeArrayData.calculateHeaderPortionInBytes(array.length) +
+  
org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes)
+// Since UnsafeArrayData.fromPrimitiveArray() uses long[], max 
elements * 8 bytes can be used
+if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) {
+  UnsafeArrayData.fromPrimitiveArray(array)
+} else {
+  new GenericArrayData(array)
+}
+  }
+
+  def toArrayDataLong(hs: OpenHashSet[Long]): ArrayData = {
+val array = new Array[Long](hs.size)
+var pos = hs.nextPos(0)
+var i = 0
+while (pos != OpenHashSet.INVALID_POS) {
+  array(i) = hs.getValue(pos)
+  pos = hs.nextPos(pos + 1)
+  i += 1
+}
+
+val numBytes = 8L * array.length
+val unsafeArraySizeInBytes = 
UnsafeArrayData.calculateHeaderPortionInBytes(array.length) +
+  
org.apache.spark.unsafe.array.ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes)
+// Since UnsafeArrayData.fromPrimitiveArray() uses long[], max 
elements * 8 bytes can be used
+if (unsafeArraySizeInBytes <= Integer.MAX_VALUE * 8) {
--- End diff --

`8` means of `sizeof(long)` in Java primitive.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21061: [SPARK-23914][SQL] Add array_union function

2018-05-29 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21061
  
ping @ueshin 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21443: [SPARK-24369][SQL] Correct handling for multiple distinc...

2018-05-29 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21443
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21045: [SPARK-23931][SQL] Adds zip function to sparksql

2018-05-28 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21045#discussion_r191260626
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -127,6 +127,165 @@ case class MapKeys(child: Expression)
   override def prettyName: String = "map_keys"
 }
 
+@ExpressionDescription(
+  usage = """_FUNC_(a1, a2, ...) - Returns a merged array containing in 
the N-th position the
+  N-th value of each array given.""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(2, 3, 4));
+[[1, 2], [2, 3], [3, 4]]
+  > SELECT _FUNC_(array(1, 2), array(2, 3), array(3, 4));
+[[1, 2, 3], [2, 3, 4]]
+  """,
+  since = "2.4.0")
+case class Zip(children: Seq[Expression]) extends Expression with 
ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = 
Seq.fill(children.length)(ArrayType)
+
+  override def dataType: DataType = ArrayType(mountSchema)
+
+  override def nullable: Boolean = children.forall(_.nullable)
+
+  private lazy val arrayTypes = 
children.map(_.dataType.asInstanceOf[ArrayType])
+
+  private lazy val arrayElementTypes = arrayTypes.map(_.elementType)
+
+  def mountSchema: StructType = {
+val fields = children.zip(arrayElementTypes).zipWithIndex.map {
+  case ((expr: NamedExpression, elementType), _) =>
+StructField(expr.name, elementType, nullable = true)
+  case ((_, elementType), idx) =>
+StructField(s"$idx", elementType, nullable = true)
+}
+StructType(fields)
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+val numberOfArrays: Int = children.length
+val genericArrayData = classOf[GenericArrayData].getName
+val genericInternalRow = classOf[GenericInternalRow].getName
+val arrVals = ctx.freshName("arrVals")
+val arrCardinality = ctx.freshName("arrCardinality")
+val biggestCardinality = ctx.freshName("biggestCardinality")
+val storedArrTypes = ctx.freshName("storedArrTypes")
+val returnNull = ctx.freshName("returnNull")
+val evals = children.map(_.genCode(ctx))
+
+val inputs = evals.zipWithIndex.map { case (eval, index) =>
+  s"""
+|${eval.code}
+|if (!${eval.isNull}) {
+|  $arrVals[$index] = ${eval.value};
+|  $arrCardinality[$index] = ${eval.value}.numElements();
+|} else {
+|  $arrVals[$index] = null;
+|  $arrCardinality[$index] = 0;
+|  $returnNull[0] = true;
+|}
+|$storedArrTypes[$index] = "${arrayElementTypes(index)}";
--- End diff --

In simple cases, since we know only a data type of all children before 
execution, we may not need to use `$storedArrTypes`. However, I may miss 
something. 

Would it be possible to show an example test case that requires to pick the 
correct `getValue` by using `$storedArrTypes`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21045: [SPARK-23931][SQL] Adds zip function to sparksql

2018-05-28 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21045#discussion_r191132972
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -127,6 +127,176 @@ case class MapKeys(child: Expression)
   override def prettyName: String = "map_keys"
 }
 
+@ExpressionDescription(
+  usage = """_FUNC_(a1, a2, ...) - Returns a merged array containing in 
the N-th position the
+  N-th value of each array given.""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(2, 3, 4));
+[[1, 2], [2, 3], [3, 4]]
+  > SELECT _FUNC_(array(1, 2), array(2, 3), array(3, 4));
+[[1, 2, 3], [2, 3, 4]]
+  """,
+  since = "2.4.0")
+case class Zip(children: Seq[Expression]) extends Expression with 
ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = 
Seq.fill(children.length)(ArrayType)
+
+  override def dataType: DataType = ArrayType(mountSchema)
+
+  override def nullable: Boolean = children.forall(_.nullable)
+
+  private lazy val arrayTypes = 
children.map(_.dataType.asInstanceOf[ArrayType])
+
+  private lazy val arrayElementTypes = arrayTypes.map(_.elementType)
+
+
+  def mountSchema: StructType = {
+val fields = arrayTypes.zipWithIndex.map { case (arr, idx) =>
+  val fieldName = if (children(idx).isInstanceOf[NamedExpression]) {
+  children(idx).asInstanceOf[NamedExpression].name
+} else {
+  s"$idx"
+}
+  StructField(fieldName, arr.elementType, children(idx).nullable || 
arr.containsNull)
+}
+StructType(fields)
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+val numberOfArrays: Int = children.length
+val genericArrayData = classOf[GenericArrayData].getName
+val genericInternalRow = classOf[GenericInternalRow].getName
+val arrVals = ctx.freshName("arrVals")
+val arrCardinality = ctx.freshName("arrCardinality")
+val biggestCardinality = ctx.freshName("biggestCardinality")
+val storedArrTypes = ctx.freshName("storedArrTypes")
+val returnNull = ctx.freshName("returnNull")
+val evals = children.map(_.genCode(ctx))
+
+val inputs = evals.zipWithIndex.map { case (eval, index) =>
+  s"""
+|${eval.code}
+|if (!${eval.isNull}) {
+|  $arrVals[$index] = ${eval.value};
+|  $arrCardinality[$index] = ${eval.value}.numElements();
+|} else {
+|  $arrVals[$index] = null;
+|  $arrCardinality[$index] = 0;
+|  $returnNull[0] = true;
+|}
+|$storedArrTypes[$index] = "${arrayElementTypes(index)}";
+|$biggestCardinality = Math.max($biggestCardinality, 
$arrCardinality[$index]);
+  """.stripMargin
+}
+
+val inputsSplitted = ctx.splitExpressions(
+  expressions = inputs,
+  funcName = "getInputAndCardinality",
+  returnType = "int",
+  makeSplitFunction = body =>
+s"""
+  |$body
+  |return $biggestCardinality;
+""".stripMargin,
+  foldFunctions = _.map(funcCall => s"$biggestCardinality = 
$funcCall;").mkString("\n"),
+  arguments =
+("ArrayData[]", arrVals) ::
+("int[]", arrCardinality) ::
+("String[]", storedArrTypes) ::
+("int", biggestCardinality) ::
+("boolean[]", returnNull) :: Nil)
+
+val myobject = ctx.freshName("myobject")
+val j = ctx.freshName("j")
+val i = ctx.freshName("i")
+val args = ctx.freshName("args")
+
+val cases = arrayElementTypes.distinct.map { elementType =>
+  val getArrValsItem = CodeGenerator.getValue(s"$arrVals[$j]", 
elementType, i)
+  s"""
+|case "${elementType}":
+|  $myobject[$j] = $getArrValsItem;
+|  break;
+  """.stripMargin
+}
+
+ev.copy(s"""
+  |ArrayData[] $arrVals = new ArrayData[$numberOfArrays];
+  |int[] $arrCardinality = new int[$numberOfArrays];
+  |int $biggestCardinality = 0;
+  |String[] $storedArrTypes = new String[$numberOfArrays];
+  |boolean[] $returnNull = new boolean[1];
+  |$return

[GitHub] spark pull request #21045: [SPARK-23931][SQL] Adds zip function to sparksql

2018-05-28 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21045#discussion_r191132426
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -127,6 +127,176 @@ case class MapKeys(child: Expression)
   override def prettyName: String = "map_keys"
 }
 
+@ExpressionDescription(
+  usage = """_FUNC_(a1, a2, ...) - Returns a merged array containing in 
the N-th position the
+  N-th value of each array given.""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(2, 3, 4));
+[[1, 2], [2, 3], [3, 4]]
+  > SELECT _FUNC_(array(1, 2), array(2, 3), array(3, 4));
+[[1, 2, 3], [2, 3, 4]]
+  """,
+  since = "2.4.0")
+case class Zip(children: Seq[Expression]) extends Expression with 
ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = 
Seq.fill(children.length)(ArrayType)
+
+  override def dataType: DataType = ArrayType(mountSchema)
+
+  override def nullable: Boolean = children.forall(_.nullable)
+
+  private lazy val arrayTypes = 
children.map(_.dataType.asInstanceOf[ArrayType])
+
+  private lazy val arrayElementTypes = arrayTypes.map(_.elementType)
+
+
+  def mountSchema: StructType = {
+val fields = arrayTypes.zipWithIndex.map { case (arr, idx) =>
+  val fieldName = if (children(idx).isInstanceOf[NamedExpression]) {
+  children(idx).asInstanceOf[NamedExpression].name
+} else {
+  s"$idx"
+}
+  StructField(fieldName, arr.elementType, children(idx).nullable || 
arr.containsNull)
+}
+StructType(fields)
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+val numberOfArrays: Int = children.length
+val genericArrayData = classOf[GenericArrayData].getName
+val genericInternalRow = classOf[GenericInternalRow].getName
+val arrVals = ctx.freshName("arrVals")
+val arrCardinality = ctx.freshName("arrCardinality")
+val biggestCardinality = ctx.freshName("biggestCardinality")
+val storedArrTypes = ctx.freshName("storedArrTypes")
+val returnNull = ctx.freshName("returnNull")
+val evals = children.map(_.genCode(ctx))
+
+val inputs = evals.zipWithIndex.map { case (eval, index) =>
+  s"""
+|${eval.code}
+|if (!${eval.isNull}) {
+|  $arrVals[$index] = ${eval.value};
+|  $arrCardinality[$index] = ${eval.value}.numElements();
+|} else {
+|  $arrVals[$index] = null;
+|  $arrCardinality[$index] = 0;
+|  $returnNull[0] = true;
+|}
+|$storedArrTypes[$index] = "${arrayElementTypes(index)}";
+|$biggestCardinality = Math.max($biggestCardinality, 
$arrCardinality[$index]);
+  """.stripMargin
+}
+
+val inputsSplitted = ctx.splitExpressions(
+  expressions = inputs,
+  funcName = "getInputAndCardinality",
+  returnType = "int",
+  makeSplitFunction = body =>
+s"""
+  |$body
+  |return $biggestCardinality;
+""".stripMargin,
+  foldFunctions = _.map(funcCall => s"$biggestCardinality = 
$funcCall;").mkString("\n"),
+  arguments =
+("ArrayData[]", arrVals) ::
+("int[]", arrCardinality) ::
+("String[]", storedArrTypes) ::
+("int", biggestCardinality) ::
+("boolean[]", returnNull) :: Nil)
+
+val myobject = ctx.freshName("myobject")
+val j = ctx.freshName("j")
+val i = ctx.freshName("i")
+val args = ctx.freshName("args")
+
+val cases = arrayElementTypes.distinct.map { elementType =>
+  val getArrValsItem = CodeGenerator.getValue(s"$arrVals[$j]", 
elementType, i)
+  s"""
+|case "${elementType}":
+|  $myobject[$j] = $getArrValsItem;
+|  break;
+  """.stripMargin
+}
+
+ev.copy(s"""
+  |ArrayData[] $arrVals = new ArrayData[$numberOfArrays];
+  |int[] $arrCardinality = new int[$numberOfArrays];
+  |int $biggestCardinality = 0;
+  |String[] $storedArrTypes = new String[$numberOfArrays];
+  |boolean[] $returnNull = new boolean[1];
+  |$return

[GitHub] spark pull request #21045: [SPARK-23931][SQL] Adds zip function to sparksql

2018-05-28 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21045#discussion_r191131809
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -127,6 +127,165 @@ case class MapKeys(child: Expression)
   override def prettyName: String = "map_keys"
 }
 
+@ExpressionDescription(
+  usage = """_FUNC_(a1, a2, ...) - Returns a merged array containing in 
the N-th position the
+  N-th value of each array given.""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(2, 3, 4));
+[[1, 2], [2, 3], [3, 4]]
+  > SELECT _FUNC_(array(1, 2), array(2, 3), array(3, 4));
+[[1, 2, 3], [2, 3, 4]]
+  """,
+  since = "2.4.0")
+case class Zip(children: Seq[Expression]) extends Expression with 
ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = 
Seq.fill(children.length)(ArrayType)
+
+  override def dataType: DataType = ArrayType(mountSchema)
+
+  override def nullable: Boolean = children.forall(_.nullable)
+
+  private lazy val arrayTypes = 
children.map(_.dataType.asInstanceOf[ArrayType])
+
+  private lazy val arrayElementTypes = arrayTypes.map(_.elementType)
--- End diff --

Can we have more than one `arrayElementTypes`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21045: [SPARK-23931][SQL] Adds zip function to sparksql

2018-05-28 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21045#discussion_r191126615
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -127,6 +127,165 @@ case class MapKeys(child: Expression)
   override def prettyName: String = "map_keys"
 }
 
+@ExpressionDescription(
+  usage = """_FUNC_(a1, a2, ...) - Returns a merged array containing in 
the N-th position the
+  N-th value of each array given.""",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(2, 3, 4));
+[[1, 2], [2, 3], [3, 4]]
+  > SELECT _FUNC_(array(1, 2), array(2, 3), array(3, 4));
+[[1, 2, 3], [2, 3, 4]]
+  """,
+  since = "2.4.0")
+case class Zip(children: Seq[Expression]) extends Expression with 
ExpectsInputTypes {
+
+  override def inputTypes: Seq[AbstractDataType] = 
Seq.fill(children.length)(ArrayType)
+
+  override def dataType: DataType = ArrayType(mountSchema)
+
+  override def nullable: Boolean = children.forall(_.nullable)
+
+  private lazy val arrayTypes = 
children.map(_.dataType.asInstanceOf[ArrayType])
+
+  private lazy val arrayElementTypes = arrayTypes.map(_.elementType)
+
+  def mountSchema: StructType = {
+val fields = children.zip(arrayElementTypes).zipWithIndex.map {
+  case ((expr: NamedExpression, elementType), _) =>
+StructField(expr.name, elementType, nullable = true)
+  case ((_, elementType), idx) =>
+StructField(s"$idx", elementType, nullable = true)
+}
+StructType(fields)
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+val numberOfArrays: Int = children.length
+val genericArrayData = classOf[GenericArrayData].getName
+val genericInternalRow = classOf[GenericInternalRow].getName
+val arrVals = ctx.freshName("arrVals")
+val arrCardinality = ctx.freshName("arrCardinality")
+val biggestCardinality = ctx.freshName("biggestCardinality")
+val storedArrTypes = ctx.freshName("storedArrTypes")
+val returnNull = ctx.freshName("returnNull")
+val evals = children.map(_.genCode(ctx))
+
+val inputs = evals.zipWithIndex.map { case (eval, index) =>
+  s"""
+|${eval.code}
+|if (!${eval.isNull}) {
+|  $arrVals[$index] = ${eval.value};
+|  $arrCardinality[$index] = ${eval.value}.numElements();
+|} else {
+|  $arrVals[$index] = null;
+|  $arrCardinality[$index] = 0;
+|  $returnNull[0] = true;
+|}
+|$storedArrTypes[$index] = "${arrayElementTypes(index)}";
--- End diff --

Do we need `storedArrType`? Since we can know data type of all children 
before execution, would it be possible to check the correctness in own 
`checkInputDataTypes()`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21061: [SPARK-23914][SQL] Add array_union function

2018-05-25 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21061
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21433: [SPARK-23820][CORE] Enable use of long form of callsite ...

2018-05-25 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21433
  
Could you please add the description for this PR?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21069: [SPARK-23920][SQL]add array_remove to remove all ...

2018-05-24 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21069#discussion_r190485891
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -1882,3 +1882,123 @@ case class ArrayRepeat(left: Expression, right: 
Expression)
   }
 
 }
+
+/**
+ * Remove all elements that equal to element from the given array
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(array, element) - Remove all elements that equal to 
element from array.",
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3, null, 3), 3);
+   [1,2,null]
+  """, since = "2.4.0")
+case class ArrayRemove(left: Expression, right: Expression)
+  extends BinaryExpression with ImplicitCastInputTypes {
+
+  override def dataType: DataType = left.dataType
+
+  override def inputTypes: Seq[AbstractDataType] =
+Seq(ArrayType, left.dataType.asInstanceOf[ArrayType].elementType)
+
+  lazy val elementType: DataType = 
left.dataType.asInstanceOf[ArrayType].elementType
+
+  @transient private lazy val ordering: Ordering[Any] =
+TypeUtils.getInterpretedOrdering(right.dataType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+if (!left.dataType.isInstanceOf[ArrayType]
+  || left.dataType.asInstanceOf[ArrayType].elementType != 
right.dataType) {
+  TypeCheckResult.TypeCheckFailure(
+"Arguments must be an array followed by a value of same type as 
the array members")
+} else {
+  TypeUtils.checkForOrderingExpr(right.dataType, s"function 
$prettyName")
+}
+  }
+
+  override def nullSafeEval(arr: Any, value: Any): Any = {
+val newArray = new 
Array[Any](arr.asInstanceOf[ArrayData].numElements())
+var pos = 0
+arr.asInstanceOf[ArrayData].foreach(right.dataType, (i, v) =>
+  if (v == null) {
+if (value != null) {
--- End diff --

nit: Do we need this check since we are in `nullSafeEval`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21419: Branch 2.2

2018-05-24 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21419
  
could you please close this PR?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21069: [SPARK-23920][SQL]add array_remove to remove all element...

2018-05-23 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21069
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21406: [Minor][Core] Cleanup unused vals in `DAGScheduler.handl...

2018-05-23 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21406
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21389: [SPARK-24204][SQL] Verify a schema in Json/Orc/ParquetFi...

2018-05-23 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21389
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21401: [SPARK-24350][SQL] Fixes ClassCastException in the "arra...

2018-05-23 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21401
  
LGTM


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21401: [SPARK-24350][SQL] Fixes ClassCastException in the "arra...

2018-05-23 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21401
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21391: [SPARK-24343][SQL] Avoid shuffle for the bucketed table ...

2018-05-23 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21391
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21394: [SPARK-24329][SQL] Test for skipping multi-space lines

2018-05-23 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21394
  
Retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21266: [SPARK-24206][SQL] Improve DataSource read benchmark cod...

2018-05-23 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21266
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21389: [SPARK-24204][SQL] Verify a schema in Json/Orc/ParquetFi...

2018-05-23 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21389
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21311: [SPARK-24257][SQL]LongToUnsafeRowMap calculate the new s...

2018-05-23 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21311
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21389: [SPARK-24204][SQL] Verify a schema in Json/Orc/ParquetFi...

2018-05-23 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21389
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21266: [SPARK-24206][SQL] Improve DataSource read benchmark cod...

2018-05-23 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21266
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21409: [SPARK-24365][SQL] Add Parquet write benchmark

2018-05-23 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21409#discussion_r190231900
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteBenchmark.scala
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.File
+
+import scala.util.Try
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.{Benchmark, Utils}
+
+/**
+ * Benchmark to measure parquet write performance.
+ * To run this:
+ *  spark-submit --class  --jars 
+ */
+object ParquetWriteBenchmark {
+  val conf = new SparkConf()
+  conf.set("spark.sql.parquet.compression.codec", "snappy")
+
+  val spark = SparkSession.builder
+.master("local[1]")
+.appName("parquet-write-benchmark")
+.config(conf)
+.getOrCreate()
+
+  // Set default configs. Individual cases will change them if necessary.
+  spark.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true")
+
+  def withTempPath(f: File => Unit): Unit = {
+val path = Utils.createTempDir()
+path.delete()
+try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  def withTempTable(tableNames: String*)(f: => Unit): Unit = {
+try f finally tableNames.foreach(spark.catalog.dropTempView)
+  }
+
+  def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+val (keys, values) = pairs.unzip
+val currentValues = keys.map(key => Try(spark.conf.get(key)).toOption)
+(keys, values).zipped.foreach(spark.conf.set)
+try f finally {
+  keys.zip(currentValues).foreach {
+case (key, Some(value)) => spark.conf.set(key, value)
+case (key, None) => spark.conf.unset(key)
+  }
+}
+  }
+
+  def runSQL(name: String, sql: String, values: Int): Unit = {
+withTempTable("t1") {
+  spark.range(values).createOrReplaceTempView("t1")
+  val benchmark = new Benchmark(name, values)
+  benchmark.addCase("Parquet Writer") { _ =>
+withTempPath { dir =>
+  spark.sql(sql).write.parquet(dir.getCanonicalPath)
+}
+  }
+  benchmark.run()
+}
+  }
+
+  def intWriteBenchmark(values: Int): Unit = {
+/*
+Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
+
+Output Single Int Column:Best/Avg Time(ms)
Rate(M/s)   Per Row(ns)   Relative
+

+Parquet Writer2536 / 2610  6.2 
161.3   1.0X
+*/
+runSQL("Output Single Int Column", "select cast(id as INT) as id from 
t1", values)
+  }
+
+  def intStringWriteBenchmark(values: Int): Unit = {
+/*
+Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
+
+Output Int and String Column:Best/Avg Time(ms)
Rate(M/s)   Per Row(ns)   Relative
+

+Parquet Writer4644 / 4673  2.3 
442.9   1.0X
+*/
+runSQL(name = "Output Int and String Column",
--- End diff --

nit: Is there any reason to use named argument for this `runSQL` and not to 
use name argument in another `runSQL`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21405: [SPARK-24361][SQL] Polish code block manipulation...

2018-05-23 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21405#discussion_r190228283
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeBlockSuite.scala
 ---
@@ -120,11 +120,11 @@ class CodeBlockSuite extends SparkFunSuite {
|}""".stripMargin
 
 val aliasedParam = JavaCode.variable("aliased", expr.javaType)
-val aliasedInputs = code.asInstanceOf[CodeBlock].blockInputs.map {
-  case _: SimpleExprValue => aliasedParam
-  case other => other
+
+// We want to replace all occurrences of `expr` with the variable 
`aliasedParam`.
+val aliasedCode = code.transformExprValues {
+  case SimpleExprValue("1 + 1", _) => aliasedParam
--- End diff --

nit: I know the current code works correctly. How about replacing `_` with 
`CodeGenerator.javaClass(IntegerType)`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21311: [SPARK-24257][SQL]LongToUnsafeRowMap calculate the new s...

2018-05-23 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21311
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21401: [SPARK-24350][SQL] "array_position" error fix

2018-05-22 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21401
  
good catch, thanks


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21401: [SPARK-24350][SQL] "array_position" error fix

2018-05-22 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21401
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21395: SPARK-24348 "element_at" error fix

2018-05-22 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21395
  
cc @ueshin 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21106: [SPARK-23711][SQL] Add fallback generator for Uns...

2018-05-22 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21106#discussion_r189818763
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
 ---
@@ -24,25 +24,30 @@ import org.scalatest.Matchers
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.plans.PlanTestBase
 import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{IntegerType, LongType, _}
 import org.apache.spark.unsafe.array.ByteArrayMethods
 import org.apache.spark.unsafe.types.UTF8String
 
-class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
+class UnsafeRowConverterSuite extends SparkFunSuite with Matchers with 
PlanTestBase {
 
   private def roundedSize(size: Int) = 
ByteArrayMethods.roundNumberOfBytesToNearestWord(size)
 
-  private def testWithFactory(
-name: String)(
-f: UnsafeProjectionCreator => Unit): Unit = {
-test(name) {
-  f(UnsafeProjection)
-  f(InterpretedUnsafeProjection)
+  private def testBothCodegenAndInterpreted(name: String)(f: => Unit): 
Unit = {
+val modes = Seq(CodegenObjectFactoryMode.CODEGEN_ONLY, 
CodegenObjectFactoryMode.NO_CODEGEN)
+for (fallbackMode <- modes) {
+  test(name + " with " + fallbackMode) {
--- End diff --

nit: `s"$name with $fallbackMode"`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21106: [SPARK-23711][SQL] Add fallback generator for UnsafeProj...

2018-05-22 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21106
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20636: [SPARK-23415][SQL][TEST] Make behavior of BufferHolderSp...

2018-05-21 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/20636
  
ping @hvanhovell


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21342: [SPARK-24294] Throw SparkException when OOM in Br...

2018-05-21 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21342#discussion_r189634704
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
 ---
@@ -111,12 +112,18 @@ case class BroadcastExchangeExec(
   SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, 
metrics.values.toSeq)
   broadcasted
 } catch {
+  // SPARK-24294: To bypass scala bug: 
https://github.com/scala/bug/issues/9554, we throw
+  // SparkFatalException, which is a subclass of Exception. 
ThreadUtils.awaitResult
+  // will catch this exception and re-throw the wrapped fatal 
throwable.
   case oe: OutOfMemoryError =>
-throw new OutOfMemoryError(s"Not enough memory to build and 
broadcast the table to " +
+throw new SparkFatalException(
+  new OutOfMemoryError(s"Not enough memory to build and 
broadcast the table to " +
--- End diff --

Just curious: Can we perform object operations (allocate 
`OutOfMemoryError`, allocate and concatenate `String`s) when we caught ` 
OutOfMemoryError`?
I think that we have space since we failed to allocate a large object.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21374: [SPARK-24323][SQL] Fix lint-java errors

2018-05-20 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21374
  
Until now, @gatorsmile and @ueshin fixed these when we found. I am neutral 
on the policy.
I would like to hear their opinion.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21376: [SPARK-24250][SQL] support accessing SQLConf insi...

2018-05-20 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21376#discussion_r189465557
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -107,7 +107,20 @@ object SQLConf {
* run tests in parallel. At the time this feature was implemented, this 
was a no-op since we
* run unit tests (that does not involve SparkSession) in serial order.
*/
-  def get: SQLConf = confGetter.get()()
+  def get: SQLConf = {
+if (TaskContext.get != null) {
+  new ReadOnlySQLConf(TaskContext.get())
+} else {
+  if (Utils.isTesting && SparkContext.getActive.isDefined) {
--- End diff --

good check!!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21361: [SPARK-24313][SQL] Fix collection operations' interprete...

2018-05-20 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21361
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21376: [SPARK-24250][SQL] support accessing SQLConf insi...

2018-05-20 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21376#discussion_r189458452
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/internal/ExecutorSideSQLConfSuite.scala
 ---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.test.SQLTestUtils
+
+class ExecutorSideSQLConfSuite extends SparkFunSuite with SQLTestUtils {
+  import testImplicits._
+
+  protected var spark: SparkSession = null
+
+  // Create a new [[SparkSession]] running in local-cluster mode.
+  override def beforeAll(): Unit = {
+super.beforeAll()
+spark = SparkSession.builder()
+  .master("local-cluster[2,1,1024]")
+  .appName("testing")
+  .getOrCreate()
+  }
+
+  override def afterAll(): Unit = {
+spark.stop()
+spark = null
+  }
+
+  test("ReadonlySQLConf is correctly created at the executor side") {
--- End diff --

nit: `ReadOnlySQLConf `


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21374: [SPARK-24323][SQL] Fix lint-java errors

2018-05-20 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21374
  
cc @gatorsmile @ueshin


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21342: [SPARK-24294] Throw SparkException when OOM in Broadcast...

2018-05-20 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21342
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21374: [SPARK-24323][SQL] Fix lint-java errors

2018-05-20 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21374
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21372: [SPARK-24322][BUILD] Upgrade Apache ORC to 1.4.4

2018-05-20 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21372
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21371: [SPARK-24250][SQL][FollowUp] Fix compile error and flaky...

2018-05-20 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21371
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21371: [SPARK-24250][SQL][FollowUp] Fix compile error and flaky...

2018-05-19 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21371
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #21374: [SPARK-24323][SQL] Fix lint-java errors

2018-05-19 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/21374
  
@dongjoon-hyun thank you, l will kick this later.



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



<    4   5   6   7   8   9   10   11   12   13   >