[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-26 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140968754
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala ---
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-25 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140936872
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala ---
@@ -87,70 +87,157 @@ class SymmetricHashJoinStateManager(
   }
 
   /**
-   * Remove using a predicate on keys. See class docs for more context and implement details.
+   * Remove using a predicate on keys.
+   *
+   * This produces an iterator over the (key, value) pairs satisfying condition(key), where the
+   * underlying store is updated as a side-effect of producing next.
+   *
+   * This implies the iterator must be consumed fully without any other operations on this manager
+   * or the underlying store being interleaved.
    */
-  def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = {
-    val allKeyToNumValues = keyToNumValues.iterator
-
-    while (allKeyToNumValues.hasNext) {
-      val keyToNumValue = allKeyToNumValues.next
-      if (condition(keyToNumValue.key)) {
-        keyToNumValues.remove(keyToNumValue.key)
-        keyWithIndexToValue.removeAllValues(keyToNumValue.key, keyToNumValue.numValue)
+  def removeByKeyCondition(condition: UnsafeRow => Boolean): Iterator[UnsafeRowPair] = {
+    new NextIterator[UnsafeRowPair] {
+
+      private val allKeyToNumValues = keyToNumValues.iterator
+
+      private var currentKeyToNumValue: KeyAndNumValues = null
+      private var currentValues: Iterator[KeyWithIndexAndValue] = null
+
+      private def currentKey = currentKeyToNumValue.key
+
+      private val reusedPair = new UnsafeRowPair()
+
+      private def getAndRemoveValue() = {
+        val keyWithIndexAndValue = currentValues.next()
+        keyWithIndexToValue.remove(currentKey, keyWithIndexAndValue.valueIndex)
+        reusedPair.withRows(currentKey, keyWithIndexAndValue.value)
+      }
+
+      override def getNext(): UnsafeRowPair = {
+        if (currentValues != null && currentValues.hasNext) {
+          return getAndRemoveValue()
+        } else {
+          while (allKeyToNumValues.hasNext) {
+            currentKeyToNumValue = allKeyToNumValues.next()
+            if (condition(currentKey)) {
+              currentValues = keyWithIndexToValue.getAll(
+                currentKey, currentKeyToNumValue.numValue)
+              keyToNumValues.remove(currentKey)
+
+              if (currentValues.hasNext) {
+                return getAndRemoveValue()
+              }
+            }
+          }
+        }
+
+        finished = true
+        null
       }
+
+      override def close: Unit = {}
     }
   }
 
   /**
-   * Remove using a predicate on values. See class docs for more context and implementation details.
+   * Remove using a predicate on values.
+   *
+   * At a high level, this produces an iterator over the (key, value) pairs such that value
+   * satisfies the predicate, where producing an element removes the value from the state store
+   * and producing all elements with a given key updates it accordingly.
+   *
+   * This implies the iterator must be consumed fully without any other operations on this manager
+   * or the underlying store being interleaved.
    */
-  def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = {
-    val allKeyToNumValues = keyToNumValues.iterator
+  def removeByValueCondition(condition: UnsafeRow => Boolean): Iterator[UnsafeRowPair] = {
+    new NextIterator[UnsafeRowPair] {
 
-    while (allKeyToNumValues.hasNext) {
-      val keyToNumValue = allKeyToNumValues.next
-      val key = keyToNumValue.key
+      // Reuse this object to avoid creation+GC overhead.
+      private val reusedPair = new UnsafeRowPair()
 
-      var numValues: Long = keyToNumValue.numValue
-      var index: Long = 0L
-      var valueRemoved: Boolean = false
-      var valueForIndex: UnsafeRow = null
+      private val allKeyToNumValues = keyToNumValues.iterator
 
-      while (index < numValues) {
-        if (valueForIndex == null) {
-          valueForIndex = keyWithIndexToValue.get(key, index)
+      private var currentKey: UnsafeRow = null
+      private var numValues: Long = 0L
+      private var index: Long = 0L
+      private var valueRemoved: Boolean = false
+
+      // Push the data for the current key to the numValues store, and reset the tracking variables
+      // to their empty state.
+      private def storeCurrentKey(): Unit = {
--- End diff --

Yeah sorry, I meant to go back and fully address this comment the last time 
but I forgot.

[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-25 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140935586
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
@@ -216,22 +232,70 @@ case class StreamingSymmetricHashJoinExec(
     }
 
     // Filter the joined rows based on the given condition.
-    val outputFilterFunction =
-      newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output).eval _
-    val filteredOutputIter =
-      (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction).map { row =>
-        numOutputRows += 1
-        row
-      }
+    val outputFilterFunction = newPredicate(condition.getOrElse(Literal(true)), output).eval _
+
+    val filteredInnerOutputIter = (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction)
--- End diff --

I don't think it's correct to filter the output from remove. The query

    Seq(1, 2, 3).toDF("val1").join(Seq[Int]().toDF("val2"), 'val1 === 'val2 && 'val1 === 0, "left_outer")

produces ((1, null), (2, null), (3, null)).

Outer joins with watermark range conditions also wouldn't work if we filtered the remove output, since the range condition would exclude null values.
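
For illustration, a minimal self-contained sketch (plain Scala stand-ins for Catalyst rows; the names are hypothetical) of why applying the join condition to null-padded outer rows would drop them all:

```scala
// Hypothetical stand-in for a joined row: val2 is None when null-padded.
case class Joined(val1: Option[Int], val2: Option[Int])

// The condition 'val1 === 'val2 && 'val1 === 0 under SQL null semantics:
// a comparison against null is never true, so it never passes for outer rows.
def condition(r: Joined): Boolean =
  (for (a <- r.val1; b <- r.val2) yield a == b && a == 0).getOrElse(false)

val outerRows = Seq(Joined(Some(1), None), Joined(Some(2), None), Joined(Some(3), None))
assert(outerRows.filter(condition).isEmpty) // filtering would wrongly emit nothing
```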





[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-25 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140933568
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala ---
@@ -233,16 +234,46 @@ object UnsupportedOperationChecker {
                 throwError("Full outer joins with streaming DataFrames/Datasets are not supported")
               }
 
-            case LeftOuter | LeftSemi | LeftAnti =>
+            case LeftSemi | LeftAnti =>
               if (right.isStreaming) {
-                throwError("Left outer/semi/anti joins with a streaming DataFrame/Dataset " +
-                    "on the right is not supported")
+                throwError("Left semi/anti joins with a streaming DataFrame/Dataset " +
+                    "on the right are not supported")
               }
 
-            case RightOuter =>
-              if (left.isStreaming) {
-                throwError("Right outer join with a streaming DataFrame/Dataset on the left is " +
-                    "not supported")
+            // We support left and right outer streaming joins only in the stream+stream case.
+            case LeftOuter | RightOuter =>
+              if (joinType == LeftOuter && !left.isStreaming && right.isStreaming) {
+                throwError("Left outer join with a streaming DataFrame/Dataset " +
+                  "on the right and non-streaming on the left is not supported")
+              }
+              if (joinType == RightOuter && left.isStreaming && !right.isStreaming) {
+                throwError("Right outer join with a streaming DataFrame/Dataset on the left and " +
+                    "non-streaming on the right not supported")
+              }
+
+              if (left.isStreaming && right.isStreaming) {
+                val watermarkInJoinKeys = subPlan match {
+                  case ExtractEquiJoinKeys(_, leftKeys, rightKeys, _, _, _) =>
+                    val keySet = AttributeSet(leftKeys ++ rightKeys)
+                    (leftKeys ++ rightKeys).exists {
+                      case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey)
+                      case _ => false
+                    }
+                  case _ => false
+                }
+
+                val oppositeSideHasWatermark = joinType match {
+                  case LeftOuter =>
+                    right.output.find(_.metadata.contains(EventTimeWatermark.delayKey)).isDefined
+                  case RightOuter =>
--- End diff --

This condition is not sufficient. State can be dropped from the left side (in a left outer join) only when both conditions are satisfied:
1. right.output has a watermark
2. the join condition has a time range condition such that a left side state value watermark can be defined from the right side event time watermark.

Now `StreamingSymmetricHashJoinHelper.getOneSideStateWatermarkPredicate` checks for (2), so you have to invoke that to find out whether the conditions are sufficient for defining state value watermarks. However, that is in the SQL project, not catalyst. So you may have to move that helper class into catalyst and then use it from the UnsupportedOperationChecker (and also rename the helper class to whatever makes sense).
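
A hedged sketch of the combined check (`EventTimeWatermark.delayKey`, `Attribute`, and `Expression` are the real names quoted above; everything else is an illustrative stand-in, and the Option argument stands in for the result of `getOneSideStateWatermarkPredicate`, whose exact signature isn't shown here):

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark

// For a left outer join: left-side state can be dropped only if
// (1) the right side output carries an event time watermark, AND
// (2) the join condition yields a state value watermark for the left side.
def leftStateCanBeDropped(
    rightOutput: Seq[Attribute],
    leftStateWatermarkPredicate: Option[Expression]): Boolean = {
  val rightHasWatermark =
    rightOutput.exists(_.metadata.contains(EventTimeWatermark.delayKey))
  rightHasWatermark && leftStateWatermarkPredicate.isDefined
}
```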





[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-25 Thread joseph-torres
Github user joseph-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140933095
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
@@ -157,11 +164,20 @@ case class StreamingSymmetricHashJoinExec(
   override def requiredChildDistribution: Seq[Distribution] =
     ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
 
-  override def output: Seq[Attribute] = left.output ++ right.output
+  override def output: Seq[Attribute] = joinType match {
+    case _: InnerLike => left.output ++ right.output
+    case LeftOuter => left.output ++ right.output.map(_.withNullability(true))
+    case RightOuter => left.output.map(_.withNullability(true)) ++ right.output
+    case _ =>
+      throwBadJoinTypeException()
+      Seq()
+  }
 
   override def outputPartitioning: Partitioning = joinType match {
     case _: InnerLike =>
       PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
+    case LeftOuter => PartitioningCollection(Seq(right.outputPartitioning))
--- End diff --

That's what I thought at first, but the non-streaming HashJoin seems to do 
the partitioning this way. (Or am I misunderstanding what buildSide means in 
that trait?)





[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-25 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140931385
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala ---
@@ -233,16 +234,46 @@ object UnsupportedOperationChecker {
                 throwError("Full outer joins with streaming DataFrames/Datasets are not supported")
               }
 
-            case LeftOuter | LeftSemi | LeftAnti =>
+            case LeftSemi | LeftAnti =>
               if (right.isStreaming) {
-                throwError("Left outer/semi/anti joins with a streaming DataFrame/Dataset " +
-                    "on the right is not supported")
+                throwError("Left semi/anti joins with a streaming DataFrame/Dataset " +
+                    "on the right are not supported")
               }
 
-            case RightOuter =>
-              if (left.isStreaming) {
-                throwError("Right outer join with a streaming DataFrame/Dataset on the left is " +
-                    "not supported")
+            // We support left and right outer streaming joins only in the stream+stream case.
--- End diff --

Not really. leftOuterJoin(stream, batch) is allowed.
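
For example, something like this (illustrative DataFrame names) should pass the checker:

```scala
// Stream on the left, static batch on the right: allowed for left outer join.
val joined = streamingDf.join(staticDf, Seq("key"), "left_outer")
```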





[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-25 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140914684
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala ---
@@ -309,19 +396,24 @@ class SymmetricHashJoinStateManager(
       stateStore.get(keyWithIndexRow(key, valueIndex))
     }
 
-    /** Get all the values for key and all indices. */
-    def getAll(key: UnsafeRow, numValues: Long): Iterator[UnsafeRow] = {
+    /**
+     * Get all values and indices for the provided key.
+     * Should not return null.
+     */
+    def getAll(key: UnsafeRow, numValues: Long): Iterator[KeyWithIndexAndValue] = {
+      val keyWithIndexAndValue = new KeyWithIndexAndValue()
       var index = 0
-      new NextIterator[UnsafeRow] {
-        override protected def getNext(): UnsafeRow = {
+      new NextIterator[KeyWithIndexAndValue] {
+        override protected def getNext(): KeyWithIndexAndValue = {
           if (index >= numValues) {
             finished = true
             null
           } else {
             val keyWithIndex = keyWithIndexRow(key, index)
             val value = stateStore.get(keyWithIndex)
             index += 1
-            value
+            // return original index
+            keyWithIndexAndValue.withNew(key, index - 1, value)
--- End diff --

super nit: better to make it
```
keyWithIndexAndValue.set(key, index, value)
index += 1
keyWithIndexAndValue
```






[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-25 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140912351
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala ---
@@ -87,70 +87,157 @@ class SymmetricHashJoinStateManager(
...
-  def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = {
-    val allKeyToNumValues = keyToNumValues.iterator
+  def removeByValueCondition(condition: UnsafeRow => Boolean): Iterator[UnsafeRowPair] = {
--- End diff --

Can you rename `condition` to `removalCondition`, to make it clear it's a condition for "what to remove" and not "what to keep"? Same for the other method.
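
A sketch of the suggested signatures:

```scala
def removeByKeyCondition(removalCondition: UnsafeRow => Boolean): Iterator[UnsafeRowPair]
def removeByValueCondition(removalCondition: UnsafeRow => Boolean): Iterator[UnsafeRowPair]
```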





[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-25 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140906369
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
@@ -324,17 +389,33 @@ case class StreamingSymmetricHashJoinExec(
       }
     }
 
-    /** Remove old buffered state rows using watermarks for state keys and values */
-    def removeOldState(): Unit = {
+    /**
+     * Builds an iterator over old state key-value pairs, removing them lazily as they're produced.
+     *
+     * @note This iterator must be consumed fully before any other operations are made
+     * against this joiner's join state manager. For efficiency reasons, the intermediate states of
+     * the iterator leave the state manager in an invalid configuration.
--- End diff --

invalid configuration -> undefined state. It may or may not be valid.





[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-25 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140927997
  
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala ---
@@ -416,32 +421,32 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
   // Left outer joins: *-stream not allowed
--- End diff --

Need to test cases where it should not be allowed in outer joins.
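
A hedged sketch of one such negative case, assuming the suite's existing helpers and test relations (`assertNotSupportedInStreamingPlan`, `batchRelation`, `streamRelation`) keep their current shape:

```scala
// A batch-stream left outer join should still be rejected by the checker.
assertNotSupportedInStreamingPlan(
  "left outer join with a streaming DataFrame on the right only",
  batchRelation.join(streamRelation, joinType = LeftOuter),
  OutputMode.Append(),
  Seq("outer join"))
```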





[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-25 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140905683
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
@@ -216,22 +232,70 @@ case class StreamingSymmetricHashJoinExec(
     }
 
     // Filter the joined rows based on the given condition.
-    val outputFilterFunction =
-      newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output).eval _
-    val filteredOutputIter =
-      (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction).map { row =>
-        numOutputRows += 1
-        row
-      }
+    val outputFilterFunction = newPredicate(condition.getOrElse(Literal(true)), output).eval _
+
+    val filteredInnerOutputIter = (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction)
+
+    val outputIter: Iterator[InternalRow] = joinType match {
+      case Inner =>
+        filteredInnerOutputIter
+      case LeftOuter =>
+        // We generate the outer join input by:
+        // * Getting an iterator over the rows that have aged out on the left side. These rows are
+        //   candidates for being null joined. Note that to avoid doing two passes, this iterator
+        //   removes the rows from the state manager as they're processed.
+        // * Checking whether the current row matches a key in the right side state. If it doesn't,
+        //   we know we can join with null, since there was never (including this batch) a match
+        //   within the watermark period. If it does, there must have been a match at some point, so
+        //   we know we can't join with null.
+        val nullRight = new GenericInternalRow(right.output.map(_.withNullability(true)).length)
+        val removedRowIter = leftSideJoiner.removeOldState()
+        val outerOutputIter = removedRowIter
+          .filterNot(pair => rightSideJoiner.containsKey(pair.key))
+          .map(pair => joinedRow.withLeft(pair.value).withRight(nullRight))
+
+        filteredInnerOutputIter ++ outerOutputIter
+      case RightOuter =>
+        // See comments for left outer case.
+        val nullLeft = new GenericInternalRow(left.output.map(_.withNullability(true)).length)
+        val removedRowIter = rightSideJoiner.removeOldState()
+        val outerOutputIter = removedRowIter
+          .filterNot(pair => leftSideJoiner.containsKey(pair.key))
+          .map(pair => joinedRow.withLeft(nullLeft).withRight(pair.value))
+
+        filteredInnerOutputIter ++ outerOutputIter
+      case _ =>
+        throwBadJoinTypeException()
+        Iterator()
+    }
+
+    val outputIterWithMetrics = outputIter.map { row =>
+      numOutputRows += 1
+      row
+    }
 
     // Function to remove old state after all the input has been consumed and output generated
     def onOutputCompletion = {
       allUpdatesTimeMs += math.max(NANOSECONDS.toMillis(System.nanoTime - updateStartTimeNs), 0)
 
-      // Remove old state if needed
+      // TODO: how to get this for removals as part of outer join?
       allRemovalsTimeMs += timeTakenMs {
-        leftSideJoiner.removeOldState()
-        rightSideJoiner.removeOldState()
+        // Iterator which must be consumed after output completion before committing.
+        // For outer joins, we've removed old state from the appropriate side inline while we
+        // produced the null rows. So we need to finish cleaning the other side. For inner joins
--- End diff --

"appropriate side inline" does not make sense to me. Something like this 
would be better 

"For inner joins, we have to remove unnecessary state rows from both sides 
if possible. For outer joins, we have already removed unnecessary state rows 
from the outer side (e.g., left side for left outer join) while generating the 
outer "null" outputs. Now, we have to remove unnecessary state rows from the 
other side (e.g., right side for the left outer join) if possible. In all 
cases, nothing needs to be outputted, hence the removal needs to be done 
greedily by immediately consuming the returned iterator."





[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-25 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140910541
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala ---
@@ -87,70 +87,157 @@ class SymmetricHashJoinStateManager(
...
+      override def getNext(): UnsafeRowPair = {
+        if (currentValues != null && currentValues.hasNext) {
+          return getAndRemoveValue()
--- End diff --

If you are using return, then there's no point in the else. Best to consistently stick to one style: either if-return (no else) or if-else (no return).
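
For instance, getNext() rewritten in the if-return style (a sketch of the same logic as quoted, not new behavior):

```scala
override def getNext(): UnsafeRowPair = {
  // Keep draining values for the current key, if any remain.
  if (currentValues != null && currentValues.hasNext) {
    return getAndRemoveValue()
  }
  // Otherwise advance to the next key satisfying the condition.
  while (allKeyToNumValues.hasNext) {
    currentKeyToNumValue = allKeyToNumValues.next()
    if (condition(currentKey)) {
      currentValues = keyWithIndexToValue.getAll(currentKey, currentKeyToNumValue.numValue)
      keyToNumValues.remove(currentKey)
      if (currentValues.hasNext) {
        return getAndRemoveValue()
      }
    }
  }
  finished = true
  null
}
```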





[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-25 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140903737
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
@@ -216,22 +232,70 @@ case class StreamingSymmetricHashJoinExec(
...
+      case _ =>
+        throwBadJoinTypeException()
+        Iterator()
--- End diff --

This can be removed; see above.





[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-25 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140900883
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
@@ -157,11 +164,20 @@ case class StreamingSymmetricHashJoinExec(
...
   override def outputPartitioning: Partitioning = joinType match {
     case _: InnerLike =>
       PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
+    case LeftOuter => PartitioningCollection(Seq(right.outputPartitioning))
--- End diff --

Shouldn't this be `left.outputPartitioning` for LeftOuter? The attributes from the right side can become null, hence the right side partitioning will not be preserved.
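
The suggested change, as a sketch (assuming throwBadJoinTypeException is changed to return Nothing, per the comment on that helper):

```scala
override def outputPartitioning: Partitioning = joinType match {
  case _: InnerLike =>
    PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
  case LeftOuter => PartitioningCollection(Seq(left.outputPartitioning))
  case RightOuter => PartitioningCollection(Seq(right.outputPartitioning))
  case _ => throwBadJoinTypeException()
}
```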





[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-25 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140910588
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala ---
@@ -87,70 +87,157 @@ class SymmetricHashJoinStateManager(
...
+      override def getNext(): UnsafeRowPair = {
+        if (currentValues != null && currentValues.hasNext) {
+          return getAndRemoveValue()
+        } else {
+          while (allKeyToNumValues.hasNext) {
--- End diff --

Add comments on what this while loop is doing.
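
For example (a sketch of the kind of comments being asked for, over the quoted loop from getNext()):

```scala
// Find the next key satisfying the condition, and set up an iterator over its
// values. The key is deleted from the keyToNumValues store up front; its values
// are then deleted one at a time as the returned iterator is consumed.
while (allKeyToNumValues.hasNext) {
  currentKeyToNumValue = allKeyToNumValues.next()
  if (condition(currentKey)) {
    currentValues = keyWithIndexToValue.getAll(currentKey, currentKeyToNumValue.numValue)
    keyToNumValues.remove(currentKey)
    if (currentValues.hasNext) {
      return getAndRemoveValue()
    }
  }
}
```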





[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-25 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140911830
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala ---
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-25 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140913768
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala ---
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-25 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140911442
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala ---
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-25 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140904307
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
@@ -216,22 +232,70 @@ case class StreamingSymmetricHashJoinExec(
...
-      // Remove old state if needed
+      // TODO: how to get this for removals as part of outer join?
       allRemovalsTimeMs += timeTakenMs {
-        leftSideJoiner.removeOldState()
-        rightSideJoiner.removeOldState()
+        // Iterator which must be consumed after output completion before committing.
--- End diff --

nit: Rather than starting with how it's implemented (i.e. referring to the iterator), first explain what this code does semantically (e.g. "Remove all the state rows that are not needed anymore").





[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-25 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140913094
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala ---
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-25 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140928059
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
@@ -146,7 +146,14 @@ case class StreamingSymmetricHashJoinExec(
       stateWatermarkPredicates = JoinStateWatermarkPredicates(), left, right)
   }
 
-  require(joinType == Inner, s"${getClass.getSimpleName} should not take $joinType as the JoinType")
+  private def throwBadJoinTypeException(): Unit = {
--- End diff --

Unit -> Nothing. See comment below.
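
A sketch of the change (message text taken from the original require shown above):

```scala
// Returning Nothing lets the helper be the last expression of any match arm,
// removing the need for dummy fallbacks like Seq() or Iterator().
private def throwBadJoinTypeException(): Nothing =
  throw new IllegalArgumentException(
    s"${getClass.getSimpleName} should not take $joinType as the JoinType")
```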





[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-25 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140912029
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
 ---
@@ -87,70 +87,157 @@ class SymmetricHashJoinStateManager(
   }
 
   /**
-   * Remove using a predicate on keys. See class docs for more context and 
implement details.
+   * Remove using a predicate on keys.
+   *
+   * This produces an iterator over the (key, value) pairs satisfying 
condition(key), where the
+   * underlying store is updated as a side-effect of producing next.
+   *
+   * This implies the iterator must be consumed fully without any other 
operations on this manager
+   * or the underlying store being interleaved.
*/
-  def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
-
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  if (condition(keyToNumValue.key)) {
-keyToNumValues.remove(keyToNumValue.key)
-keyWithIndexToValue.removeAllValues(keyToNumValue.key, 
keyToNumValue.numValue)
+  def removeByKeyCondition(condition: UnsafeRow => Boolean): 
Iterator[UnsafeRowPair] = {
+new NextIterator[UnsafeRowPair] {
+
+  private val allKeyToNumValues = keyToNumValues.iterator
+
+  private var currentKeyToNumValue: KeyAndNumValues = null
+  private var currentValues: Iterator[KeyWithIndexAndValue] = null
+
+  private def currentKey = currentKeyToNumValue.key
+
+  private val reusedPair = new UnsafeRowPair()
+
+  private def getAndRemoveValue() = {
+val keyWithIndexAndValue = currentValues.next()
+keyWithIndexToValue.remove(currentKey, 
keyWithIndexAndValue.valueIndex)
+reusedPair.withRows(currentKey, keyWithIndexAndValue.value)
+  }
+
+  override def getNext(): UnsafeRowPair = {
+if (currentValues != null && currentValues.hasNext) {
+  return getAndRemoveValue()
+} else {
+  while (allKeyToNumValues.hasNext) {
+currentKeyToNumValue = allKeyToNumValues.next()
+if (condition(currentKey)) {
+  currentValues = keyWithIndexToValue.getAll(
+currentKey, currentKeyToNumValue.numValue)
+  keyToNumValues.remove(currentKey)
+
+  if (currentValues.hasNext) {
+return getAndRemoveValue()
+  }
+}
+  }
+}
+
+finished = true
+null
   }
+
+  override def close: Unit = {}
 }
   }
 
   /**
-   * Remove using a predicate on values. See class docs for more context 
and implementation details.
+   * Remove using a predicate on values.
+   *
+   * At a high level, this produces an iterator over the (key, value) 
pairs such that value
+   * satisfies the predicate, where producing an element removes the value 
from the state store
+   * and producing all elements with a given key updates it accordingly.
+   *
+   * This implies the iterator must be consumed fully without any other 
operations on this manager
+   * or the underlying store being interleaved.
*/
-  def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
+  def removeByValueCondition(condition: UnsafeRow => Boolean): 
Iterator[UnsafeRowPair] = {
+new NextIterator[UnsafeRowPair] {
 
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  val key = keyToNumValue.key
+  // Reuse this object to avoid creation+GC overhead.
+  private val reusedPair = new UnsafeRowPair()
 
-  var numValues: Long = keyToNumValue.numValue
-  var index: Long = 0L
-  var valueRemoved: Boolean = false
-  var valueForIndex: UnsafeRow = null
+  private val allKeyToNumValues = keyToNumValues.iterator
 
-  while (index < numValues) {
-if (valueForIndex == null) {
-  valueForIndex = keyWithIndexToValue.get(key, index)
+  private var currentKey: UnsafeRow = null
+  private var numValues: Long = 0L
+  private var index: Long = 0L
+  private var valueRemoved: Boolean = false
+
+  // Push the data for the current key to the numValues store, and 
reset the tracking variables
+  // to their empty state.
+  private def storeCurrentKey(): Unit = {
+if (valueRemoved) {
+  if (numValues >= 1) {
+keyToNumValues.put(currentKey, numValues)

[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-25 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140911245
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
 ---
@@ -87,70 +87,157 @@ class SymmetricHashJoinStateManager(
   }
 
   /**
-   * Remove using a predicate on keys. See class docs for more context and 
implement details.
+   * Remove using a predicate on keys.
+   *
+   * This produces an iterator over the (key, value) pairs satisfying 
condition(key), where the
+   * underlying store is updated as a side-effect of producing next.
+   *
+   * This implies the iterator must be consumed fully without any other 
operations on this manager
+   * or the underlying store being interleaved.
*/
-  def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
-
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  if (condition(keyToNumValue.key)) {
-keyToNumValues.remove(keyToNumValue.key)
-keyWithIndexToValue.removeAllValues(keyToNumValue.key, 
keyToNumValue.numValue)
+  def removeByKeyCondition(condition: UnsafeRow => Boolean): 
Iterator[UnsafeRowPair] = {
+new NextIterator[UnsafeRowPair] {
+
+  private val allKeyToNumValues = keyToNumValues.iterator
+
+  private var currentKeyToNumValue: KeyAndNumValues = null
+  private var currentValues: Iterator[KeyWithIndexAndValue] = null
+
+  private def currentKey = currentKeyToNumValue.key
+
+  private val reusedPair = new UnsafeRowPair()
+
+  private def getAndRemoveValue() = {
+val keyWithIndexAndValue = currentValues.next()
+keyWithIndexToValue.remove(currentKey, 
keyWithIndexAndValue.valueIndex)
+reusedPair.withRows(currentKey, keyWithIndexAndValue.value)
+  }
+
+  override def getNext(): UnsafeRowPair = {
+if (currentValues != null && currentValues.hasNext) {
+  return getAndRemoveValue()
+} else {
+  while (allKeyToNumValues.hasNext) {
+currentKeyToNumValue = allKeyToNumValues.next()
+if (condition(currentKey)) {
+  currentValues = keyWithIndexToValue.getAll(
+currentKey, currentKeyToNumValue.numValue)
+  keyToNumValues.remove(currentKey)
+
+  if (currentValues.hasNext) {
+return getAndRemoveValue()
+  }
+}
+  }
+}
+
+finished = true
+null
   }
+
+  override def close: Unit = {}
 }
   }
 
   /**
-   * Remove using a predicate on values. See class docs for more context 
and implementation details.
+   * Remove using a predicate on values.
+   *
+   * At a high level, this produces an iterator over the (key, value) 
pairs such that value
+   * satisfies the predicate, where producing an element removes the value 
from the state store
+   * and producing all elements with a given key updates it accordingly.
+   *
+   * This implies the iterator must be consumed fully without any other 
operations on this manager
+   * or the underlying store being interleaved.
*/
-  def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
+  def removeByValueCondition(condition: UnsafeRow => Boolean): 
Iterator[UnsafeRowPair] = {
+new NextIterator[UnsafeRowPair] {
 
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  val key = keyToNumValue.key
+  // Reuse this object to avoid creation+GC overhead.
+  private val reusedPair = new UnsafeRowPair()
 
-  var numValues: Long = keyToNumValue.numValue
-  var index: Long = 0L
-  var valueRemoved: Boolean = false
-  var valueForIndex: UnsafeRow = null
+  private val allKeyToNumValues = keyToNumValues.iterator
 
-  while (index < numValues) {
-if (valueForIndex == null) {
-  valueForIndex = keyWithIndexToValue.get(key, index)
+  private var currentKey: UnsafeRow = null
+  private var numValues: Long = 0L
+  private var index: Long = 0L
+  private var valueRemoved: Boolean = false
+
+  // Push the data for the current key to the numValues store, and 
reset the tracking variables
+  // to their empty state.
+  private def storeCurrentKey(): Unit = {
--- End diff --

nit: you are not really storing a new key, you are updating the value of an existing key, or

[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-25 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140900266
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -157,11 +164,20 @@ case class StreamingSymmetricHashJoinExec(
   override def requiredChildDistribution: Seq[Distribution] =
 ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: 
Nil
 
-  override def output: Seq[Attribute] = left.output ++ right.output
+  override def output: Seq[Attribute] = joinType match {
+case _: InnerLike => left.output ++ right.output
+case LeftOuter => left.output ++ 
right.output.map(_.withNullability(true))
+case RightOuter => left.output.map(_.withNullability(true)) ++ 
right.output
+case _ =>
+  throwBadJoinTypeException()
+  Seq()
--- End diff --

I think if you define the return type of throwBadJoinTypeException as Nothing, then this Seq() won't be needed.
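
For concreteness, the output method from the quoted diff could then read (a sketch, assuming throwBadJoinTypeException(): Nothing):

    override def output: Seq[Attribute] = joinType match {
      case _: InnerLike => left.output ++ right.output
      case LeftOuter => left.output ++ right.output.map(_.withNullability(true))
      case RightOuter => left.output.map(_.withNullability(true)) ++ right.output
      // A Nothing-typed call conforms to Seq[Attribute]; no dummy Seq() needed.
      case _ => throwBadJoinTypeException()
    }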


---




[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-25 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140903616
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -216,22 +232,70 @@ case class StreamingSymmetricHashJoinExec(
 }
 
 // Filter the joined rows based on the given condition.
-val outputFilterFunction =
-  newPredicate(condition.getOrElse(Literal(true)), left.output ++ 
right.output).eval _
-val filteredOutputIter =
-  (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction).map 
{ row =>
-numOutputRows += 1
-row
-  }
+val outputFilterFunction = 
newPredicate(condition.getOrElse(Literal(true)), output).eval _
+
+val filteredInnerOutputIter = (leftOutputIter ++ 
rightOutputIter).filter(outputFilterFunction)
--- End diff --

I had a comment on this earlier: the output from remove needs to be filtered as well, so this filter should be applied later, on the combined (inner join + outer nulls from remove) iterator.
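
Roughly like this (a sketch; outerNullRowsIter is a hypothetical name for the null-padded rows built from removeOldState):

    // Combine the inner-join output with the outer rows from state removal
    // first, then apply the join condition filter once to the whole stream.
    val combinedIter: Iterator[InternalRow] =
      (leftOutputIter ++ rightOutputIter) ++ outerNullRowsIter
    val filteredOutputIter = combinedIter.filter(outputFilterFunction)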


---




[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-25 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140910781
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -324,17 +389,33 @@ case class StreamingSymmetricHashJoinExec(
   }
 }
 
-/** Remove old buffered state rows using watermarks for state keys and 
values */
-def removeOldState(): Unit = {
+/**
+ * Builds an iterator over old state key-value pairs, removing them 
lazily as they're produced.
+ *
+ * @note This iterator must be consumed fully before any other 
operations are made
+ * against this joiner's join state manager. For efficiency reasons, 
the intermediate states of
+ * the iterator leave the state manager in an invalid configuration.
+ *
+ * We do this to avoid requiring either two passes or full 
materialization when
+ * processing the rows for outer join.
+ */
+def removeOldState(): Iterator[UnsafeRowPair] = {
   stateWatermarkPredicate match {
 case Some(JoinStateKeyWatermarkPredicate(expr)) =>
   
joinStateManager.removeByKeyCondition(stateKeyWatermarkPredicateFunc)
 case Some(JoinStateValueWatermarkPredicate(expr)) =>
   
joinStateManager.removeByValueCondition(stateValueWatermarkPredicateFunc)
-case _ =>
+case _ => Iterator()
--- End diff --

You can do Iterator.empty; it's more obvious.


---




[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140605142
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -146,7 +146,13 @@ case class StreamingSymmetricHashJoinExec(
   stateWatermarkPredicates = JoinStateWatermarkPredicates(), left, 
right)
   }
 
-  require(joinType == Inner, s"${getClass.getSimpleName} should not take 
$joinType as the JoinType")
+  private lazy val badJoinTypeException =
--- End diff --

This should be a def. I don't think exceptions should be created early and thrown later: I am not sure whether the stack trace captured will be from when it was created or from when it was thrown. Either way, it's a bad pattern to have. A better approach is to make it into a function `def throwBadJoinTypeException() { throw new ... }`.
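
A toy demonstration of the stack-trace point (not Spark code): a JVM exception fills in its stack trace in its constructor, so an exception created eagerly and thrown later reports the creation site.

    object StackTraceDemo {
      // Constructed here: the stack trace is captured at this point.
      val early = new RuntimeException("created in the initializer")

      def main(args: Array[String]): Unit = {
        try throw early
        catch {
          // Prints frames from the construction site, not the throw site.
          case e: RuntimeException => e.printStackTrace()
        }
      }
    }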


---




[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140608418
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
 ---
@@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager(
   /**
* Remove using a predicate on keys. See class docs for more context and 
implement details.
*/
-  def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
-
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  if (condition(keyToNumValue.key)) {
-keyToNumValues.remove(keyToNumValue.key)
-keyWithIndexToValue.removeAllValues(keyToNumValue.key, 
keyToNumValue.numValue)
+  def removeByKeyCondition(condition: UnsafeRow => Boolean): 
Iterator[(UnsafeRow, UnsafeRow)] = {
+new NextIterator[(UnsafeRow, UnsafeRow)] {
+
+  private val allKeyToNumValues = keyToNumValues.iterator
+
+  private var currentKeyToNumValue: Option[KeyAndNumValues] = None
+  private var currentValues: Option[Iterator[(UnsafeRow, Long)]] = None
+
+  private def currentKey = currentKeyToNumValue.get.key
+
+  private def getAndRemoveValue() = {
+val (current, index) = currentValues.get.next()
--- End diff --

current -> currentValue


---




[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140615004
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
 ---
@@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager(
   /**
* Remove using a predicate on keys. See class docs for more context and 
implement details.
*/
-  def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
-
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  if (condition(keyToNumValue.key)) {
-keyToNumValues.remove(keyToNumValue.key)
-keyWithIndexToValue.removeAllValues(keyToNumValue.key, 
keyToNumValue.numValue)
+  def removeByKeyCondition(condition: UnsafeRow => Boolean): 
Iterator[(UnsafeRow, UnsafeRow)] = {
+new NextIterator[(UnsafeRow, UnsafeRow)] {
+
+  private val allKeyToNumValues = keyToNumValues.iterator
+
+  private var currentKeyToNumValue: Option[KeyAndNumValues] = None
+  private var currentValues: Option[Iterator[(UnsafeRow, Long)]] = None
+
+  private def currentKey = currentKeyToNumValue.get.key
+
+  private def getAndRemoveValue() = {
+val (current, index) = currentValues.get.next()
+keyWithIndexToValue.remove(currentKey, index)
+(currentKey, current)
+  }
+
+  override def getNext(): (UnsafeRow, UnsafeRow) = {
+if (currentValues.nonEmpty && currentValues.get.hasNext) {
+  return getAndRemoveValue()
+} else {
+  while (allKeyToNumValues.hasNext) {
+currentKeyToNumValue = Some(allKeyToNumValues.next())
+if (condition(currentKey)) {
+  currentValues = Some(keyWithIndexToValue.getAllWithIndex(
+currentKey, currentKeyToNumValue.get.numValue))
+  keyToNumValues.remove(currentKey)
+
+  if (currentValues.nonEmpty && currentValues.get.hasNext) {
+return getAndRemoveValue()
+  }
+}
+  }
+}
+
+finished = true
+null
   }
+
+  override def close: Unit = {}
 }
   }
 
   /**
* Remove using a predicate on values. See class docs for more context 
and implementation details.
*/
-  def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
+  def removeByValueCondition(condition: UnsafeRow => Boolean): 
Iterator[(UnsafeRow, UnsafeRow)] = {
+new NextIterator[(UnsafeRow, UnsafeRow)] {
+
+  private val allKeyToNumValues = keyToNumValues.iterator
 
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  val key = keyToNumValue.key
+  private var currentKeyToNumValue: Option[KeyAndNumValues] = None
 
-  var numValues: Long = keyToNumValue.numValue
+  private def currentKey = currentKeyToNumValue.get.key
+
+  var numValues: Long = 0L
   var index: Long = 0L
   var valueRemoved: Boolean = false
   var valueForIndex: UnsafeRow = null
 
-  while (index < numValues) {
-if (valueForIndex == null) {
-  valueForIndex = keyWithIndexToValue.get(key, index)
+  private def cleanupCurrentKey(): Unit = {
+if (valueRemoved) {
+  if (numValues >= 1) {
+keyToNumValues.put(currentKey, numValues)
+  } else {
+keyToNumValues.remove(currentKey)
+  }
+}
+
+numValues = 0
+index = 0
+valueRemoved = false
+valueForIndex = null
+  }
+
+  override def getNext(): (UnsafeRow, UnsafeRow) = {
+// TODO: there has to be a better way to express this but I don't 
know what it is
+while (valueForIndex == null && (index < numValues || 
allKeyToNumValues.hasNext)) {
--- End diff --

This loop can probably be split into two methods, `findNextValueToRemove` and `removeValue`.
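
A rough sketch of that split, reusing the fields of the quoted iterator (a hypothetical decomposition, not the final code):

    // Advance the cursor until valueForIndex references a value satisfying
    // the condition; report whether one was found.
    private def findNextValueToRemove(): Boolean = {
      while (valueForIndex == null && (index < numValues || allKeyToNumValues.hasNext)) {
        if (index < numValues) {
          val candidate = keyWithIndexToValue.get(currentKey, index)
          if (condition(candidate)) valueForIndex = candidate else index += 1
        } else {
          cleanupCurrentKey()
          currentKeyToNumValue = Some(allKeyToNumValues.next())
          numValues = currentKeyToNumValue.get.numValue
        }
      }
      valueForIndex != null
    }

    // Body elided: removes valueForIndex from the store and emits it.
    private def removeValue(): (UnsafeRow, UnsafeRow) = ???

    override def getNext(): (UnsafeRow, UnsafeRow) = {
      if (findNextValueToRemove()) removeValue() else { finished = true; null }
    }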


---




[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140616618
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -216,22 +229,51 @@ case class StreamingSymmetricHashJoinExec(
 }
 
 // Filter the joined rows based on the given condition.
-val outputFilterFunction =
-  newPredicate(condition.getOrElse(Literal(true)), left.output ++ 
right.output).eval _
-val filteredOutputIter =
-  (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction).map 
{ row =>
-numOutputRows += 1
-row
-  }
+val outputFilterFunction = 
newPredicate(condition.getOrElse(Literal(true)), output).eval _
+
+val filteredInnerOutputIter = (leftOutputIter ++ 
rightOutputIter).filter(outputFilterFunction)
+
+val outputIter: Iterator[InternalRow] = joinType match {
+  case Inner =>
+filteredInnerOutputIter
+  case LeftOuter =>
+val nullRight = new 
GenericInternalRow(right.output.map(_.withNullability(true)).length)
+filteredInnerOutputIter ++
+  leftSideJoiner
+.removeOldState()
+.filterNot { case (key, value) => 
rightSideJoiner.containsKey(key) }
+.map { case (key, value) => 
joinedRow.withLeft(value).withRight(nullRight) }
+  case RightOuter =>
+val nullLeft = new 
GenericInternalRow(left.output.map(_.withNullability(true)).length)
+filteredInnerOutputIter ++
+  rightSideJoiner
--- End diff --

nit: split this into two statements with an intermediate variable `removedRowsIter`, and add docs on what it does.
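
i.e., something along these lines (a sketch based on the quoted diff; `removedRowsIter` is the suggested name):

    // Rows evicted from the right side's state that never matched anything
    // on the left; each becomes an outer result padded with nulls on the left.
    val removedRowsIter = rightSideJoiner
      .removeOldState()
      .filterNot { case (key, _) => leftSideJoiner.containsKey(key) }
      .map { case (_, value) => joinedRow.withLeft(nullLeft).withRight(value) }
    filteredInnerOutputIter ++ removedRowsIter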


---




[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140614146
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
 ---
@@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager(
   /**
* Remove using a predicate on keys. See class docs for more context and 
implement details.
*/
-  def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
-
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  if (condition(keyToNumValue.key)) {
-keyToNumValues.remove(keyToNumValue.key)
-keyWithIndexToValue.removeAllValues(keyToNumValue.key, 
keyToNumValue.numValue)
+  def removeByKeyCondition(condition: UnsafeRow => Boolean): 
Iterator[(UnsafeRow, UnsafeRow)] = {
+new NextIterator[(UnsafeRow, UnsafeRow)] {
+
+  private val allKeyToNumValues = keyToNumValues.iterator
+
+  private var currentKeyToNumValue: Option[KeyAndNumValues] = None
+  private var currentValues: Option[Iterator[(UnsafeRow, Long)]] = None
+
+  private def currentKey = currentKeyToNumValue.get.key
+
+  private def getAndRemoveValue() = {
+val (current, index) = currentValues.get.next()
+keyWithIndexToValue.remove(currentKey, index)
+(currentKey, current)
+  }
+
+  override def getNext(): (UnsafeRow, UnsafeRow) = {
+if (currentValues.nonEmpty && currentValues.get.hasNext) {
+  return getAndRemoveValue()
+} else {
+  while (allKeyToNumValues.hasNext) {
+currentKeyToNumValue = Some(allKeyToNumValues.next())
+if (condition(currentKey)) {
+  currentValues = Some(keyWithIndexToValue.getAllWithIndex(
+currentKey, currentKeyToNumValue.get.numValue))
+  keyToNumValues.remove(currentKey)
+
+  if (currentValues.nonEmpty && currentValues.get.hasNext) {
+return getAndRemoveValue()
+  }
+}
+  }
+}
+
+finished = true
+null
   }
+
+  override def close: Unit = {}
 }
   }
 
   /**
* Remove using a predicate on values. See class docs for more context 
and implementation details.
*/
-  def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
+  def removeByValueCondition(condition: UnsafeRow => Boolean): 
Iterator[(UnsafeRow, UnsafeRow)] = {
+new NextIterator[(UnsafeRow, UnsafeRow)] {
+
+  private val allKeyToNumValues = keyToNumValues.iterator
 
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  val key = keyToNumValue.key
+  private var currentKeyToNumValue: Option[KeyAndNumValues] = None
 
-  var numValues: Long = keyToNumValue.numValue
+  private def currentKey = currentKeyToNumValue.get.key
+
+  var numValues: Long = 0L
   var index: Long = 0L
   var valueRemoved: Boolean = false
   var valueForIndex: UnsafeRow = null
 
-  while (index < numValues) {
-if (valueForIndex == null) {
-  valueForIndex = keyWithIndexToValue.get(key, index)
+  private def cleanupCurrentKey(): Unit = {
+if (valueRemoved) {
+  if (numValues >= 1) {
+keyToNumValues.put(currentKey, numValues)
+  } else {
+keyToNumValues.remove(currentKey)
+  }
+}
+
+numValues = 0
+index = 0
+valueRemoved = false
+valueForIndex = null
+  }
+
+  override def getNext(): (UnsafeRow, UnsafeRow) = {
+// TODO: there has to be a better way to express this but I don't 
know what it is
+while (valueForIndex == null && (index < numValues || 
allKeyToNumValues.hasNext)) {
--- End diff --

maybe rename `valueForIndex` -> `currentValue` and `index` -> `nextValueIndex`


---




[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140605854
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -324,17 +367,34 @@ case class StreamingSymmetricHashJoinExec(
   }
 }
 
-/** Remove old buffered state rows using watermarks for state keys and 
values */
-def removeOldState(): Unit = {
+/**
+ * Builds an iterator over old state key-value pairs, removing them 
lazily as they're produced.
+ *
+ * This iterator is dangerous! It must be consumed fully before any 
other operations are made
--- End diff --

Maybe tone down the language a little :)
Rather than saying "dangerous", use `@note`.
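
e.g., roughly the wording the doc comment adopts in the later revision (see the 2017-09-25 message above):

    /**
     * Builds an iterator over old state key-value pairs, removing them
     * lazily as they're produced.
     *
     * @note This iterator must be consumed fully before any other operations
     * are made against this joiner's join state manager.
     */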


---




[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140617927
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -216,22 +229,51 @@ case class StreamingSymmetricHashJoinExec(
 }
 
 // Filter the joined rows based on the given condition.
-val outputFilterFunction =
-  newPredicate(condition.getOrElse(Literal(true)), left.output ++ 
right.output).eval _
-val filteredOutputIter =
-  (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction).map 
{ row =>
-numOutputRows += 1
-row
-  }
+val outputFilterFunction = 
newPredicate(condition.getOrElse(Literal(true)), output).eval _
+
+val filteredInnerOutputIter = (leftOutputIter ++ 
rightOutputIter).filter(outputFilterFunction)
+
+val outputIter: Iterator[InternalRow] = joinType match {
+  case Inner =>
+filteredInnerOutputIter
+  case LeftOuter =>
+val nullRight = new 
GenericInternalRow(right.output.map(_.withNullability(true)).length)
+filteredInnerOutputIter ++
+  leftSideJoiner
+.removeOldState()
+.filterNot { case (key, value) => 
rightSideJoiner.containsKey(key) }
+.map { case (key, value) => 
joinedRow.withLeft(value).withRight(nullRight) }
+  case RightOuter =>
+val nullLeft = new 
GenericInternalRow(left.output.map(_.withNullability(true)).length)
+filteredInnerOutputIter ++
+  rightSideJoiner
+.removeOldState()
+.filterNot { case (key, value) => 
leftSideJoiner.containsKey(key) }
+.map { case (key, value) => 
joinedRow.withLeft(nullLeft).withRight(value) }
+  case _ => throw badJoinTypeException
+}
+
+val outputIterWithMetrics = outputIter.map { row =>
+  numOutputRows += 1
+  row
+}
+
+// Iterator which must be consumed after output completion before 
committing.
+val cleanupIter = joinType match {
+  case Inner =>
+leftSideJoiner.removeOldState() ++ rightSideJoiner.removeOldState()
+  case LeftOuter => rightSideJoiner.removeOldState()
+  case RightOuter => leftSideJoiner.removeOldState()
+  case _ => throw badJoinTypeException
+}
 
 // Function to remove old state after all the input has been consumed 
and output generated
 def onOutputCompletion = {
   allUpdatesTimeMs += math.max(NANOSECONDS.toMillis(System.nanoTime - 
updateStartTimeNs), 0)
 
-  // Remove old state if needed
+  // TODO: how to get this for removals as part of outer join?
   allRemovalsTimeMs += timeTakenMs {
-leftSideJoiner.removeOldState()
-rightSideJoiner.removeOldState()
+cleanupIter.foreach(_ => ())
--- End diff --

Don't use foreach; Scala's foreach is pretty inefficient. Use a while loop.
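
i.e., drain the iterator with an explicit loop (a minimal sketch):

    // Consume cleanupIter purely for its side effects, without invoking a
    // function object per element the way foreach does.
    while (cleanupIter.hasNext) {
      cleanupIter.next()
    }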


---




[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140614325
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
 ---
@@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager(
   /**
* Remove using a predicate on keys. See class docs for more context and 
implement details.
*/
-  def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
-
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  if (condition(keyToNumValue.key)) {
-keyToNumValues.remove(keyToNumValue.key)
-keyWithIndexToValue.removeAllValues(keyToNumValue.key, 
keyToNumValue.numValue)
+  def removeByKeyCondition(condition: UnsafeRow => Boolean): 
Iterator[(UnsafeRow, UnsafeRow)] = {
+new NextIterator[(UnsafeRow, UnsafeRow)] {
+
+  private val allKeyToNumValues = keyToNumValues.iterator
+
+  private var currentKeyToNumValue: Option[KeyAndNumValues] = None
+  private var currentValues: Option[Iterator[(UnsafeRow, Long)]] = None
+
+  private def currentKey = currentKeyToNumValue.get.key
+
+  private def getAndRemoveValue() = {
+val (current, index) = currentValues.get.next()
+keyWithIndexToValue.remove(currentKey, index)
+(currentKey, current)
+  }
+
+  override def getNext(): (UnsafeRow, UnsafeRow) = {
+if (currentValues.nonEmpty && currentValues.get.hasNext) {
+  return getAndRemoveValue()
+} else {
+  while (allKeyToNumValues.hasNext) {
+currentKeyToNumValue = Some(allKeyToNumValues.next())
+if (condition(currentKey)) {
+  currentValues = Some(keyWithIndexToValue.getAllWithIndex(
+currentKey, currentKeyToNumValue.get.numValue))
+  keyToNumValues.remove(currentKey)
+
+  if (currentValues.nonEmpty && currentValues.get.hasNext) {
+return getAndRemoveValue()
+  }
+}
+  }
+}
+
+finished = true
+null
   }
+
+  override def close: Unit = {}
 }
   }
 
   /**
* Remove using a predicate on values. See class docs for more context 
and implementation details.
*/
-  def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
+  def removeByValueCondition(condition: UnsafeRow => Boolean): 
Iterator[(UnsafeRow, UnsafeRow)] = {
+new NextIterator[(UnsafeRow, UnsafeRow)] {
+
+  private val allKeyToNumValues = keyToNumValues.iterator
 
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  val key = keyToNumValue.key
+  private var currentKeyToNumValue: Option[KeyAndNumValues] = None
 
-  var numValues: Long = keyToNumValue.numValue
+  private def currentKey = currentKeyToNumValue.get.key
+
+  var numValues: Long = 0L
   var index: Long = 0L
   var valueRemoved: Boolean = false
   var valueForIndex: UnsafeRow = null
 
-  while (index < numValues) {
-if (valueForIndex == null) {
-  valueForIndex = keyWithIndexToValue.get(key, index)
+  private def cleanupCurrentKey(): Unit = {
+if (valueRemoved) {
+  if (numValues >= 1) {
+keyToNumValues.put(currentKey, numValues)
+  } else {
+keyToNumValues.remove(currentKey)
+  }
+}
+
+numValues = 0
+index = 0
+valueRemoved = false
+valueForIndex = null
+  }
+
+  override def getNext(): (UnsafeRow, UnsafeRow) = {
+// TODO: there has to be a better way to express this but I don't 
know what it is
+while (valueForIndex == null && (index < numValues || 
allKeyToNumValues.hasNext)) {
+  if (index < numValues) {
--- End diff --

Add comments for each conditional branch to explain this. In retrospect, I probably should have done that too :) But the code is an order of magnitude more complicated now, so it requires more docs.


---




[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140608463
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
 ---
@@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager(
   /**
* Remove using a predicate on keys. See class docs for more context and 
implement details.
*/
-  def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
-
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  if (condition(keyToNumValue.key)) {
-keyToNumValues.remove(keyToNumValue.key)
-keyWithIndexToValue.removeAllValues(keyToNumValue.key, 
keyToNumValue.numValue)
+  def removeByKeyCondition(condition: UnsafeRow => Boolean): 
Iterator[(UnsafeRow, UnsafeRow)] = {
+new NextIterator[(UnsafeRow, UnsafeRow)] {
+
+  private val allKeyToNumValues = keyToNumValues.iterator
+
+  private var currentKeyToNumValue: Option[KeyAndNumValues] = None
+  private var currentValues: Option[Iterator[(UnsafeRow, Long)]] = None
+
+  private def currentKey = currentKeyToNumValue.get.key
+
+  private def getAndRemoveValue() = {
+val (current, index) = currentValues.get.next()
+keyWithIndexToValue.remove(currentKey, index)
+(currentKey, current)
--- End diff --

Don't use tuples.


---




[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140612077
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
 ---
@@ -329,6 +392,27 @@ class SymmetricHashJoinStateManager(
   }
 }
 
+/** Get all the values for key and all indices, in a (value, index) 
tuple. */
+def getAllWithIndex(key: UnsafeRow, numValues: Long): 
Iterator[(UnsafeRow, Long)] = {
--- End diff --

We can probably convert getAll to this. It does not make sense to have both, especially since both are equally efficient if you return Iterator[KeyWithIndexAndValue].

Also, I think the iterator() method can be removed. It's effectively unused (used only in StateManager.iterator, which is not really being used).
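
A sketch of the consolidated shape (a hypothetical trait just to show the signature; KeyWithIndexAndValue is the existing helper class in this file):

    trait KeyWithIndexToValueOps {
      // One accessor instead of getAll + getAllWithIndex: each element
      // carries the value's index and the value via a reused wrapper object.
      def getAll(key: UnsafeRow, numValues: Long): Iterator[KeyWithIndexAndValue]
    }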


---




[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140607924
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
 ---
@@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager(
   /**
* Remove using a predicate on keys. See class docs for more context and 
implement details.
*/
-  def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
-
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  if (condition(keyToNumValue.key)) {
-keyToNumValues.remove(keyToNumValue.key)
-keyWithIndexToValue.removeAllValues(keyToNumValue.key, 
keyToNumValue.numValue)
+  def removeByKeyCondition(condition: UnsafeRow => Boolean): 
Iterator[(UnsafeRow, UnsafeRow)] = {
--- End diff --

Use UnsafeRowPair for the return type. Same reason as above.


---




[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140611178
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
 ---
@@ -329,6 +392,27 @@ class SymmetricHashJoinStateManager(
   }
 }
 
+/** Get all the values for key and all indices, in a (value, index) 
tuple. */
+def getAllWithIndex(key: UnsafeRow, numValues: Long): 
Iterator[(UnsafeRow, Long)] = {
--- End diff --

use `KeyWithIndexAndValue` for returning.


---




[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140612202
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
 ---
@@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager(
   /**
* Remove using a predicate on keys. See class docs for more context and 
implement details.
*/
-  def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
-
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  if (condition(keyToNumValue.key)) {
-keyToNumValues.remove(keyToNumValue.key)
-keyWithIndexToValue.removeAllValues(keyToNumValue.key, 
keyToNumValue.numValue)
+  def removeByKeyCondition(condition: UnsafeRow => Boolean): 
Iterator[(UnsafeRow, UnsafeRow)] = {
+new NextIterator[(UnsafeRow, UnsafeRow)] {
+
+  private val allKeyToNumValues = keyToNumValues.iterator
+
+  private var currentKeyToNumValue: Option[KeyAndNumValues] = None
+  private var currentValues: Option[Iterator[(UnsafeRow, Long)]] = None
+
+  private def currentKey = currentKeyToNumValue.get.key
+
+  private def getAndRemoveValue() = {
+val (current, index) = currentValues.get.next()
+keyWithIndexToValue.remove(currentKey, index)
+(currentKey, current)
+  }
+
+  override def getNext(): (UnsafeRow, UnsafeRow) = {
+if (currentValues.nonEmpty && currentValues.get.hasNext) {
+  return getAndRemoveValue()
+} else {
+  while (allKeyToNumValues.hasNext) {
+currentKeyToNumValue = Some(allKeyToNumValues.next())
+if (condition(currentKey)) {
+  currentValues = Some(keyWithIndexToValue.getAllWithIndex(
+currentKey, currentKeyToNumValue.get.numValue))
+  keyToNumValues.remove(currentKey)
+
+  if (currentValues.nonEmpty && currentValues.get.hasNext) {
+return getAndRemoveValue()
+  }
+}
+  }
+}
+
+finished = true
+null
   }
+
+  override def close: Unit = {}
 }
   }
 
   /**
* Remove using a predicate on values. See class docs for more context 
and implementation details.
*/
-  def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
+  def removeByValueCondition(condition: UnsafeRow => Boolean): 
Iterator[(UnsafeRow, UnsafeRow)] = {
+new NextIterator[(UnsafeRow, UnsafeRow)] {
+
+  private val allKeyToNumValues = keyToNumValues.iterator
 
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  val key = keyToNumValue.key
+  private var currentKeyToNumValue: Option[KeyAndNumValues] = None
 
-  var numValues: Long = keyToNumValue.numValue
+  private def currentKey = currentKeyToNumValue.get.key
+
+  var numValues: Long = 0L
   var index: Long = 0L
   var valueRemoved: Boolean = false
   var valueForIndex: UnsafeRow = null
 
-  while (index < numValues) {
-if (valueForIndex == null) {
-  valueForIndex = keyWithIndexToValue.get(key, index)
+  private def cleanupCurrentKey(): Unit = {
--- End diff --

It's not really cleanup; maybe "update"?


---




[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140605588
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -324,17 +367,34 @@ case class StreamingSymmetricHashJoinExec(
   }
 }
 
-/** Remove old buffered state rows using watermarks for state keys and 
values */
-def removeOldState(): Unit = {
+/**
+ * Builds an iterator over old state key-value pairs, removing them 
lazily as they're produced.
+ *
+ * This iterator is dangerous! It must be consumed fully before any 
other operations are made
+ * against this joiner's join state manager, and in particular commits 
must not happen while
+ * this iterator is ongoing. The intermediate states of the iterator 
leave the state manager in
+ * an invalid configuration.
+ *
+ * We do this unsafe thing to avoid requiring either two passes or 
full materialization when
+ * processing the rows for outer join.
+ */
+def removeOldState(): Iterator[(UnsafeRow, UnsafeRow)] = {
--- End diff --

Use `UnsafeRowPair` instead of Tuple2 (i.e. (UnsafeRow, UnsafeRow) is shorthand for scala.Tuple2). It reuses the pair (or an equivalent object) instead of allocating a new tuple per element, avoiding the creation of a lot of short-term objects and thus reducing GC pressure.
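
A toy illustration of the reuse pattern (not Spark code; ReusedPair is a hypothetical stand-in for UnsafeRowPair):

    final class ReusedPair[A, B] {
      var first: A = _
      var second: B = _
      def withValues(a: A, b: B): this.type = { first = a; second = b; this }
    }

    object ReusedPairDemo {
      def main(args: Array[String]): Unit = {
        val pair = new ReusedPair[Int, String]
        // map hands out the same object each time, mutated in place, so no
        // Tuple2 is allocated per element; consume each before advancing.
        val iter = (1 to 3).iterator.map(i => pair.withValues(i, "v" + i))
        while (iter.hasNext) {
          val p = iter.next()
          println(s"${p.first} -> ${p.second}")
        }
      }
    }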


---




[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140614816
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
 ---
@@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager(
   /**
* Remove using a predicate on keys. See class docs for more context and 
implement details.
*/
-  def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
-
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  if (condition(keyToNumValue.key)) {
-keyToNumValues.remove(keyToNumValue.key)
-keyWithIndexToValue.removeAllValues(keyToNumValue.key, 
keyToNumValue.numValue)
+  def removeByKeyCondition(condition: UnsafeRow => Boolean): 
Iterator[(UnsafeRow, UnsafeRow)] = {
+new NextIterator[(UnsafeRow, UnsafeRow)] {
+
+  private val allKeyToNumValues = keyToNumValues.iterator
+
+  private var currentKeyToNumValue: Option[KeyAndNumValues] = None
+  private var currentValues: Option[Iterator[(UnsafeRow, Long)]] = None
+
+  private def currentKey = currentKeyToNumValue.get.key
+
+  private def getAndRemoveValue() = {
+val (current, index) = currentValues.get.next()
+keyWithIndexToValue.remove(currentKey, index)
+(currentKey, current)
+  }
+
+  override def getNext(): (UnsafeRow, UnsafeRow) = {
+if (currentValues.nonEmpty && currentValues.get.hasNext) {
+  return getAndRemoveValue()
+} else {
+  while (allKeyToNumValues.hasNext) {
+currentKeyToNumValue = Some(allKeyToNumValues.next())
+if (condition(currentKey)) {
+  currentValues = Some(keyWithIndexToValue.getAllWithIndex(
+currentKey, currentKeyToNumValue.get.numValue))
+  keyToNumValues.remove(currentKey)
+
+  if (currentValues.nonEmpty && currentValues.get.hasNext) {
+return getAndRemoveValue()
+  }
+}
+  }
+}
+
+finished = true
+null
   }
+
+  override def close: Unit = {}
 }
   }
 
   /**
* Remove using a predicate on values. See class docs for more context 
and implementation details.
*/
-  def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
+  def removeByValueCondition(condition: UnsafeRow => Boolean): 
Iterator[(UnsafeRow, UnsafeRow)] = {
+new NextIterator[(UnsafeRow, UnsafeRow)] {
+
+  private val allKeyToNumValues = keyToNumValues.iterator
 
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  val key = keyToNumValue.key
+  private var currentKeyToNumValue: Option[KeyAndNumValues] = None
 
-  var numValues: Long = keyToNumValue.numValue
+  private def currentKey = currentKeyToNumValue.get.key
+
+  var numValues: Long = 0L
   var index: Long = 0L
   var valueRemoved: Boolean = false
   var valueForIndex: UnsafeRow = null
 
-  while (index < numValues) {
-if (valueForIndex == null) {
-  valueForIndex = keyWithIndexToValue.get(key, index)
+  private def cleanupCurrentKey(): Unit = {
+if (valueRemoved) {
+  if (numValues >= 1) {
+keyToNumValues.put(currentKey, numValues)
+  } else {
+keyToNumValues.remove(currentKey)
+  }
+}
+
+numValues = 0
+index = 0
+valueRemoved = false
+valueForIndex = null
+  }
+
+  override def getNext(): (UnsafeRow, UnsafeRow) = {
+// TODO: there has to be a better way to express this but I don't 
know what it is
+while (valueForIndex == null && (index < numValues || 
allKeyToNumValues.hasNext)) {
+  if (index < numValues) {
+val current = keyWithIndexToValue.get(currentKey, index)
+if (condition(current)) {
+  valueForIndex = current
+} else {
+  index += 1
+}
+  } else {
+cleanupCurrentKey()
+
+currentKeyToNumValue = Some(allKeyToNumValues.next())
+numValues = currentKeyToNumValue.get.numValue
--- End diff --

numValues -> currentKeyNumValues


---


[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140617841
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -216,22 +229,51 @@ case class StreamingSymmetricHashJoinExec(
 }
 
 // Filter the joined rows based on the given condition.
-val outputFilterFunction =
-  newPredicate(condition.getOrElse(Literal(true)), left.output ++ 
right.output).eval _
-val filteredOutputIter =
-  (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction).map 
{ row =>
-numOutputRows += 1
-row
-  }
+val outputFilterFunction = 
newPredicate(condition.getOrElse(Literal(true)), output).eval _
+
+val filteredInnerOutputIter = (leftOutputIter ++ 
rightOutputIter).filter(outputFilterFunction)
+
+val outputIter: Iterator[InternalRow] = joinType match {
+  case Inner =>
+filteredInnerOutputIter
+  case LeftOuter =>
+val nullRight = new 
GenericInternalRow(right.output.map(_.withNullability(true)).length)
+filteredInnerOutputIter ++
+  leftSideJoiner
+.removeOldState()
+.filterNot { case (key, value) => 
rightSideJoiner.containsKey(key) }
+.map { case (key, value) => 
joinedRow.withLeft(value).withRight(nullRight) }
+  case RightOuter =>
+val nullLeft = new 
GenericInternalRow(left.output.map(_.withNullability(true)).length)
+filteredInnerOutputIter ++
+  rightSideJoiner
+.removeOldState()
+.filterNot { case (key, value) => 
leftSideJoiner.containsKey(key) }
+.map { case (key, value) => 
joinedRow.withLeft(nullLeft).withRight(value) }
+  case _ => throw badJoinTypeException
+}
+
+val outputIterWithMetrics = outputIter.map { row =>
+  numOutputRows += 1
+  row
+}
+
+// Iterator which must be consumed after output completion before 
committing.
+val cleanupIter = joinType match {
+  case Inner =>
+leftSideJoiner.removeOldState() ++ rightSideJoiner.removeOldState()
+  case LeftOuter => rightSideJoiner.removeOldState()
+  case RightOuter => leftSideJoiner.removeOldState()
+  case _ => throw badJoinTypeException
+}
--- End diff --

This confused me a lot, but then I got why you need to call removeOldState once again. Can you add the explanation that you have to clean the side that has not been cleaned yet?

Also, this can be moved into "onOutputCompletion", because that is where it is needed, not before.
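
A sketch of that move, based on the quoted code (not the final implementation):

    // Build the cleanup iterator inside the completion callback; only the
    // side not already drained for outer-join output still needs cleaning.
    def onOutputCompletion = {
      allUpdatesTimeMs += math.max(NANOSECONDS.toMillis(System.nanoTime - updateStartTimeNs), 0)
      allRemovalsTimeMs += timeTakenMs {
        val cleanupIter = joinType match {
          case Inner => leftSideJoiner.removeOldState() ++ rightSideJoiner.removeOldState()
          case LeftOuter => rightSideJoiner.removeOldState() // left side was drained above
          case RightOuter => leftSideJoiner.removeOldState() // right side was drained above
          case _ => throw badJoinTypeException
        }
        while (cleanupIter.hasNext) cleanupIter.next()
      }
    }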


---




[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140606013
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
 ---
@@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager(
   /**
* Remove using a predicate on keys. See class docs for more context and 
implement details.
*/
-  def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
-
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  if (condition(keyToNumValue.key)) {
-keyToNumValues.remove(keyToNumValue.key)
-keyWithIndexToValue.removeAllValues(keyToNumValue.key, 
keyToNumValue.numValue)
+  def removeByKeyCondition(condition: UnsafeRow => Boolean): 
Iterator[(UnsafeRow, UnsafeRow)] = {
+new NextIterator[(UnsafeRow, UnsafeRow)] {
+
+  private val allKeyToNumValues = keyToNumValues.iterator
+
+  private var currentKeyToNumValue: Option[KeyAndNumValues] = None
--- End diff --

Don't use Option/Some inside tight, performance-sensitive loops. It creates too many garbage objects very fast. It's fine to use null here for performance reasons.
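
A toy sketch of the null-based pattern (not Spark code):

    object NullCursorDemo {
      def main(args: Array[String]): Unit = {
        val iter = Iterator("a", "b", "c")
        var current: String = null   // instead of Option[String] = None
        while (iter.hasNext) {
          current = iter.next()      // no Some(...) allocated per element
          println(current)
        }
      }
    }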


---




[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140615574
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala 
---
@@ -116,6 +116,26 @@ class StreamingJoinSuite extends StreamTest with 
StateStoreMetricsTest with Befo
 )
   }
 
+  private def setupJoin(joinType: String = "inner") = {
--- End diff --

Not used anywhere.


---




[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140608032
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
 ---
@@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager(
   /**
* Remove using a predicate on keys. See class docs for more context and 
implement details.
*/
-  def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
-
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  if (condition(keyToNumValue.key)) {
-keyToNumValues.remove(keyToNumValue.key)
-keyWithIndexToValue.removeAllValues(keyToNumValue.key, 
keyToNumValue.numValue)
+  def removeByKeyCondition(condition: UnsafeRow => Boolean): 
Iterator[(UnsafeRow, UnsafeRow)] = {
+new NextIterator[(UnsafeRow, UnsafeRow)] {
+
+  private val allKeyToNumValues = keyToNumValues.iterator
+
+  private var currentKeyToNumValue: Option[KeyAndNumValues] = None
+  private var currentValues: Option[Iterator[(UnsafeRow, Long)]] = None
+
+  private def currentKey = currentKeyToNumValue.get.key
+
+  private def getAndRemoveValue() = {
+val (current, index) = currentValues.get.next()
+keyWithIndexToValue.remove(currentKey, index)
+(currentKey, current)
+  }
+
+  override def getNext(): (UnsafeRow, UnsafeRow) = {
--- End diff --

UnsafeRowPair.


---




[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140614563
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
 ---
@@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager(
   /**
* Remove using a predicate on keys. See class docs for more context and 
implement details.
*/
-  def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
-
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  if (condition(keyToNumValue.key)) {
-keyToNumValues.remove(keyToNumValue.key)
-keyWithIndexToValue.removeAllValues(keyToNumValue.key, 
keyToNumValue.numValue)
+  def removeByKeyCondition(condition: UnsafeRow => Boolean): 
Iterator[(UnsafeRow, UnsafeRow)] = {
+new NextIterator[(UnsafeRow, UnsafeRow)] {
+
+  private val allKeyToNumValues = keyToNumValues.iterator
+
+  private var currentKeyToNumValue: Option[KeyAndNumValues] = None
+  private var currentValues: Option[Iterator[(UnsafeRow, Long)]] = None
+
+  private def currentKey = currentKeyToNumValue.get.key
+
+  private def getAndRemoveValue() = {
+val (current, index) = currentValues.get.next()
+keyWithIndexToValue.remove(currentKey, index)
+(currentKey, current)
+  }
+
+  override def getNext(): (UnsafeRow, UnsafeRow) = {
+if (currentValues.nonEmpty && currentValues.get.hasNext) {
+  return getAndRemoveValue()
+} else {
+  while (allKeyToNumValues.hasNext) {
+currentKeyToNumValue = Some(allKeyToNumValues.next())
+if (condition(currentKey)) {
+  currentValues = Some(keyWithIndexToValue.getAllWithIndex(
+currentKey, currentKeyToNumValue.get.numValue))
+  keyToNumValues.remove(currentKey)
+
+  if (currentValues.nonEmpty && currentValues.get.hasNext) {
+return getAndRemoveValue()
+  }
+}
+  }
+}
+
+finished = true
+null
   }
+
+  override def close: Unit = {}
 }
   }
 
   /**
* Remove using a predicate on values. See class docs for more context 
and implementation details.
*/
-  def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
+  def removeByValueCondition(condition: UnsafeRow => Boolean): 
Iterator[(UnsafeRow, UnsafeRow)] = {
+new NextIterator[(UnsafeRow, UnsafeRow)] {
+
+  private val allKeyToNumValues = keyToNumValues.iterator
 
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  val key = keyToNumValue.key
+  private var currentKeyToNumValue: Option[KeyAndNumValues] = None
 
-  var numValues: Long = keyToNumValue.numValue
+  private def currentKey = currentKeyToNumValue.get.key
+
+  var numValues: Long = 0L
   var index: Long = 0L
   var valueRemoved: Boolean = false
   var valueForIndex: UnsafeRow = null
 
-  while (index < numValues) {
-if (valueForIndex == null) {
-  valueForIndex = keyWithIndexToValue.get(key, index)
+  private def cleanupCurrentKey(): Unit = {
+if (valueRemoved) {
+  if (numValues >= 1) {
+keyToNumValues.put(currentKey, numValues)
+  } else {
+keyToNumValues.remove(currentKey)
+  }
+}
+
+numValues = 0
+index = 0
+valueRemoved = false
+valueForIndex = null
+  }
+
+  override def getNext(): (UnsafeRow, UnsafeRow) = {
+// TODO: there has to be a better way to express this but I don't 
know what it is
+while (valueForIndex == null && (index < numValues || 
allKeyToNumValues.hasNext)) {
+  if (index < numValues) {
+val current = keyWithIndexToValue.get(currentKey, index)
--- End diff --

current -> nextValue, to avoid confusion with valueForIndex (assuming that gets renamed to currentValue)


---




[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19327#discussion_r140614721
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
 ---
@@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager(
   /**
* Remove using a predicate on keys. See class docs for more context and 
implement details.
*/
-  def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = {
-val allKeyToNumValues = keyToNumValues.iterator
-
-while (allKeyToNumValues.hasNext) {
-  val keyToNumValue = allKeyToNumValues.next
-  if (condition(keyToNumValue.key)) {
-keyToNumValues.remove(keyToNumValue.key)
-keyWithIndexToValue.removeAllValues(keyToNumValue.key, 
keyToNumValue.numValue)
+  def removeByKeyCondition(condition: UnsafeRow => Boolean): 
Iterator[(UnsafeRow, UnsafeRow)] = {
+new NextIterator[(UnsafeRow, UnsafeRow)] {
+
+  private val allKeyToNumValues = keyToNumValues.iterator
+
+  private var currentKeyToNumValue: Option[KeyAndNumValues] = None
+  private var currentValues: Option[Iterator[(UnsafeRow, Long)]] = None
+
+  private def currentKey = currentKeyToNumValue.get.key
+
+  private def getAndRemoveValue() = {
+val (current, index) = currentValues.get.next()
+keyWithIndexToValue.remove(currentKey, index)
+(currentKey, current)
+  }
+
+  override def getNext(): (UnsafeRow, UnsafeRow) = {
+if (currentValues.nonEmpty && currentValues.get.hasNext) {
+  return getAndRemoveValue()
+} else {
+  while (allKeyToNumValues.hasNext) {
+currentKeyToNumValue = Some(allKeyToNumValues.next())
+if (condition(currentKey)) {
+  currentValues = Some(keyWithIndexToValue.getAllWithIndex(
+currentKey, currentKeyToNumValue.get.numValue))
+  keyToNumValues.remove(currentKey)
+
+  if (currentValues.nonEmpty && currentValues.get.hasNext) {
+return getAndRemoveValue()
+  }
+}
+  }
+}
+
+finished = true
+null
   }
+
+  override def close: Unit = {}
 }
   }
 
   /**
* Remove using a predicate on values. See class docs for more context 
and implementation details.
*/
--- End diff --

Add high-level docs on how this works. I think I should have added this in my PR.


---




[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.

2017-09-22 Thread joseph-torres
GitHub user joseph-torres opened a pull request:

https://github.com/apache/spark/pull/19327

[WIP] Implement stream-stream outer joins.

## What changes were proposed in this pull request?

Allow one-sided outer joins between two streams when a watermark is defined.

## How was this patch tested?

new unit tests


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joseph-torres/spark outerjoin

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19327.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19327


commit 1dfb95d5b7dee86b45aa831278d3fa7a7dc1917f
Author: Jose Torres 
Date:   2017-09-22T20:36:50Z

Implement stream-stream outer joins.




---
