[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function

2018-10-08 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21102#discussion_r223460909
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3965,6 +4034,248 @@ object ArrayUnion {
   }
 }
 
+/**
+ * Returns an array of the elements in the intersect of x and y, without duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in the intersection of array1 and
+array2, without duplicates.
--- End diff --

It sounds like our null handling is incorrect. NULL is not equal to NULL.
```
SELECT array_intersect(ARRAY(NULL), ARRAY(NULL));
```

This should return an empty set. 
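For context, this follows SQL's three-valued logic, which is easy to check directly in Spark SQL (`<=>` is the null-safe equality operator):
```
SELECT NULL = NULL;   -- NULL, not TRUE
SELECT NULL <=> NULL; -- true
```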


---




[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function

2018-08-06 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21102#discussion_r207967923
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3965,6 +4034,248 @@ object ArrayUnion {
   }
 }
 
+/**
+ * Returns an array of the elements in the intersect of x and y, without duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in the intersection of array1 and
+array2, without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(1, 3)
+  """,
+  since = "2.4.0")
+case class ArrayIntersect(left: Expression, right: Expression) extends ArraySetLike
+  with ComplexTypeMergingExpression {
+  override def dataType: DataType = {
+dataTypeCheck
+ArrayType(elementType,
+  left.dataType.asInstanceOf[ArrayType].containsNull &&
+right.dataType.asInstanceOf[ArrayType].containsNull)
+  }
+
+  @transient lazy val evalIntersect: (ArrayData, ArrayData) => ArrayData = {
+if (elementTypeSupportEquals) {
+  (array1, array2) =>
+if (array1.numElements() != 0 && array2.numElements() != 0) {
+  val hs = new OpenHashSet[Any]
+  val hsResult = new OpenHashSet[Any]
+  var foundNullElement = false
+  var i = 0
+  while (i < array2.numElements()) {
+if (array2.isNullAt(i)) {
+  foundNullElement = true
+} else {
+  val elem = array2.get(i, elementType)
+  hs.add(elem)
+}
+i += 1
+  }
+  val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any]
+  i = 0
+  while (i < array1.numElements()) {
+if (array1.isNullAt(i)) {
+  if (foundNullElement) {
+arrayBuffer += null
+foundNullElement = false
+  }
+} else {
+  val elem = array1.get(i, elementType)
+  if (hs.contains(elem) && !hsResult.contains(elem)) {
+arrayBuffer += elem
+hsResult.add(elem)
+  }
+}
+i += 1
+  }
+  new GenericArrayData(arrayBuffer)
+} else {
+  new GenericArrayData(Array.emptyObjectArray)
+}
+} else {
+  (array1, array2) =>
+if (array1.numElements() != 0 && array2.numElements() != 0) {
+  val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any]
+  var alreadySeenNull = false
+  var i = 0
+  while (i < array1.numElements()) {
+var found = false
+val elem1 = array1.get(i, elementType)
+if (array1.isNullAt(i)) {
+  if (!alreadySeenNull) {
+var j = 0
+while (!found && j < array2.numElements()) {
+  found = array2.isNullAt(j)
+  j += 1
+}
+// array2 is scanned only once for null element
+alreadySeenNull = true
+  }
+} else {
+  var j = 0
+  while (!found && j < array2.numElements()) {
+if (!array2.isNullAt(j)) {
+  val elem2 = array2.get(j, elementType)
+  if (ordering.equiv(elem1, elem2)) {
+// check whether elem1 is already stored in arrayBuffer
+var foundArrayBuffer = false
+var k = 0
+while (!foundArrayBuffer && k < arrayBuffer.size) {
+  val va = arrayBuffer(k)
+  foundArrayBuffer = (va != null) && ordering.equiv(va, elem1)
+  k += 1
+}
+found = !foundArrayBuffer
+  }
+}
+j += 1
+  }
+}
+if (found) {
+  arrayBuffer += elem1
+}
+i += 1
+  }
+  new GenericArrayData(arrayBuffer)
+} else {
+  new GenericArrayData(Array.emptyObjectArray)
+}
+}
+  }
+
+  override def nullSafeEval(input1: Any, input2: Any): Any = {
+val array1 = input1.asInstanceOf[ArrayData]
+val array2 = input2.asInstanceOf[ArrayData]
+
+evalIntersect(array1, array2)
+  }
+
+  override def doGenCode(ctx: CodegenContext, 

[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function

2018-08-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21102#discussion_r207945648
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3965,6 +4034,248 @@ object ArrayUnion {
   }
 }
 
+/**
+ * Returns an array of the elements in the intersect of x and y, without duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in the intersection of array1 and
+array2, without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(1, 3)
+  """,
+  since = "2.4.0")
+case class ArrayIntersect(left: Expression, right: Expression) extends ArraySetLike
+  with ComplexTypeMergingExpression {
+  override def dataType: DataType = {
+dataTypeCheck
+ArrayType(elementType,
+  left.dataType.asInstanceOf[ArrayType].containsNull &&
+right.dataType.asInstanceOf[ArrayType].containsNull)
+  }
+
+  @transient lazy val evalIntersect: (ArrayData, ArrayData) => ArrayData = {
+if (elementTypeSupportEquals) {
+  (array1, array2) =>
+if (array1.numElements() != 0 && array2.numElements() != 0) {
+  val hs = new OpenHashSet[Any]
+  val hsResult = new OpenHashSet[Any]
+  var foundNullElement = false
+  var i = 0
+  while (i < array2.numElements()) {
+if (array2.isNullAt(i)) {
+  foundNullElement = true
+} else {
+  val elem = array2.get(i, elementType)
+  hs.add(elem)
+}
+i += 1
+  }
+  val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any]
+  i = 0
+  while (i < array1.numElements()) {
+if (array1.isNullAt(i)) {
+  if (foundNullElement) {
+arrayBuffer += null
+foundNullElement = false
+  }
+} else {
+  val elem = array1.get(i, elementType)
+  if (hs.contains(elem) && !hsResult.contains(elem)) {
+arrayBuffer += elem
+hsResult.add(elem)
+  }
+}
+i += 1
+  }
+  new GenericArrayData(arrayBuffer)
+} else {
+  new GenericArrayData(Array.emptyObjectArray)
+}
+} else {
+  (array1, array2) =>
+if (array1.numElements() != 0 && array2.numElements() != 0) {
+  val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any]
+  var alreadySeenNull = false
+  var i = 0
+  while (i < array1.numElements()) {
+var found = false
+val elem1 = array1.get(i, elementType)
+if (array1.isNullAt(i)) {
+  if (!alreadySeenNull) {
+var j = 0
+while (!found && j < array2.numElements()) {
+  found = array2.isNullAt(j)
+  j += 1
+}
+// array2 is scanned only once for null element
+alreadySeenNull = true
+  }
+} else {
+  var j = 0
+  while (!found && j < array2.numElements()) {
+if (!array2.isNullAt(j)) {
+  val elem2 = array2.get(j, elementType)
+  if (ordering.equiv(elem1, elem2)) {
+// check whether elem1 is already stored in arrayBuffer
+var foundArrayBuffer = false
+var k = 0
+while (!foundArrayBuffer && k < arrayBuffer.size) {
+  val va = arrayBuffer(k)
+foundArrayBuffer = (va != null) && ordering.equiv(va, elem1)
+  k += 1
+}
+found = !foundArrayBuffer
+  }
+}
+j += 1
+  }
+}
+if (found) {
+  arrayBuffer += elem1
+}
+i += 1
+  }
+  new GenericArrayData(arrayBuffer)
+} else {
+  new GenericArrayData(Array.emptyObjectArray)
+}
+}
+  }
+
+  override def nullSafeEval(input1: Any, input2: Any): Any = {
+val array1 = input1.asInstanceOf[ArrayData]
+val array2 = input2.asInstanceOf[ArrayData]
+
+evalIntersect(array1, array2)
+  }
+
+  override def doGenCode(ctx: 

[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function

2018-08-06 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21102


---




[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function

2018-08-06 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21102#discussion_r207781744
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3965,6 +4034,248 @@ object ArrayUnion {
   }
 }
 
+/**
+ * Returns an array of the elements in the intersect of x and y, without duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in the intersection of array1 and
+array2, without duplicates.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(1, 3)
+  """,
+  since = "2.4.0")
+case class ArrayIntersect(left: Expression, right: Expression) extends ArraySetLike
+  with ComplexTypeMergingExpression {
+  override def dataType: DataType = {
+dataTypeCheck
+ArrayType(elementType,
+  left.dataType.asInstanceOf[ArrayType].containsNull &&
+right.dataType.asInstanceOf[ArrayType].containsNull)
+  }
+
+  @transient lazy val evalIntersect: (ArrayData, ArrayData) => ArrayData = {
+if (elementTypeSupportEquals) {
+  (array1, array2) =>
+if (array1.numElements() != 0 && array2.numElements() != 0) {
+  val hs = new OpenHashSet[Any]
+  val hsResult = new OpenHashSet[Any]
+  var foundNullElement = false
+  var i = 0
+  while (i < array2.numElements()) {
+if (array2.isNullAt(i)) {
+  foundNullElement = true
+} else {
+  val elem = array2.get(i, elementType)
+  hs.add(elem)
+}
+i += 1
+  }
+  val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any]
+  i = 0
+  while (i < array1.numElements()) {
+if (array1.isNullAt(i)) {
+  if (foundNullElement) {
+arrayBuffer += null
+foundNullElement = false
+  }
+} else {
+  val elem = array1.get(i, elementType)
+  if (hs.contains(elem) && !hsResult.contains(elem)) {
+arrayBuffer += elem
+hsResult.add(elem)
+  }
+}
+i += 1
+  }
+  new GenericArrayData(arrayBuffer)
+} else {
+  new GenericArrayData(Seq.empty)
--- End diff --

nit: `Array.empty` or `Array.emptyObjectArray`?
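For reference, a minimal sketch of the suggested alternative: `Array.emptyObjectArray` is a preallocated empty array in the Scala standard library, so reusing it avoids allocating a fresh empty `Seq` on each call:
```
// Reuse the shared preallocated empty array instead of allocating Seq.empty.
new GenericArrayData(Array.emptyObjectArray)
```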


---




[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function

2018-08-05 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21102#discussion_r207767226
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3965,6 +4034,242 @@ object ArrayUnion {
   }
 }
 
+/**
+ * Returns an array of the elements in the intersect of x and y, without duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in the intersection of array1 and
+array2, without duplicates.
+  """,
+  examples = """
+Examples:Fun
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(1, 3)
+  """,
+  since = "2.4.0")
+case class ArrayIntersect(left: Expression, right: Expression) extends ArraySetLike
+  with ComplexTypeMergingExpression {
+  override def dataType: DataType = {
+dataTypeCheck
+ArrayType(elementType,
+  left.dataType.asInstanceOf[ArrayType].containsNull &&
+right.dataType.asInstanceOf[ArrayType].containsNull)
+  }
+
+  @transient lazy val evalIntersect: (ArrayData, ArrayData) => ArrayData = {
+if (elementTypeSupportEquals) {
+  (array1, array2) =>
+val hs = new OpenHashSet[Any]
+val hsResult = new OpenHashSet[Any]
+var foundNullElement = false
+var i = 0
+while (i < array2.numElements()) {
+  if (array2.isNullAt(i)) {
+foundNullElement = true
+  } else {
+val elem = array2.get(i, elementType)
+hs.add(elem)
+  }
+  i += 1
+}
+val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any]
+i = 0
+while (i < array1.numElements()) {
+  if (array1.isNullAt(i)) {
+if (foundNullElement) {
+  arrayBuffer += null
+  foundNullElement = false
+}
+  } else {
+val elem = array1.get(i, elementType)
+if (hs.contains(elem) && !hsResult.contains(elem)) {
+  arrayBuffer += elem
+  hsResult.add(elem)
+}
+  }
+  i += 1
+}
+new GenericArrayData(arrayBuffer)
+} else {
+  (array1, array2) =>
+val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any]
+var alreadySeenNull = false
+var i = 0
+while (i < array1.numElements()) {
+  var found = false
+  val elem1 = array1.get(i, elementType)
+  if (array1.isNullAt(i)) {
+if (!alreadySeenNull) {
+  var j = 0
+  while (!found && j < array2.numElements()) {
+found = array2.isNullAt(j)
+j += 1
+  }
+  // array2 is scanned only once for null element
+  alreadySeenNull = true
+}
+  } else {
+var j = 0
+while (!found && j < array2.numElements()) {
+  if (!array2.isNullAt(j)) {
+val elem2 = array2.get(j, elementType)
+if (ordering.equiv(elem1, elem2)) {
+  // check whether elem1 is already stored in arrayBuffer
+  var foundArrayBuffer = false
+  var k = 0
+  while (!foundArrayBuffer && k < arrayBuffer.size) {
+val va = arrayBuffer(k)
+foundArrayBuffer = (va != null) && ordering.equiv(va, elem1)
+k += 1
+  }
+  found = !foundArrayBuffer
+}
+  }
+  j += 1
+}
+  }
+  if (found) {
+arrayBuffer += elem1
+  }
+  i += 1
+}
+new GenericArrayData(arrayBuffer)
+}
+  }
+
+  override def nullSafeEval(input1: Any, input2: Any): Any = {
+val array1 = input1.asInstanceOf[ArrayData]
+val array2 = input2.asInstanceOf[ArrayData]
+
+evalIntersect(array1, array2)
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+val arrayData = classOf[ArrayData].getName
+val i = ctx.freshName("i")
+val value = ctx.freshName("value")
+val size = ctx.freshName("size")
+if (canUseSpecializedHashSet) {
+  val jt = CodeGenerator.javaType(elementType)
+  val ptName = CodeGenerator.primitiveTypeName(jt)
+
+  nullSafeCodeGen(ctx, ev, (array1, array2) => {
+val foundNullElement = 

[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function

2018-08-05 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21102#discussion_r207766511
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3965,6 +4034,242 @@ object ArrayUnion {
   }
 }
 
+/**
+ * Returns an array of the elements in the intersect of x and y, without duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in the intersection of array1 and
+array2, without duplicates.
+  """,
+  examples = """
+Examples:Fun
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(1, 3)
+  """,
+  since = "2.4.0")
+case class ArrayIntersect(left: Expression, right: Expression) extends ArraySetLike
+  with ComplexTypeMergingExpression {
+  override def dataType: DataType = {
+dataTypeCheck
+ArrayType(elementType,
+  left.dataType.asInstanceOf[ArrayType].containsNull &&
+right.dataType.asInstanceOf[ArrayType].containsNull)
+  }
+
+  @transient lazy val evalIntersect: (ArrayData, ArrayData) => ArrayData = {
+if (elementTypeSupportEquals) {
+  (array1, array2) =>
+val hs = new OpenHashSet[Any]
+val hsResult = new OpenHashSet[Any]
+var foundNullElement = false
+var i = 0
+while (i < array2.numElements()) {
+  if (array2.isNullAt(i)) {
+foundNullElement = true
+  } else {
+val elem = array2.get(i, elementType)
+hs.add(elem)
+  }
+  i += 1
+}
+val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any]
+i = 0
+while (i < array1.numElements()) {
+  if (array1.isNullAt(i)) {
+if (foundNullElement) {
+  arrayBuffer += null
+  foundNullElement = false
+}
+  } else {
+val elem = array1.get(i, elementType)
+if (hs.contains(elem) && !hsResult.contains(elem)) {
+  arrayBuffer += elem
+  hsResult.add(elem)
+}
+  }
+  i += 1
+}
+new GenericArrayData(arrayBuffer)
+} else {
+  (array1, array2) =>
+val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any]
+var alreadySeenNull = false
+var i = 0
+while (i < array1.numElements()) {
+  var found = false
+  val elem1 = array1.get(i, elementType)
+  if (array1.isNullAt(i)) {
+if (!alreadySeenNull) {
+  var j = 0
+  while (!found && j < array2.numElements()) {
+found = array2.isNullAt(j)
+j += 1
+  }
+  // array2 is scanned only once for null element
+  alreadySeenNull = true
+}
+  } else {
+var j = 0
+while (!found && j < array2.numElements()) {
+  if (!array2.isNullAt(j)) {
+val elem2 = array2.get(j, elementType)
+if (ordering.equiv(elem1, elem2)) {
+  // check whether elem1 is already stored in arrayBuffer
+  var foundArrayBuffer = false
+  var k = 0
+  while (!foundArrayBuffer && k < arrayBuffer.size) {
+val va = arrayBuffer(k)
+foundArrayBuffer = (va != null) && ordering.equiv(va, elem1)
+k += 1
+  }
+  found = !foundArrayBuffer
+}
+  }
+  j += 1
+}
+  }
+  if (found) {
+arrayBuffer += elem1
+  }
+  i += 1
+}
+new GenericArrayData(arrayBuffer)
+}
+  }
+
+  override def nullSafeEval(input1: Any, input2: Any): Any = {
+val array1 = input1.asInstanceOf[ArrayData]
+val array2 = input2.asInstanceOf[ArrayData]
+
+evalIntersect(array1, array2)
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+val arrayData = classOf[ArrayData].getName
+val i = ctx.freshName("i")
+val value = ctx.freshName("value")
+val size = ctx.freshName("size")
+if (canUseSpecializedHashSet) {
+  val jt = CodeGenerator.javaType(elementType)
+  val ptName = CodeGenerator.primitiveTypeName(jt)
+
+  nullSafeCodeGen(ctx, ev, (array1, array2) => {
+val foundNullElement = 

[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function

2018-08-05 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21102#discussion_r207765490
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3965,6 +4034,242 @@ object ArrayUnion {
   }
 }
 
+/**
+ * Returns an array of the elements in the intersect of x and y, without duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in the intersection of array1 and
+array2, without duplicates.
+  """,
+  examples = """
+Examples:Fun
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(1, 3)
+  """,
+  since = "2.4.0")
+case class ArrayIntersect(left: Expression, right: Expression) extends ArraySetLike
+  with ComplexTypeMergingExpression {
+  override def dataType: DataType = {
+dataTypeCheck
+ArrayType(elementType,
+  left.dataType.asInstanceOf[ArrayType].containsNull &&
+right.dataType.asInstanceOf[ArrayType].containsNull)
+  }
+
+  @transient lazy val evalIntersect: (ArrayData, ArrayData) => ArrayData = {
+if (elementTypeSupportEquals) {
+  (array1, array2) =>
+val hs = new OpenHashSet[Any]
--- End diff --

How about short-circuiting to return an empty array when we find that one of the two arrays is empty?
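The updated diff quoted earlier in this thread adopts exactly this shape; a minimal sketch:
```
if (array1.numElements() != 0 && array2.numElements() != 0) {
  // ... existing intersection logic ...
} else {
  new GenericArrayData(Array.emptyObjectArray)
}
```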


---




[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function

2018-08-05 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21102#discussion_r207758427
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala ---
@@ -1647,6 +1647,60 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
 assert(result10.first.schema(0).dataType === expectedType10)
   }
 
+  test("array_intersect functions") {
+val df1 = Seq((Array(1, 2, 4), Array(4, 2))).toDF("a", "b")
+val ans1 = Row(Seq(2, 4))
+checkAnswer(df1.select(array_intersect($"a", $"b")), ans1)
+checkAnswer(df1.selectExpr("array_intersect(a, b)"), ans1)
+
+val df2 = Seq((Array[Integer](1, 2, null, 4, 5), Array[Integer](-5, 4, null, 2, -1)))
+  .toDF("a", "b")
+val ans2 = Row(Seq(2, null, 4))
+checkAnswer(df2.select(array_intersect($"a", $"b")), ans2)
+checkAnswer(df2.selectExpr("array_intersect(a, b)"), ans2)
+
+val df3 = Seq((Array(1L, 2L, 4L), Array(4L, 2L))).toDF("a", "b")
+val ans3 = Row(Seq(2L, 4L))
+checkAnswer(df3.select(array_intersect($"a", $"b")), ans3)
+checkAnswer(df3.selectExpr("array_intersect(a, b)"), ans3)
+
+val df4 = Seq(
+  (Array[java.lang.Long](1L, 2L, null, 4L, 5L), Array[java.lang.Long](-5L, 4L, null, 2L, -1L)))
+  .toDF("a", "b")
+val ans4 = Row(Seq(2L, null, 4L))
+checkAnswer(df4.select(array_intersect($"a", $"b")), ans4)
+checkAnswer(df4.selectExpr("array_intersect(a, b)"), ans4)
+
+val df5 = Seq((Array("c", null, "a", "f"), Array("b", "a", null, "g"))).toDF("a", "b")
+val ans5 = Row(Seq(null, "a"))
+checkAnswer(df5.select(array_intersect($"a", $"b")), ans5)
+checkAnswer(df5.selectExpr("array_intersect(a, b)"), ans5)
+
+val df6 = Seq((null, null)).toDF("a", "b")
+intercept[AnalysisException] {
--- End diff --

Could you also check the error message?
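A minimal sketch of what that check could look like (the exact message text is an assumption):
```
val e = intercept[AnalysisException] {
  df6.select(array_intersect($"a", $"b"))
}
assert(e.getMessage.contains("data type mismatch"))
```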


---




[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function

2018-08-04 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21102#discussion_r207723142
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3801,339 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in the intersection of array1 and
+array2, without duplicates.
+  """,
+  examples = """
+Examples:Fun
--- End diff --

This is back again?


---




[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function

2018-07-28 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21102#discussion_r205959224
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3968,3 +3964,234 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in the intersection of array1 and
+array2, without duplicates.
+  """,
+  examples = """
+Examples:Fun
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(1, 3)
+  """,
+  since = "2.4.0")
+case class ArrayIntersect(left: Expression, right: Expression) extends ArraySetLike {
+  override def dataType: DataType = ArrayType(elementType,
+left.dataType.asInstanceOf[ArrayType].containsNull &&
+  right.dataType.asInstanceOf[ArrayType].containsNull)
+
+  @transient lazy val evalIntersect: (ArrayData, ArrayData) => ArrayData = {
+if (elementTypeSupportEquals) {
+  (array1, array2) =>
+val hs = new OpenHashSet[Any]
+val hsResult = new OpenHashSet[Any]
+var foundNullElement = false
+var i = 0
+while (i < array2.numElements()) {
+  if (array2.isNullAt(i)) {
+foundNullElement = true
+  } else {
+val elem = array2.get(i, elementType)
+hs.add(elem)
+  }
+  i += 1
+}
+val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any]
+i = 0
+while (i < array1.numElements()) {
+  if (array1.isNullAt(i)) {
+if (foundNullElement) {
+  arrayBuffer += null
+  foundNullElement = false
+}
+  } else {
+val elem = array1.get(i, elementType)
+if (hs.contains(elem) && !hsResult.contains(elem)) {
+  arrayBuffer += elem
+  hsResult.add(elem)
+}
+  }
+  i += 1
+}
+new GenericArrayData(arrayBuffer)
+} else {
+  (array1, array2) =>
+val arrayBuffer = new scala.collection.mutable.ArrayBuffer[Any]
+var alreadySeenNull = false
+var i = 0
+while (i < array1.numElements()) {
+  var found = false
+  val elem1 = array1.get(i, elementType)
+  if (array1.isNullAt(i)) {
+if (!alreadySeenNull) {
+  var j = 0
+  while (!found && j < array2.numElements()) {
+found = array2.isNullAt(j)
+j += 1
+  }
+  // array2 is scanned only once for null element
+  alreadySeenNull = true
+}
+  } else {
+var j = 0
+while (!found && j < array2.numElements()) {
+  if (!array2.isNullAt(j)) {
+val elem2 = array2.get(j, elementType)
+if (ordering.equiv(elem1, elem2)) {
+  // check whether elem1 is already stored in arrayBuffer
+  var foundArrayBuffer = false
+  var k = 0
+  while (!foundArrayBuffer && k < arrayBuffer.size) {
+val va = arrayBuffer(k)
+foundArrayBuffer = (va != null) && ordering.equiv(va, elem1)
+k += 1
+  }
+  found = !foundArrayBuffer
+}
+  }
+  j += 1
+}
+  }
+  if (found) {
+arrayBuffer += elem1
+  }
+  i += 1
+}
+new GenericArrayData(arrayBuffer)
+}
+  }
+
+  override def nullSafeEval(input1: Any, input2: Any): Any = {
+val array1 = input1.asInstanceOf[ArrayData]
+val array2 = input2.asInstanceOf[ArrayData]
+
+evalIntersect(array1, array2)
+  }
+
+  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+val arrayData = classOf[ArrayData].getName
+val i = ctx.freshName("i")
--- End diff --

It would be good to refactor lines 4077 to 4124 into a method, since this part can be shared among `union`, `except`, and `intersect`.
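A hypothetical shape for such a helper (name and signature are assumptions, not the PR's code):
```
// Emits the loop scaffolding shared by the union/except/intersect codegen
// paths; each caller fills in the per-element handling.
private def genScanArray(ctx: CodegenContext, array: String)(body: String => String): String = {
  val i = ctx.freshName("i")
  s"""
     |for (int $i = 0; $i < $array.numElements(); $i++) {
     |  ${body(i)}
     |}
   """.stripMargin
}
```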


---




[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function

2018-07-27 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21102#discussion_r205930801
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala ---
@@ -272,7 +272,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
 
   private def nextPowerOf2(n: Int): Int = {
 if (n == 0) {
-  1
+  2
--- End diff --

Oh, good catch.


---




[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function

2018-07-27 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21102#discussion_r205930794
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3801,339 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in the intersection of array1 and
+array2, without duplicates.
+  """,
+  examples = """
+Examples:Fun
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(1, 3)
+  """,
+  since = "2.4.0")
+case class ArrayIntersect(left: Expression, right: Expression) extends ArraySetLike {
+  override def dataType: DataType = ArrayType(elementType,
+left.dataType.asInstanceOf[ArrayType].containsNull &&
+  right.dataType.asInstanceOf[ArrayType].containsNull)
+
+  var hsInt: OpenHashSet[Int] = _
+  var hsResultInt: OpenHashSet[Int] = _
+  var hsLong: OpenHashSet[Long] = _
+  var hsResultLong: OpenHashSet[Long] = _
+
+  def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = {
+val elem = array.getInt(idx)
+if (hsInt.contains(elem) && !hsResultInt.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setInt(pos, elem)
+  }
+  hsResultInt.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = {
+val elem = array.getLong(idx)
+if (hsLong.contains(elem) && !hsResultLong.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setLong(pos, elem)
+  }
+  hsResultLong.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def evalIntLongPrimitiveType(
+  array1: ArrayData,
+  array2: ArrayData,
+  resultArray: ArrayData,
+  initFoundNullElement: Boolean,
+  isLongType: Boolean): (Int, Boolean) = {
+// store elements into resultArray
+var i = 0
+var foundNullElement = initFoundNullElement
+if (resultArray == null) {
+  // hsInt or hsLong is updated only once since it is not changed
+  while (i < array1.numElements()) {
--- End diff --

You are right, fixed.


---




[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function

2018-07-27 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21102#discussion_r205853523
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala ---
@@ -272,7 +272,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
 
   private def nextPowerOf2(n: Int): Int = {
 if (n == 0) {
-  1
+  2
--- End diff --

Good catch, thanks


---




[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function

2018-07-26 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21102#discussion_r205341581
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3801,339 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in the intersection of array1 and
+array2, without duplicates.
+  """,
+  examples = """
+Examples:Fun
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(1, 3)
+  """,
+  since = "2.4.0")
+case class ArrayIntersect(left: Expression, right: Expression) extends ArraySetLike {
+  override def dataType: DataType = ArrayType(elementType,
+left.dataType.asInstanceOf[ArrayType].containsNull &&
+  right.dataType.asInstanceOf[ArrayType].containsNull)
+
+  var hsInt: OpenHashSet[Int] = _
+  var hsResultInt: OpenHashSet[Int] = _
+  var hsLong: OpenHashSet[Long] = _
+  var hsResultLong: OpenHashSet[Long] = _
+
+  def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = {
+val elem = array.getInt(idx)
+if (hsInt.contains(elem) && !hsResultInt.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setInt(pos, elem)
+  }
+  hsResultInt.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = {
+val elem = array.getLong(idx)
+if (hsLong.contains(elem) && !hsResultLong.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setLong(pos, elem)
+  }
+  hsResultLong.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def evalIntLongPrimitiveType(
+  array1: ArrayData,
+  array2: ArrayData,
+  resultArray: ArrayData,
+  initFoundNullElement: Boolean,
+  isLongType: Boolean): (Int, Boolean) = {
+// store elements into resultArray
+var i = 0
+var foundNullElement = initFoundNullElement
+if (resultArray == null) {
+  // hsInt or hsLong is updated only once since it is not changed
+  while (i < array1.numElements()) {
--- End diff --

Should `array1` and `array2` be swapped if we want to preserve the element order of the left array?


---




[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function

2018-07-26 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21102#discussion_r205342201
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3801,339 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in the intersection of array1 and
+array2, without duplicates.
+  """,
+  examples = """
+Examples:Fun
+  > SELECT _FUNC_(array(1, 2, 3), array(1, 3, 5));
+   array(1, 3)
+  """,
+  since = "2.4.0")
+case class ArrayIntersect(left: Expression, right: Expression) extends ArraySetLike {
+  override def dataType: DataType = ArrayType(elementType,
+left.dataType.asInstanceOf[ArrayType].containsNull &&
+  right.dataType.asInstanceOf[ArrayType].containsNull)
+
+  var hsInt: OpenHashSet[Int] = _
+  var hsResultInt: OpenHashSet[Int] = _
+  var hsLong: OpenHashSet[Long] = _
+  var hsResultLong: OpenHashSet[Long] = _
+
+  def assignInt(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = {
+val elem = array.getInt(idx)
+if (hsInt.contains(elem) && !hsResultInt.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setInt(pos, elem)
+  }
+  hsResultInt.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def assignLong(array: ArrayData, idx: Int, resultArray: ArrayData, pos: Int): Boolean = {
+val elem = array.getLong(idx)
+if (hsLong.contains(elem) && !hsResultLong.contains(elem)) {
+  if (resultArray != null) {
+resultArray.setLong(pos, elem)
+  }
+  hsResultLong.add(elem)
+  true
+} else {
+  false
+}
+  }
+
+  def evalIntLongPrimitiveType(
+  array1: ArrayData,
+  array2: ArrayData,
+  resultArray: ArrayData,
+  initFoundNullElement: Boolean,
+  isLongType: Boolean): (Int, Boolean) = {
+// store elements into resultArray
+var i = 0
+var foundNullElement = initFoundNullElement
+if (resultArray == null) {
+  // hsInt or hsLong is updated only once since it is not changed
--- End diff --

I might be missing something, but can we do the same thing for `array_except`? It would be good if we could skip traversing the right array. This is not urgent; maybe we can do it in a follow-up PR to the `array_except` PR.


---




[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function

2018-07-26 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21102#discussion_r205327639
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala ---
@@ -272,7 +272,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
 
   private def nextPowerOf2(n: Int): Int = {
 if (n == 0) {
-  1
+  2
--- End diff --

Why was this changed?


---




[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function

2018-07-23 Thread mn-mikke
Github user mn-mikke commented on a diff in the pull request:

https://github.com/apache/spark/pull/21102#discussion_r204349890
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -3805,3 +3801,339 @@ object ArrayUnion {
 new GenericArrayData(arrayBuffer)
   }
 }
+
+/**
+ * Returns an array of the elements in the intersect of x and y, without duplicates
+ */
+@ExpressionDescription(
+  usage = """
+  _FUNC_(array1, array2) - Returns an array of the elements in the intersection of array1 and
+array2, without duplicates.
+  """,
+  examples = """
+Examples:Fun
--- End diff --

Just ```Examples:```?


---




[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function

2018-07-18 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/21102#discussion_r203322643
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala ---
@@ -163,7 +187,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
* to a new position (in the new data array).
*/
   def rehashIfNeeded(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) {
-if (_size > _growThreshold) {
+if (_occupied > _growThreshold) {
--- End diff --

For accuracy's sake, my example snippet above will fail much earlier, due to `OpenHashSet.MAX_CAPACITY`. Though that is probably not the point anyway :-)


---




[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function

2018-07-18 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/21102#discussion_r203322056
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala ---
@@ -163,7 +187,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
* to a new position (in the new data array).
*/
   def rehashIfNeeded(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) {
-if (_size > _growThreshold) {
+if (_occupied > _growThreshold) {
--- End diff --

There is no explicit entry here; it is simply an unoccupied slot in an array.
The slot is free, so it can be used by some other (new) entry when insert is called.

It is easy to see how very bad behavior can happen even while the actual size of the set stays very small: a series of add/remove calls results in unending growth of the set.

Something like this, for example, is enough to blow the set up to 2B entries:
```
var i = 0
while (i < Int.MaxValue) {
  set.add(1)
  set.remove(1)
  assert (0 == set.size)
  i += 1
}
```



---




[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function

2018-07-18 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21102#discussion_r203319710
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala ---
@@ -163,7 +187,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
* to a new position (in the new data array).
*/
   def rehashIfNeeded(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) {
-if (_size > _growThreshold) {
+if (_occupied > _growThreshold) {
--- End diff --

When `remove` is called, `_size` is decremented, but the entry's slot is not released. This is the motivation for introducing `_occupied`.
I will try another implementation without `remove`, though it may introduce some overhead.


---




[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function

2018-07-18 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21102#discussion_r203318288
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala ---
@@ -85,9 +85,13 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
   protected var _capacity = nextPowerOf2(initialCapacity)
   protected var _mask = _capacity - 1
   protected var _size = 0
+  protected var _occupied = 0
   protected var _growThreshold = (loadFactor * _capacity).toInt
+  def g: Int = _growThreshold
+  def o: Int = _occupied
--- End diff --

Oh, sorry for leaving this in. It is used only for my debugging and should be removed.


---




[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function

2018-07-18 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21102#discussion_r203315417
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala ---
@@ -114,6 +118,21 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
 rehashIfNeeded(k, grow, move)
   }
 
+  /**
+   * Remove an element from the set. If the element does not exist in the set, nothing is done.
+   */
+  def remove(k: T): Unit = {
--- End diff --

If we need to keep the order without duplication, we can implement this by introducing another hash set, or by searching the result array when we try to add a new element.
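The intersect path quoted earlier in this thread already follows the second option; a minimal sketch of that shape:
```
val hs = new OpenHashSet[Any]        // elements present in array2
val hsResult = new OpenHashSet[Any]  // elements already emitted
if (hs.contains(elem) && !hsResult.contains(elem)) {
  arrayBuffer += elem                // keeps first-seen order without duplicates
  hsResult.add(elem)
}
```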


---




[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function

2018-07-18 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/21102#discussion_r203311755
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala ---
@@ -85,9 +85,13 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
   protected var _capacity = nextPowerOf2(initialCapacity)
   protected var _mask = _capacity - 1
   protected var _size = 0
+  protected var _occupied = 0
   protected var _growThreshold = (loadFactor * _capacity).toInt
+  def g: Int = _growThreshold
+  def o: Int = _occupied
 
   protected var _bitset = new BitSet(_capacity)
+  protected var _bitsetDeleted: BitSet = null
--- End diff --

Why protected? Make it private instead.


---




[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function

2018-07-18 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/21102#discussion_r203314045
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala ---
@@ -163,7 +187,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
* to a new position (in the new data array).
*/
   def rehashIfNeeded(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) {
-if (_size > _growThreshold) {
+if (_occupied > _growThreshold) {
--- End diff --

I don't see any value in `_occupied`; on the contrary, it can cause very bad behavior if a lot of removes are expected.
`_size` is a better metric for deciding when to rehash and grow.


---




[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function

2018-07-18 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/21102#discussion_r203311109
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala ---
@@ -85,9 +85,13 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
   protected var _capacity = nextPowerOf2(initialCapacity)
   protected var _mask = _capacity - 1
   protected var _size = 0
+  protected var _occupied = 0
   protected var _growThreshold = (loadFactor * _capacity).toInt
+  def g: Int = _growThreshold
+  def o: Int = _occupied
--- End diff --

Also, please use more descriptive and comprehensible names.
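For instance, something like the following would be clearer (names are suggestions only):
```
def growThreshold: Int = _growThreshold
def occupied: Int = _occupied
```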


---




[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function

2018-07-18 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21102#discussion_r203299689
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala ---
@@ -114,6 +118,21 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
 rehashIfNeeded(k, grow, move)
   }
 
+  /**
+   * Remove an element from the set. If the element does not exist in the set, nothing is done.
+   */
+  def remove(k: T): Unit = {
--- End diff --

Maybe we should not add a `remove` method unless we can add it in a simple way. This class is used in many places, and the change might affect their performance. How about using another implementation?


---




[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function

2018-07-16 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/21102#discussion_r202782954
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala ---
@@ -73,6 +73,46 @@ class OpenHashSetSuite extends SparkFunSuite with Matchers {
 assert(set.contains(50))
 assert(set.contains(999))
 assert(!set.contains(1))
+
+set.add(1132)  // Cause hash contention with 999
+assert(set.size === 4)
+assert(set.contains(10))
+assert(set.contains(50))
+assert(set.contains(999))
+assert(set.contains(1132))
+assert(!set.contains(1))
+
+set.remove(1132)
+assert(set.size === 3)
+assert(set.contains(10))
+assert(set.contains(50))
+assert(set.contains(999))
+assert(!set.contains(1132))
+assert(!set.contains(1))
+
+set.remove(999)
--- End diff --

Good catch, I addressed this.


---




[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function

2018-07-16 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21102#discussion_r202621143
  
--- Diff: 
core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala ---
@@ -73,6 +73,46 @@ class OpenHashSetSuite extends SparkFunSuite with Matchers {
 assert(set.contains(50))
 assert(set.contains(999))
 assert(!set.contains(1))
+
+set.add(1132)  // Cause hash contention with 999
+assert(set.size === 4)
+assert(set.contains(10))
+assert(set.contains(50))
+assert(set.contains(999))
+assert(set.contains(1132))
+assert(!set.contains(1))
+
+set.remove(1132)
+assert(set.size === 3)
+assert(set.contains(10))
+assert(set.contains(50))
+assert(set.contains(999))
+assert(!set.contains(1132))
+assert(!set.contains(1))
+
+set.remove(999)
--- End diff --

What if we remove `999` before `1132`?
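A sketch of the suggested variant, reusing the test's contention pair (the expected assertions assume that removing `999` leaves the probe chain to `1132` intact):
```
set.remove(999)
assert(set.size === 3)
assert(!set.contains(999))
assert(set.contains(1132))  // must survive removal of its colliding neighbor
```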


---




[GitHub] spark pull request #21102: [SPARK-23913][SQL] Add array_intersect function

2018-04-18 Thread kiszk
GitHub user kiszk opened a pull request:

https://github.com/apache/spark/pull/21102

[SPARK-23913][SQL] Add array_intersect function 

## What changes were proposed in this pull request?

The PR adds the SQL function `array_intersect`. The behavior of the function is based on Presto's.

This function returns an array of the elements in the intersection of array1 and array2.

Note: The order of elements in the result is not defined.
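For example (taken from the expression's documented examples):
```
SELECT array_intersect(array(1, 2, 3), array(1, 3, 5));
-- array(1, 3)
```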

## How was this patch tested?

Added UTs


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kiszk/spark SPARK-23913

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21102.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21102


commit 548a4b804472e062e36308274d1aff8909621131
Author: Kazuaki Ishizaki 
Date:   2018-04-18T16:01:50Z

initial commit




---
