[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140968754 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -87,70 +87,157 @@ class SymmetricHashJoinStateManager( } /** - * Remove using a predicate on keys. See class docs for more context and implement details. + * Remove using a predicate on keys. + * + * This produces an iterator over the (key, value) pairs satisfying condition(key), where the + * underlying store is updated as a side-effect of producing next. + * + * This implies the iterator must be consumed fully without any other operations on this manager + * or the underlying store being interleaved. */ - def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator - -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - if (condition(keyToNumValue.key)) { -keyToNumValues.remove(keyToNumValue.key) -keyWithIndexToValue.removeAllValues(keyToNumValue.key, keyToNumValue.numValue) + def removeByKeyCondition(condition: UnsafeRow => Boolean): Iterator[UnsafeRowPair] = { +new NextIterator[UnsafeRowPair] { + + private val allKeyToNumValues = keyToNumValues.iterator + + private var currentKeyToNumValue: KeyAndNumValues = null + private var currentValues: Iterator[KeyWithIndexAndValue] = null + + private def currentKey = currentKeyToNumValue.key + + private val reusedPair = new UnsafeRowPair() + + private def getAndRemoveValue() = { +val keyWithIndexAndValue = currentValues.next() +keyWithIndexToValue.remove(currentKey, keyWithIndexAndValue.valueIndex) +reusedPair.withRows(currentKey, keyWithIndexAndValue.value) + } + + override def getNext(): UnsafeRowPair = { +if (currentValues != null && currentValues.hasNext) { + return getAndRemoveValue() +} else { + while (allKeyToNumValues.hasNext) { +currentKeyToNumValue = allKeyToNumValues.next() 
+if (condition(currentKey)) { + currentValues = keyWithIndexToValue.getAll( +currentKey, currentKeyToNumValue.numValue) + keyToNumValues.remove(currentKey) + + if (currentValues.hasNext) { +return getAndRemoveValue() + } +} + } +} + +finished = true +null } + + override def close: Unit = {} } } /** - * Remove using a predicate on values. See class docs for more context and implementation details. + * Remove using a predicate on values. + * + * At a high level, this produces an iterator over the (key, value) pairs such that value + * satisfies the predicate, where producing an element removes the value from the state store + * and producing all elements with a given key updates it accordingly. + * + * This implies the iterator must be consumed fully without any other operations on this manager + * or the underlying store being interleaved. */ - def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator + def removeByValueCondition(condition: UnsafeRow => Boolean): Iterator[UnsafeRowPair] = { +new NextIterator[UnsafeRowPair] { -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - val key = keyToNumValue.key + // Reuse this object to avoid creation+GC overhead. + private val reusedPair = new UnsafeRowPair() - var numValues: Long = keyToNumValue.numValue - var index: Long = 0L - var valueRemoved: Boolean = false - var valueForIndex: UnsafeRow = null + private val allKeyToNumValues = keyToNumValues.iterator - while (index < numValues) { -if (valueForIndex == null) { - valueForIndex = keyWithIndexToValue.get(key, index) + private var currentKey: UnsafeRow = null + private var numValues: Long = 0L + private var index: Long = 0L + private var valueRemoved: Boolean = false + + // Push the data for the current key to the numValues store, and reset the tracking variables + // to their empty state. 
+ private def storeCurrentKey(): Unit = { +if (valueRemoved) { + if (numValues >= 1) { +keyToNumValues.put(currentKey,
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140936872 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -87,70 +87,157 @@ class SymmetricHashJoinStateManager( } /** - * Remove using a predicate on keys. See class docs for more context and implement details. + * Remove using a predicate on keys. + * + * This produces an iterator over the (key, value) pairs satisfying condition(key), where the + * underlying store is updated as a side-effect of producing next. + * + * This implies the iterator must be consumed fully without any other operations on this manager + * or the underlying store being interleaved. */ - def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator - -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - if (condition(keyToNumValue.key)) { -keyToNumValues.remove(keyToNumValue.key) -keyWithIndexToValue.removeAllValues(keyToNumValue.key, keyToNumValue.numValue) + def removeByKeyCondition(condition: UnsafeRow => Boolean): Iterator[UnsafeRowPair] = { +new NextIterator[UnsafeRowPair] { + + private val allKeyToNumValues = keyToNumValues.iterator + + private var currentKeyToNumValue: KeyAndNumValues = null + private var currentValues: Iterator[KeyWithIndexAndValue] = null + + private def currentKey = currentKeyToNumValue.key + + private val reusedPair = new UnsafeRowPair() + + private def getAndRemoveValue() = { +val keyWithIndexAndValue = currentValues.next() +keyWithIndexToValue.remove(currentKey, keyWithIndexAndValue.valueIndex) +reusedPair.withRows(currentKey, keyWithIndexAndValue.value) + } + + override def getNext(): UnsafeRowPair = { +if (currentValues != null && currentValues.hasNext) { + return getAndRemoveValue() +} else { + while (allKeyToNumValues.hasNext) { +currentKeyToNumValue = allKeyToNumValues.next() 
+if (condition(currentKey)) { + currentValues = keyWithIndexToValue.getAll( +currentKey, currentKeyToNumValue.numValue) + keyToNumValues.remove(currentKey) + + if (currentValues.hasNext) { +return getAndRemoveValue() + } +} + } +} + +finished = true +null } + + override def close: Unit = {} } } /** - * Remove using a predicate on values. See class docs for more context and implementation details. + * Remove using a predicate on values. + * + * At a high level, this produces an iterator over the (key, value) pairs such that value + * satisfies the predicate, where producing an element removes the value from the state store + * and producing all elements with a given key updates it accordingly. + * + * This implies the iterator must be consumed fully without any other operations on this manager + * or the underlying store being interleaved. */ - def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator + def removeByValueCondition(condition: UnsafeRow => Boolean): Iterator[UnsafeRowPair] = { +new NextIterator[UnsafeRowPair] { -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - val key = keyToNumValue.key + // Reuse this object to avoid creation+GC overhead. + private val reusedPair = new UnsafeRowPair() - var numValues: Long = keyToNumValue.numValue - var index: Long = 0L - var valueRemoved: Boolean = false - var valueForIndex: UnsafeRow = null + private val allKeyToNumValues = keyToNumValues.iterator - while (index < numValues) { -if (valueForIndex == null) { - valueForIndex = keyWithIndexToValue.get(key, index) + private var currentKey: UnsafeRow = null + private var numValues: Long = 0L + private var index: Long = 0L + private var valueRemoved: Boolean = false + + // Push the data for the current key to the numValues store, and reset the tracking variables + // to their empty state. 
+ private def storeCurrentKey(): Unit = { --- End diff -- Yeah sorry, I meant to go back and fully address this comment the last time but I forgot.
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140935586 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -216,22 +232,70 @@ case class StreamingSymmetricHashJoinExec( } // Filter the joined rows based on the given condition. -val outputFilterFunction = - newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output).eval _ -val filteredOutputIter = - (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction).map { row => -numOutputRows += 1 -row - } +val outputFilterFunction = newPredicate(condition.getOrElse(Literal(true)), output).eval _ + +val filteredInnerOutputIter = (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction) --- End diff -- I don't think it's correct to filter the output from remove. The query Seq(1, 2, 3).toDF("val1").join(Seq[Int]().toDF("val2"), 'val1 === 'val2 && 'val1 === 0, "left_outer") produces ((1, null), (2, null), (3, null)). Outer joins with watermark range conditions also wouldn't work if we filtered remove output, since the range condition would exclude null values. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140933568 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala --- @@ -233,16 +234,46 @@ object UnsupportedOperationChecker { throwError("Full outer joins with streaming DataFrames/Datasets are not supported") } -case LeftOuter | LeftSemi | LeftAnti => +case LeftSemi | LeftAnti => if (right.isStreaming) { -throwError("Left outer/semi/anti joins with a streaming DataFrame/Dataset " + -"on the right is not supported") +throwError("Left semi/anti joins with a streaming DataFrame/Dataset " + +"on the right are not supported") } -case RightOuter => - if (left.isStreaming) { -throwError("Right outer join with a streaming DataFrame/Dataset on the left is " + -"not supported") +// We support left and right outer streaming joins only in the stream+stream case. +case LeftOuter | RightOuter => + if (joinType == LeftOuter && !left.isStreaming && right.isStreaming) { +throwError("Left outer join with a streaming DataFrame/Dataset " + + "on the right and non-streaming on the left is not supported") + } + if (joinType == RightOuter && left.isStreaming && !right.isStreaming) { +throwError("Right outer join with a streaming DataFrame/Dataset on the left and " + +"non-streaming on the right not supported") + } + + if (left.isStreaming && right.isStreaming) { +val watermarkInJoinKeys = subPlan match { + case ExtractEquiJoinKeys(_, leftKeys, rightKeys, _, _, _) => +val keySet = AttributeSet(leftKeys ++ rightKeys) +(leftKeys ++ rightKeys).exists { + case a: AttributeReference => a.metadata.contains(EventTimeWatermark.delayKey) + case _ => false +} + case _ => false +} + +val oppositeSideHasWatermark = joinType match { + case LeftOuter => + right.output.find(_.metadata.contains(EventTimeWatermark.delayKey)).isDefined + case RightOuter => --- End diff -- This condition is not sufficient. 
State can be dropped from the left side (in a left outer join) only when both of the following conditions are satisfied: 1. right.output has a watermark; 2. the join condition has a time range condition such that the left side state value watermark can be defined from the right side event time watermark. Now `StreamingSymmetricHashJoinHelper.getOneSideStateWatermarkPredicate` checks for (2). So you have to invoke that to find out whether the conditions are sufficient for defining state value watermarks. However, that is in the SQL project, not catalyst. So you may have to move that helper class into catalyst, and then use it from the UnsupportedOperationChecker (you will also have to rename that helper class to whatever makes sense). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user joseph-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140933095 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -157,11 +164,20 @@ case class StreamingSymmetricHashJoinExec( override def requiredChildDistribution: Seq[Distribution] = ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil - override def output: Seq[Attribute] = left.output ++ right.output + override def output: Seq[Attribute] = joinType match { +case _: InnerLike => left.output ++ right.output +case LeftOuter => left.output ++ right.output.map(_.withNullability(true)) +case RightOuter => left.output.map(_.withNullability(true)) ++ right.output +case _ => + throwBadJoinTypeException() + Seq() + } override def outputPartitioning: Partitioning = joinType match { case _: InnerLike => PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning)) +case LeftOuter => PartitioningCollection(Seq(right.outputPartitioning)) --- End diff -- That's what I thought at first, but the non-streaming HashJoin seems to do the partitioning this way. (Or am I misunderstanding what buildSide means in that trait?) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140931385 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala --- @@ -233,16 +234,46 @@ object UnsupportedOperationChecker { throwError("Full outer joins with streaming DataFrames/Datasets are not supported") } -case LeftOuter | LeftSemi | LeftAnti => +case LeftSemi | LeftAnti => if (right.isStreaming) { -throwError("Left outer/semi/anti joins with a streaming DataFrame/Dataset " + -"on the right is not supported") +throwError("Left semi/anti joins with a streaming DataFrame/Dataset " + +"on the right are not supported") } -case RightOuter => - if (left.isStreaming) { -throwError("Right outer join with a streaming DataFrame/Dataset on the left is " + -"not supported") +// We support left and right outer streaming joins only in the stream+stream case. --- End diff -- not really. leftOuterJoin(stream, batch) is allowed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140914684 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -309,19 +396,24 @@ class SymmetricHashJoinStateManager( stateStore.get(keyWithIndexRow(key, valueIndex)) } -/** Get all the values for key and all indices. */ -def getAll(key: UnsafeRow, numValues: Long): Iterator[UnsafeRow] = { +/** + * Get all values and indices for the provided key. + * Should not return null. + */ +def getAll(key: UnsafeRow, numValues: Long): Iterator[KeyWithIndexAndValue] = { + val keyWithIndexAndValue = new KeyWithIndexAndValue() var index = 0 - new NextIterator[UnsafeRow] { -override protected def getNext(): UnsafeRow = { + new NextIterator[KeyWithIndexAndValue] { +override protected def getNext(): KeyWithIndexAndValue = { if (index >= numValues) { finished = true null } else { val keyWithIndex = keyWithIndexRow(key, index) val value = stateStore.get(keyWithIndex) index += 1 -value +// return original index +keyWithIndexAndValue.withNew(key, index - 1, value) --- End diff -- super nit: better to make it ``` keyWithIndexAndValue.set(key, index, value) index +=1 keyWithIndexAndValue ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140912351 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -87,70 +87,157 @@ class SymmetricHashJoinStateManager( } /** - * Remove using a predicate on keys. See class docs for more context and implement details. + * Remove using a predicate on keys. + * + * This produces an iterator over the (key, value) pairs satisfying condition(key), where the + * underlying store is updated as a side-effect of producing next. + * + * This implies the iterator must be consumed fully without any other operations on this manager + * or the underlying store being interleaved. */ - def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator - -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - if (condition(keyToNumValue.key)) { -keyToNumValues.remove(keyToNumValue.key) -keyWithIndexToValue.removeAllValues(keyToNumValue.key, keyToNumValue.numValue) + def removeByKeyCondition(condition: UnsafeRow => Boolean): Iterator[UnsafeRowPair] = { +new NextIterator[UnsafeRowPair] { + + private val allKeyToNumValues = keyToNumValues.iterator + + private var currentKeyToNumValue: KeyAndNumValues = null + private var currentValues: Iterator[KeyWithIndexAndValue] = null + + private def currentKey = currentKeyToNumValue.key + + private val reusedPair = new UnsafeRowPair() + + private def getAndRemoveValue() = { +val keyWithIndexAndValue = currentValues.next() +keyWithIndexToValue.remove(currentKey, keyWithIndexAndValue.valueIndex) +reusedPair.withRows(currentKey, keyWithIndexAndValue.value) + } + + override def getNext(): UnsafeRowPair = { +if (currentValues != null && currentValues.hasNext) { + return getAndRemoveValue() +} else { + while (allKeyToNumValues.hasNext) { +currentKeyToNumValue = allKeyToNumValues.next() +if 
(condition(currentKey)) { + currentValues = keyWithIndexToValue.getAll( +currentKey, currentKeyToNumValue.numValue) + keyToNumValues.remove(currentKey) + + if (currentValues.hasNext) { +return getAndRemoveValue() + } +} + } +} + +finished = true +null } + + override def close: Unit = {} } } /** - * Remove using a predicate on values. See class docs for more context and implementation details. + * Remove using a predicate on values. + * + * At a high level, this produces an iterator over the (key, value) pairs such that value + * satisfies the predicate, where producing an element removes the value from the state store + * and producing all elements with a given key updates it accordingly. + * + * This implies the iterator must be consumed fully without any other operations on this manager + * or the underlying store being interleaved. */ - def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator + def removeByValueCondition(condition: UnsafeRow => Boolean): Iterator[UnsafeRowPair] = { --- End diff -- Can you rename `condition` to "removalCondition" to make it clear that it's a condition for "what to remove", and not "what to keep"? Please do the same for the other method as well. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140906369 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -324,17 +389,33 @@ case class StreamingSymmetricHashJoinExec( } } -/** Remove old buffered state rows using watermarks for state keys and values */ -def removeOldState(): Unit = { +/** + * Builds an iterator over old state key-value pairs, removing them lazily as they're produced. + * + * @note This iterator must be consumed fully before any other operations are made + * against this joiner's join state manager. For efficiency reasons, the intermediate states of + * the iterator leave the state manager in an invalid configuration. --- End diff -- invalid configuration -> undefined state. it may or may not be valid. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140927997 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala --- @@ -416,32 +421,32 @@ class UnsupportedOperationsSuite extends SparkFunSuite { // Left outer joins: *-stream not allowed --- End diff -- need to test cases where it should not be allowed in outer joins. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140905683 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -216,22 +232,70 @@ case class StreamingSymmetricHashJoinExec( } // Filter the joined rows based on the given condition. -val outputFilterFunction = - newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output).eval _ -val filteredOutputIter = - (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction).map { row => -numOutputRows += 1 -row - } +val outputFilterFunction = newPredicate(condition.getOrElse(Literal(true)), output).eval _ + +val filteredInnerOutputIter = (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction) + +val outputIter: Iterator[InternalRow] = joinType match { + case Inner => +filteredInnerOutputIter + case LeftOuter => +// We generate the outer join input by: +// * Getting an iterator over the rows that have aged out on the left side. These rows are +// candidates for being null joined. Note that to avoid doing two passes, this iterator +// removes the rows from the state manager as they're processed. +// * Checking whether the current row matches a key in the right side state. If it doesn't, +// we know we can join with null, since there was never (including this batch) a match +// within the watermark period. If it does, there must have been a match at some point, so +// we know we can't join with null. +val nullRight = new GenericInternalRow(right.output.map(_.withNullability(true)).length) +val removedRowIter = leftSideJoiner.removeOldState() +val outerOutputIter = removedRowIter + .filterNot(pair => rightSideJoiner.containsKey(pair.key)) + .map(pair => joinedRow.withLeft(pair.value).withRight(nullRight)) + +filteredInnerOutputIter ++ outerOutputIter + case RightOuter => +// See comments for left outer case. 
+val nullLeft = new GenericInternalRow(left.output.map(_.withNullability(true)).length) +val removedRowIter = rightSideJoiner.removeOldState() +val outerOutputIter = removedRowIter + .filterNot(pair => leftSideJoiner.containsKey(pair.key)) + .map(pair => joinedRow.withLeft(nullLeft).withRight(pair.value)) + +filteredInnerOutputIter ++ outerOutputIter + case _ => +throwBadJoinTypeException() +Iterator() +} + +val outputIterWithMetrics = outputIter.map { row => + numOutputRows += 1 + row +} // Function to remove old state after all the input has been consumed and output generated def onOutputCompletion = { allUpdatesTimeMs += math.max(NANOSECONDS.toMillis(System.nanoTime - updateStartTimeNs), 0) - // Remove old state if needed + // TODO: how to get this for removals as part of outer join? allRemovalsTimeMs += timeTakenMs { -leftSideJoiner.removeOldState() -rightSideJoiner.removeOldState() +// Iterator which must be consumed after output completion before committing. +// For outer joins, we've removed old state from the appropriate side inline while we +// produced the null rows. So we need to finish cleaning the other side. For inner joins --- End diff -- "appropriate side inline" does not make sense to me. Something like this would be better "For inner joins, we have to remove unnecessary state rows from both sides if possible. For outer joins, we have already removed unnecessary state rows from the outer side (e.g., left side for left outer join) while generating the outer "null" outputs. Now, we have to remove unnecessary state rows from the other side (e.g., right side for the left outer join) if possible. In all cases, nothing needs to be outputted, hence the removal needs to be done greedily by immediately consuming the returned iterator." --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140910541 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -87,70 +87,157 @@ class SymmetricHashJoinStateManager( } /** - * Remove using a predicate on keys. See class docs for more context and implement details. + * Remove using a predicate on keys. + * + * This produces an iterator over the (key, value) pairs satisfying condition(key), where the + * underlying store is updated as a side-effect of producing next. + * + * This implies the iterator must be consumed fully without any other operations on this manager + * or the underlying store being interleaved. */ - def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator - -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - if (condition(keyToNumValue.key)) { -keyToNumValues.remove(keyToNumValue.key) -keyWithIndexToValue.removeAllValues(keyToNumValue.key, keyToNumValue.numValue) + def removeByKeyCondition(condition: UnsafeRow => Boolean): Iterator[UnsafeRowPair] = { +new NextIterator[UnsafeRowPair] { + + private val allKeyToNumValues = keyToNumValues.iterator + + private var currentKeyToNumValue: KeyAndNumValues = null + private var currentValues: Iterator[KeyWithIndexAndValue] = null + + private def currentKey = currentKeyToNumValue.key + + private val reusedPair = new UnsafeRowPair() + + private def getAndRemoveValue() = { +val keyWithIndexAndValue = currentValues.next() +keyWithIndexToValue.remove(currentKey, keyWithIndexAndValue.valueIndex) +reusedPair.withRows(currentKey, keyWithIndexAndValue.value) + } + + override def getNext(): UnsafeRowPair = { +if (currentValues != null && currentValues.hasNext) { + return getAndRemoveValue() --- End diff -- if you are using return, then no point doing else. 
It is best to consistently stick to one style: either use if-return (no else) or if-else (no return). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140903737 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -216,22 +232,70 @@ case class StreamingSymmetricHashJoinExec( } // Filter the joined rows based on the given condition. -val outputFilterFunction = - newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output).eval _ -val filteredOutputIter = - (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction).map { row => -numOutputRows += 1 -row - } +val outputFilterFunction = newPredicate(condition.getOrElse(Literal(true)), output).eval _ + +val filteredInnerOutputIter = (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction) + +val outputIter: Iterator[InternalRow] = joinType match { + case Inner => +filteredInnerOutputIter + case LeftOuter => +// We generate the outer join input by: +// * Getting an iterator over the rows that have aged out on the left side. These rows are +// candidates for being null joined. Note that to avoid doing two passes, this iterator +// removes the rows from the state manager as they're processed. +// * Checking whether the current row matches a key in the right side state. If it doesn't, +// we know we can join with null, since there was never (including this batch) a match +// within the watermark period. If it does, there must have been a match at some point, so +// we know we can't join with null. +val nullRight = new GenericInternalRow(right.output.map(_.withNullability(true)).length) +val removedRowIter = leftSideJoiner.removeOldState() +val outerOutputIter = removedRowIter + .filterNot(pair => rightSideJoiner.containsKey(pair.key)) + .map(pair => joinedRow.withLeft(pair.value).withRight(nullRight)) + +filteredInnerOutputIter ++ outerOutputIter + case RightOuter => +// See comments for left outer case. 
+val nullLeft = new GenericInternalRow(left.output.map(_.withNullability(true)).length) +val removedRowIter = rightSideJoiner.removeOldState() +val outerOutputIter = removedRowIter + .filterNot(pair => leftSideJoiner.containsKey(pair.key)) + .map(pair => joinedRow.withLeft(nullLeft).withRight(pair.value)) + +filteredInnerOutputIter ++ outerOutputIter + case _ => +throwBadJoinTypeException() +Iterator() --- End diff -- this can be removed. see above. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140900883 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -157,11 +164,20 @@ case class StreamingSymmetricHashJoinExec( override def requiredChildDistribution: Seq[Distribution] = ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil - override def output: Seq[Attribute] = left.output ++ right.output + override def output: Seq[Attribute] = joinType match { +case _: InnerLike => left.output ++ right.output +case LeftOuter => left.output ++ right.output.map(_.withNullability(true)) +case RightOuter => left.output.map(_.withNullability(true)) ++ right.output +case _ => + throwBadJoinTypeException() + Seq() + } override def outputPartitioning: Partitioning = joinType match { case _: InnerLike => PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning)) +case LeftOuter => PartitioningCollection(Seq(right.outputPartitioning)) --- End diff -- Shouldn't this be `left.outputPartitioning` for LeftOuter? The attributes from the right side can become null, hence the right side partitioning will not be preserved. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140910588 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -87,70 +87,157 @@ class SymmetricHashJoinStateManager( } /** - * Remove using a predicate on keys. See class docs for more context and implement details. + * Remove using a predicate on keys. + * + * This produces an iterator over the (key, value) pairs satisfying condition(key), where the + * underlying store is updated as a side-effect of producing next. + * + * This implies the iterator must be consumed fully without any other operations on this manager + * or the underlying store being interleaved. */ - def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator - -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - if (condition(keyToNumValue.key)) { -keyToNumValues.remove(keyToNumValue.key) -keyWithIndexToValue.removeAllValues(keyToNumValue.key, keyToNumValue.numValue) + def removeByKeyCondition(condition: UnsafeRow => Boolean): Iterator[UnsafeRowPair] = { +new NextIterator[UnsafeRowPair] { + + private val allKeyToNumValues = keyToNumValues.iterator + + private var currentKeyToNumValue: KeyAndNumValues = null + private var currentValues: Iterator[KeyWithIndexAndValue] = null + + private def currentKey = currentKeyToNumValue.key + + private val reusedPair = new UnsafeRowPair() + + private def getAndRemoveValue() = { +val keyWithIndexAndValue = currentValues.next() +keyWithIndexToValue.remove(currentKey, keyWithIndexAndValue.valueIndex) +reusedPair.withRows(currentKey, keyWithIndexAndValue.value) + } + + override def getNext(): UnsafeRowPair = { +if (currentValues != null && currentValues.hasNext) { + return getAndRemoveValue() +} else { + while (allKeyToNumValues.hasNext) { --- End diff -- Add comments on what this while loop is 
doing. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140911830 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -87,70 +87,157 @@ class SymmetricHashJoinStateManager( } /** - * Remove using a predicate on keys. See class docs for more context and implement details. + * Remove using a predicate on keys. + * + * This produces an iterator over the (key, value) pairs satisfying condition(key), where the + * underlying store is updated as a side-effect of producing next. + * + * This implies the iterator must be consumed fully without any other operations on this manager + * or the underlying store being interleaved. */ - def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator - -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - if (condition(keyToNumValue.key)) { -keyToNumValues.remove(keyToNumValue.key) -keyWithIndexToValue.removeAllValues(keyToNumValue.key, keyToNumValue.numValue) + def removeByKeyCondition(condition: UnsafeRow => Boolean): Iterator[UnsafeRowPair] = { +new NextIterator[UnsafeRowPair] { + + private val allKeyToNumValues = keyToNumValues.iterator + + private var currentKeyToNumValue: KeyAndNumValues = null + private var currentValues: Iterator[KeyWithIndexAndValue] = null + + private def currentKey = currentKeyToNumValue.key + + private val reusedPair = new UnsafeRowPair() + + private def getAndRemoveValue() = { +val keyWithIndexAndValue = currentValues.next() +keyWithIndexToValue.remove(currentKey, keyWithIndexAndValue.valueIndex) +reusedPair.withRows(currentKey, keyWithIndexAndValue.value) + } + + override def getNext(): UnsafeRowPair = { +if (currentValues != null && currentValues.hasNext) { + return getAndRemoveValue() +} else { + while (allKeyToNumValues.hasNext) { +currentKeyToNumValue = allKeyToNumValues.next() +if 
(condition(currentKey)) { + currentValues = keyWithIndexToValue.getAll( +currentKey, currentKeyToNumValue.numValue) + keyToNumValues.remove(currentKey) + + if (currentValues.hasNext) { +return getAndRemoveValue() + } +} + } +} + +finished = true +null } + + override def close: Unit = {} } } /** - * Remove using a predicate on values. See class docs for more context and implementation details. + * Remove using a predicate on values. + * + * At a high level, this produces an iterator over the (key, value) pairs such that value + * satisfies the predicate, where producing an element removes the value from the state store + * and producing all elements with a given key updates it accordingly. + * + * This implies the iterator must be consumed fully without any other operations on this manager + * or the underlying store being interleaved. */ - def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator + def removeByValueCondition(condition: UnsafeRow => Boolean): Iterator[UnsafeRowPair] = { +new NextIterator[UnsafeRowPair] { -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - val key = keyToNumValue.key + // Reuse this object to avoid creation+GC overhead. + private val reusedPair = new UnsafeRowPair() - var numValues: Long = keyToNumValue.numValue - var index: Long = 0L - var valueRemoved: Boolean = false - var valueForIndex: UnsafeRow = null + private val allKeyToNumValues = keyToNumValues.iterator - while (index < numValues) { -if (valueForIndex == null) { - valueForIndex = keyWithIndexToValue.get(key, index) + private var currentKey: UnsafeRow = null + private var numValues: Long = 0L + private var index: Long = 0L + private var valueRemoved: Boolean = false + + // Push the data for the current key to the numValues store, and reset the tracking variables + // to their empty state. 
+ private def storeCurrentKey(): Unit = { +if (valueRemoved) { + if (numValues >= 1) { +keyToNumValues.put(currentKey, numValues)
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140913768 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -87,70 +87,157 @@ class SymmetricHashJoinStateManager( } /** - * Remove using a predicate on keys. See class docs for more context and implement details. + * Remove using a predicate on keys. + * + * This produces an iterator over the (key, value) pairs satisfying condition(key), where the + * underlying store is updated as a side-effect of producing next. + * + * This implies the iterator must be consumed fully without any other operations on this manager + * or the underlying store being interleaved. */ - def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator - -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - if (condition(keyToNumValue.key)) { -keyToNumValues.remove(keyToNumValue.key) -keyWithIndexToValue.removeAllValues(keyToNumValue.key, keyToNumValue.numValue) + def removeByKeyCondition(condition: UnsafeRow => Boolean): Iterator[UnsafeRowPair] = { +new NextIterator[UnsafeRowPair] { + + private val allKeyToNumValues = keyToNumValues.iterator + + private var currentKeyToNumValue: KeyAndNumValues = null + private var currentValues: Iterator[KeyWithIndexAndValue] = null + + private def currentKey = currentKeyToNumValue.key + + private val reusedPair = new UnsafeRowPair() + + private def getAndRemoveValue() = { +val keyWithIndexAndValue = currentValues.next() +keyWithIndexToValue.remove(currentKey, keyWithIndexAndValue.valueIndex) +reusedPair.withRows(currentKey, keyWithIndexAndValue.value) + } + + override def getNext(): UnsafeRowPair = { +if (currentValues != null && currentValues.hasNext) { + return getAndRemoveValue() +} else { + while (allKeyToNumValues.hasNext) { +currentKeyToNumValue = allKeyToNumValues.next() +if 
(condition(currentKey)) { + currentValues = keyWithIndexToValue.getAll( +currentKey, currentKeyToNumValue.numValue) + keyToNumValues.remove(currentKey) + + if (currentValues.hasNext) { +return getAndRemoveValue() + } +} + } +} + +finished = true +null } + + override def close: Unit = {} } } /** - * Remove using a predicate on values. See class docs for more context and implementation details. + * Remove using a predicate on values. + * + * At a high level, this produces an iterator over the (key, value) pairs such that value + * satisfies the predicate, where producing an element removes the value from the state store + * and producing all elements with a given key updates it accordingly. + * + * This implies the iterator must be consumed fully without any other operations on this manager + * or the underlying store being interleaved. */ - def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator + def removeByValueCondition(condition: UnsafeRow => Boolean): Iterator[UnsafeRowPair] = { +new NextIterator[UnsafeRowPair] { -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - val key = keyToNumValue.key + // Reuse this object to avoid creation+GC overhead. + private val reusedPair = new UnsafeRowPair() - var numValues: Long = keyToNumValue.numValue - var index: Long = 0L - var valueRemoved: Boolean = false - var valueForIndex: UnsafeRow = null + private val allKeyToNumValues = keyToNumValues.iterator - while (index < numValues) { -if (valueForIndex == null) { - valueForIndex = keyWithIndexToValue.get(key, index) + private var currentKey: UnsafeRow = null + private var numValues: Long = 0L + private var index: Long = 0L + private var valueRemoved: Boolean = false + + // Push the data for the current key to the numValues store, and reset the tracking variables + // to their empty state. 
+ private def storeCurrentKey(): Unit = { +if (valueRemoved) { + if (numValues >= 1) { +keyToNumValues.put(currentKey, numValues)
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140911442 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -87,70 +87,157 @@ class SymmetricHashJoinStateManager( } /** - * Remove using a predicate on keys. See class docs for more context and implement details. + * Remove using a predicate on keys. + * + * This produces an iterator over the (key, value) pairs satisfying condition(key), where the + * underlying store is updated as a side-effect of producing next. + * + * This implies the iterator must be consumed fully without any other operations on this manager + * or the underlying store being interleaved. */ - def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator - -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - if (condition(keyToNumValue.key)) { -keyToNumValues.remove(keyToNumValue.key) -keyWithIndexToValue.removeAllValues(keyToNumValue.key, keyToNumValue.numValue) + def removeByKeyCondition(condition: UnsafeRow => Boolean): Iterator[UnsafeRowPair] = { +new NextIterator[UnsafeRowPair] { + + private val allKeyToNumValues = keyToNumValues.iterator + + private var currentKeyToNumValue: KeyAndNumValues = null + private var currentValues: Iterator[KeyWithIndexAndValue] = null + + private def currentKey = currentKeyToNumValue.key + + private val reusedPair = new UnsafeRowPair() + + private def getAndRemoveValue() = { +val keyWithIndexAndValue = currentValues.next() +keyWithIndexToValue.remove(currentKey, keyWithIndexAndValue.valueIndex) +reusedPair.withRows(currentKey, keyWithIndexAndValue.value) + } + + override def getNext(): UnsafeRowPair = { +if (currentValues != null && currentValues.hasNext) { + return getAndRemoveValue() +} else { + while (allKeyToNumValues.hasNext) { +currentKeyToNumValue = allKeyToNumValues.next() +if 
(condition(currentKey)) { + currentValues = keyWithIndexToValue.getAll( +currentKey, currentKeyToNumValue.numValue) + keyToNumValues.remove(currentKey) + + if (currentValues.hasNext) { +return getAndRemoveValue() + } +} + } +} + +finished = true +null } + + override def close: Unit = {} } } /** - * Remove using a predicate on values. See class docs for more context and implementation details. + * Remove using a predicate on values. + * + * At a high level, this produces an iterator over the (key, value) pairs such that value + * satisfies the predicate, where producing an element removes the value from the state store + * and producing all elements with a given key updates it accordingly. + * + * This implies the iterator must be consumed fully without any other operations on this manager + * or the underlying store being interleaved. */ - def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator + def removeByValueCondition(condition: UnsafeRow => Boolean): Iterator[UnsafeRowPair] = { +new NextIterator[UnsafeRowPair] { -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - val key = keyToNumValue.key + // Reuse this object to avoid creation+GC overhead. + private val reusedPair = new UnsafeRowPair() - var numValues: Long = keyToNumValue.numValue - var index: Long = 0L - var valueRemoved: Boolean = false - var valueForIndex: UnsafeRow = null + private val allKeyToNumValues = keyToNumValues.iterator - while (index < numValues) { -if (valueForIndex == null) { - valueForIndex = keyWithIndexToValue.get(key, index) + private var currentKey: UnsafeRow = null + private var numValues: Long = 0L + private var index: Long = 0L + private var valueRemoved: Boolean = false + + // Push the data for the current key to the numValues store, and reset the tracking variables + // to their empty state. 
+ private def storeCurrentKey(): Unit = { +if (valueRemoved) { + if (numValues >= 1) { +keyToNumValues.put(currentKey, numValues)
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140904307 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -216,22 +232,70 @@ case class StreamingSymmetricHashJoinExec( } // Filter the joined rows based on the given condition. -val outputFilterFunction = - newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output).eval _ -val filteredOutputIter = - (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction).map { row => -numOutputRows += 1 -row - } +val outputFilterFunction = newPredicate(condition.getOrElse(Literal(true)), output).eval _ + +val filteredInnerOutputIter = (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction) + +val outputIter: Iterator[InternalRow] = joinType match { + case Inner => +filteredInnerOutputIter + case LeftOuter => +// We generate the outer join input by: +// * Getting an iterator over the rows that have aged out on the left side. These rows are +// candidates for being null joined. Note that to avoid doing two passes, this iterator +// removes the rows from the state manager as they're processed. +// * Checking whether the current row matches a key in the right side state. If it doesn't, +// we know we can join with null, since there was never (including this batch) a match +// within the watermark period. If it does, there must have been a match at some point, so +// we know we can't join with null. +val nullRight = new GenericInternalRow(right.output.map(_.withNullability(true)).length) +val removedRowIter = leftSideJoiner.removeOldState() +val outerOutputIter = removedRowIter + .filterNot(pair => rightSideJoiner.containsKey(pair.key)) + .map(pair => joinedRow.withLeft(pair.value).withRight(nullRight)) + +filteredInnerOutputIter ++ outerOutputIter + case RightOuter => +// See comments for left outer case. 
+val nullLeft = new GenericInternalRow(left.output.map(_.withNullability(true)).length) +val removedRowIter = rightSideJoiner.removeOldState() +val outerOutputIter = removedRowIter + .filterNot(pair => leftSideJoiner.containsKey(pair.key)) + .map(pair => joinedRow.withLeft(nullLeft).withRight(pair.value)) + +filteredInnerOutputIter ++ outerOutputIter + case _ => +throwBadJoinTypeException() +Iterator() +} + +val outputIterWithMetrics = outputIter.map { row => + numOutputRows += 1 + row +} // Function to remove old state after all the input has been consumed and output generated def onOutputCompletion = { allUpdatesTimeMs += math.max(NANOSECONDS.toMillis(System.nanoTime - updateStartTimeNs), 0) - // Remove old state if needed + // TODO: how to get this for removals as part of outer join? allRemovalsTimeMs += timeTakenMs { -leftSideJoiner.removeOldState() -rightSideJoiner.removeOldState() +// Iterator which must be consumed after output completion before committing. --- End diff -- nit: Rather than starting with how it's implemented (i.e. referring to iterator), first explain what this code does semantically (e.g. Remove all the state rows that are not needed anymore). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140913094 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -87,70 +87,157 @@ class SymmetricHashJoinStateManager( } /** - * Remove using a predicate on keys. See class docs for more context and implement details. + * Remove using a predicate on keys. + * + * This produces an iterator over the (key, value) pairs satisfying condition(key), where the + * underlying store is updated as a side-effect of producing next. + * + * This implies the iterator must be consumed fully without any other operations on this manager + * or the underlying store being interleaved. */ - def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator - -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - if (condition(keyToNumValue.key)) { -keyToNumValues.remove(keyToNumValue.key) -keyWithIndexToValue.removeAllValues(keyToNumValue.key, keyToNumValue.numValue) + def removeByKeyCondition(condition: UnsafeRow => Boolean): Iterator[UnsafeRowPair] = { +new NextIterator[UnsafeRowPair] { + + private val allKeyToNumValues = keyToNumValues.iterator + + private var currentKeyToNumValue: KeyAndNumValues = null + private var currentValues: Iterator[KeyWithIndexAndValue] = null + + private def currentKey = currentKeyToNumValue.key + + private val reusedPair = new UnsafeRowPair() + + private def getAndRemoveValue() = { +val keyWithIndexAndValue = currentValues.next() +keyWithIndexToValue.remove(currentKey, keyWithIndexAndValue.valueIndex) +reusedPair.withRows(currentKey, keyWithIndexAndValue.value) + } + + override def getNext(): UnsafeRowPair = { +if (currentValues != null && currentValues.hasNext) { + return getAndRemoveValue() +} else { + while (allKeyToNumValues.hasNext) { +currentKeyToNumValue = allKeyToNumValues.next() +if 
(condition(currentKey)) { + currentValues = keyWithIndexToValue.getAll( +currentKey, currentKeyToNumValue.numValue) + keyToNumValues.remove(currentKey) + + if (currentValues.hasNext) { +return getAndRemoveValue() + } +} + } +} + +finished = true +null } + + override def close: Unit = {} } } /** - * Remove using a predicate on values. See class docs for more context and implementation details. + * Remove using a predicate on values. + * + * At a high level, this produces an iterator over the (key, value) pairs such that value + * satisfies the predicate, where producing an element removes the value from the state store + * and producing all elements with a given key updates it accordingly. + * + * This implies the iterator must be consumed fully without any other operations on this manager + * or the underlying store being interleaved. */ - def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator + def removeByValueCondition(condition: UnsafeRow => Boolean): Iterator[UnsafeRowPair] = { +new NextIterator[UnsafeRowPair] { -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - val key = keyToNumValue.key + // Reuse this object to avoid creation+GC overhead. + private val reusedPair = new UnsafeRowPair() - var numValues: Long = keyToNumValue.numValue - var index: Long = 0L - var valueRemoved: Boolean = false - var valueForIndex: UnsafeRow = null + private val allKeyToNumValues = keyToNumValues.iterator - while (index < numValues) { -if (valueForIndex == null) { - valueForIndex = keyWithIndexToValue.get(key, index) + private var currentKey: UnsafeRow = null + private var numValues: Long = 0L + private var index: Long = 0L + private var valueRemoved: Boolean = false + + // Push the data for the current key to the numValues store, and reset the tracking variables + // to their empty state. 
+ private def storeCurrentKey(): Unit = { +if (valueRemoved) { + if (numValues >= 1) { +keyToNumValues.put(currentKey, numValues)
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140928059 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -146,7 +146,14 @@ case class StreamingSymmetricHashJoinExec( stateWatermarkPredicates = JoinStateWatermarkPredicates(), left, right) } - require(joinType == Inner, s"${getClass.getSimpleName} should not take $joinType as the JoinType") + private def throwBadJoinTypeException(): Unit = { --- End diff -- Unit -> Nothing. See comment below. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140912029 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -87,70 +87,157 @@ class SymmetricHashJoinStateManager( } /** - * Remove using a predicate on keys. See class docs for more context and implement details. + * Remove using a predicate on keys. + * + * This produces an iterator over the (key, value) pairs satisfying condition(key), where the + * underlying store is updated as a side-effect of producing next. + * + * This implies the iterator must be consumed fully without any other operations on this manager + * or the underlying store being interleaved. */ - def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator - -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - if (condition(keyToNumValue.key)) { -keyToNumValues.remove(keyToNumValue.key) -keyWithIndexToValue.removeAllValues(keyToNumValue.key, keyToNumValue.numValue) + def removeByKeyCondition(condition: UnsafeRow => Boolean): Iterator[UnsafeRowPair] = { +new NextIterator[UnsafeRowPair] { + + private val allKeyToNumValues = keyToNumValues.iterator + + private var currentKeyToNumValue: KeyAndNumValues = null + private var currentValues: Iterator[KeyWithIndexAndValue] = null + + private def currentKey = currentKeyToNumValue.key + + private val reusedPair = new UnsafeRowPair() + + private def getAndRemoveValue() = { +val keyWithIndexAndValue = currentValues.next() +keyWithIndexToValue.remove(currentKey, keyWithIndexAndValue.valueIndex) +reusedPair.withRows(currentKey, keyWithIndexAndValue.value) + } + + override def getNext(): UnsafeRowPair = { +if (currentValues != null && currentValues.hasNext) { + return getAndRemoveValue() +} else { + while (allKeyToNumValues.hasNext) { +currentKeyToNumValue = allKeyToNumValues.next() +if 
(condition(currentKey)) { + currentValues = keyWithIndexToValue.getAll( +currentKey, currentKeyToNumValue.numValue) + keyToNumValues.remove(currentKey) + + if (currentValues.hasNext) { +return getAndRemoveValue() + } +} + } +} + +finished = true +null } + + override def close: Unit = {} } } /** - * Remove using a predicate on values. See class docs for more context and implementation details. + * Remove using a predicate on values. + * + * At a high level, this produces an iterator over the (key, value) pairs such that value + * satisfies the predicate, where producing an element removes the value from the state store + * and producing all elements with a given key updates it accordingly. + * + * This implies the iterator must be consumed fully without any other operations on this manager + * or the underlying store being interleaved. */ - def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator + def removeByValueCondition(condition: UnsafeRow => Boolean): Iterator[UnsafeRowPair] = { +new NextIterator[UnsafeRowPair] { -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - val key = keyToNumValue.key + // Reuse this object to avoid creation+GC overhead. + private val reusedPair = new UnsafeRowPair() - var numValues: Long = keyToNumValue.numValue - var index: Long = 0L - var valueRemoved: Boolean = false - var valueForIndex: UnsafeRow = null + private val allKeyToNumValues = keyToNumValues.iterator - while (index < numValues) { -if (valueForIndex == null) { - valueForIndex = keyWithIndexToValue.get(key, index) + private var currentKey: UnsafeRow = null + private var numValues: Long = 0L + private var index: Long = 0L + private var valueRemoved: Boolean = false + + // Push the data for the current key to the numValues store, and reset the tracking variables + // to their empty state. 
+ private def storeCurrentKey(): Unit = { +if (valueRemoved) { + if (numValues >= 1) { +keyToNumValues.put(currentKey, numValues)
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140911245 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -87,70 +87,157 @@ class SymmetricHashJoinStateManager( } /** - * Remove using a predicate on keys. See class docs for more context and implement details. + * Remove using a predicate on keys. + * + * This produces an iterator over the (key, value) pairs satisfying condition(key), where the + * underlying store is updated as a side-effect of producing next. + * + * This implies the iterator must be consumed fully without any other operations on this manager + * or the underlying store being interleaved. */ - def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator - -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - if (condition(keyToNumValue.key)) { -keyToNumValues.remove(keyToNumValue.key) -keyWithIndexToValue.removeAllValues(keyToNumValue.key, keyToNumValue.numValue) + def removeByKeyCondition(condition: UnsafeRow => Boolean): Iterator[UnsafeRowPair] = { +new NextIterator[UnsafeRowPair] { + + private val allKeyToNumValues = keyToNumValues.iterator + + private var currentKeyToNumValue: KeyAndNumValues = null + private var currentValues: Iterator[KeyWithIndexAndValue] = null + + private def currentKey = currentKeyToNumValue.key + + private val reusedPair = new UnsafeRowPair() + + private def getAndRemoveValue() = { +val keyWithIndexAndValue = currentValues.next() +keyWithIndexToValue.remove(currentKey, keyWithIndexAndValue.valueIndex) +reusedPair.withRows(currentKey, keyWithIndexAndValue.value) + } + + override def getNext(): UnsafeRowPair = { +if (currentValues != null && currentValues.hasNext) { + return getAndRemoveValue() +} else { + while (allKeyToNumValues.hasNext) { +currentKeyToNumValue = allKeyToNumValues.next() +if 
(condition(currentKey)) { + currentValues = keyWithIndexToValue.getAll( +currentKey, currentKeyToNumValue.numValue) + keyToNumValues.remove(currentKey) + + if (currentValues.hasNext) { +return getAndRemoveValue() + } +} + } +} + +finished = true +null } + + override def close: Unit = {} } } /** - * Remove using a predicate on values. See class docs for more context and implementation details. + * Remove using a predicate on values. + * + * At a high level, this produces an iterator over the (key, value) pairs such that value + * satisfies the predicate, where producing an element removes the value from the state store + * and producing all elements with a given key updates it accordingly. + * + * This implies the iterator must be consumed fully without any other operations on this manager + * or the underlying store being interleaved. */ - def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator + def removeByValueCondition(condition: UnsafeRow => Boolean): Iterator[UnsafeRowPair] = { +new NextIterator[UnsafeRowPair] { -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - val key = keyToNumValue.key + // Reuse this object to avoid creation+GC overhead. + private val reusedPair = new UnsafeRowPair() - var numValues: Long = keyToNumValue.numValue - var index: Long = 0L - var valueRemoved: Boolean = false - var valueForIndex: UnsafeRow = null + private val allKeyToNumValues = keyToNumValues.iterator - while (index < numValues) { -if (valueForIndex == null) { - valueForIndex = keyWithIndexToValue.get(key, index) + private var currentKey: UnsafeRow = null + private var numValues: Long = 0L + private var index: Long = 0L + private var valueRemoved: Boolean = false + + // Push the data for the current key to the numValues store, and reset the tracking variables + // to their empty state. 
+ private def storeCurrentKey(): Unit = { --- End diff -- nit: you are not really storing a new key, you are updating the value of an existing key, or
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140900266 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -157,11 +164,20 @@ case class StreamingSymmetricHashJoinExec( override def requiredChildDistribution: Seq[Distribution] = ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil - override def output: Seq[Attribute] = left.output ++ right.output + override def output: Seq[Attribute] = joinType match { +case _: InnerLike => left.output ++ right.output +case LeftOuter => left.output ++ right.output.map(_.withNullability(true)) +case RightOuter => left.output.map(_.withNullability(true)) ++ right.output +case _ => + throwBadJoinTypeException() + Seq() --- End diff -- I think if you define the return type of throwBadJoinTypeException as Nothing, then this Seq() won't be needed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140903616 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -216,22 +232,70 @@ case class StreamingSymmetricHashJoinExec( } // Filter the joined rows based on the given condition. -val outputFilterFunction = - newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output).eval _ -val filteredOutputIter = - (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction).map { row => -numOutputRows += 1 -row - } +val outputFilterFunction = newPredicate(condition.getOrElse(Literal(true)), output).eval _ + +val filteredInnerOutputIter = (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction) --- End diff -- I had a comment on this earlier: the output from remove needs to be filtered as well, so this filter should be moved later and applied to the combined (inner join + outer null from remove) iterator. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140910781 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -324,17 +389,33 @@ case class StreamingSymmetricHashJoinExec( } } -/** Remove old buffered state rows using watermarks for state keys and values */ -def removeOldState(): Unit = { +/** + * Builds an iterator over old state key-value pairs, removing them lazily as they're produced. + * + * @note This iterator must be consumed fully before any other operations are made + * against this joiner's join state manager. For efficiency reasons, the intermediate states of + * the iterator leave the state manager in an invalid configuration. + * + * We do this to avoid requiring either two passes or full materialization when + * processing the rows for outer join. + */ +def removeOldState(): Iterator[UnsafeRowPair] = { stateWatermarkPredicate match { case Some(JoinStateKeyWatermarkPredicate(expr)) => joinStateManager.removeByKeyCondition(stateKeyWatermarkPredicateFunc) case Some(JoinStateValueWatermarkPredicate(expr)) => joinStateManager.removeByValueCondition(stateValueWatermarkPredicateFunc) -case _ => +case _ => Iterator() --- End diff -- You can use `Iterator.empty` here; it is more obvious. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140605142 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -146,7 +146,13 @@ case class StreamingSymmetricHashJoinExec( stateWatermarkPredicates = JoinStateWatermarkPredicates(), left, right) } - require(joinType == Inner, s"${getClass.getSimpleName} should not take $joinType as the JoinType") + private lazy val badJoinTypeException = --- End diff -- This should be a def. I don't think exceptions should be created and thrown later. I am not sure whether it will capture the stack trace of when it was created or when it was thrown. Either way it's a bad pattern to have. A better approach is to make it into a function `def throwBadJoinTypeException() { throw new ... }` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140608418 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager( /** * Remove using a predicate on keys. See class docs for more context and implement details. */ - def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator - -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - if (condition(keyToNumValue.key)) { -keyToNumValues.remove(keyToNumValue.key) -keyWithIndexToValue.removeAllValues(keyToNumValue.key, keyToNumValue.numValue) + def removeByKeyCondition(condition: UnsafeRow => Boolean): Iterator[(UnsafeRow, UnsafeRow)] = { +new NextIterator[(UnsafeRow, UnsafeRow)] { + + private val allKeyToNumValues = keyToNumValues.iterator + + private var currentKeyToNumValue: Option[KeyAndNumValues] = None + private var currentValues: Option[Iterator[(UnsafeRow, Long)]] = None + + private def currentKey = currentKeyToNumValue.get.key + + private def getAndRemoveValue() = { +val (current, index) = currentValues.get.next() --- End diff -- current -> currentValue --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140615004 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager( /** * Remove using a predicate on keys. See class docs for more context and implement details. */ - def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator - -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - if (condition(keyToNumValue.key)) { -keyToNumValues.remove(keyToNumValue.key) -keyWithIndexToValue.removeAllValues(keyToNumValue.key, keyToNumValue.numValue) + def removeByKeyCondition(condition: UnsafeRow => Boolean): Iterator[(UnsafeRow, UnsafeRow)] = { +new NextIterator[(UnsafeRow, UnsafeRow)] { + + private val allKeyToNumValues = keyToNumValues.iterator + + private var currentKeyToNumValue: Option[KeyAndNumValues] = None + private var currentValues: Option[Iterator[(UnsafeRow, Long)]] = None + + private def currentKey = currentKeyToNumValue.get.key + + private def getAndRemoveValue() = { +val (current, index) = currentValues.get.next() +keyWithIndexToValue.remove(currentKey, index) +(currentKey, current) + } + + override def getNext(): (UnsafeRow, UnsafeRow) = { +if (currentValues.nonEmpty && currentValues.get.hasNext) { + return getAndRemoveValue() +} else { + while (allKeyToNumValues.hasNext) { +currentKeyToNumValue = Some(allKeyToNumValues.next()) +if (condition(currentKey)) { + currentValues = Some(keyWithIndexToValue.getAllWithIndex( +currentKey, currentKeyToNumValue.get.numValue)) + keyToNumValues.remove(currentKey) + + if (currentValues.nonEmpty && currentValues.get.hasNext) { +return getAndRemoveValue() + } +} + } +} + +finished = true +null } + + override def close: Unit = {} } } /** * Remove using a predicate on values. 
See class docs for more context and implementation details. */ - def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator + def removeByValueCondition(condition: UnsafeRow => Boolean): Iterator[(UnsafeRow, UnsafeRow)] = { +new NextIterator[(UnsafeRow, UnsafeRow)] { + + private val allKeyToNumValues = keyToNumValues.iterator -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - val key = keyToNumValue.key + private var currentKeyToNumValue: Option[KeyAndNumValues] = None - var numValues: Long = keyToNumValue.numValue + private def currentKey = currentKeyToNumValue.get.key + + var numValues: Long = 0L var index: Long = 0L var valueRemoved: Boolean = false var valueForIndex: UnsafeRow = null - while (index < numValues) { -if (valueForIndex == null) { - valueForIndex = keyWithIndexToValue.get(key, index) + private def cleanupCurrentKey(): Unit = { +if (valueRemoved) { + if (numValues >= 1) { +keyToNumValues.put(currentKey, numValues) + } else { +keyToNumValues.remove(currentKey) + } +} + +numValues = 0 +index = 0 +valueRemoved = false +valueForIndex = null + } + + override def getNext(): (UnsafeRow, UnsafeRow) = { +// TODO: there has to be a better way to express this but I don't know what it is +while (valueForIndex == null && (index < numValues || allKeyToNumValues.hasNext)) { --- End diff -- This loop can probably be split into two methods `findNextValueToRemove`, and `removeValue` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140616618 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -216,22 +229,51 @@ case class StreamingSymmetricHashJoinExec( } // Filter the joined rows based on the given condition. -val outputFilterFunction = - newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output).eval _ -val filteredOutputIter = - (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction).map { row => -numOutputRows += 1 -row - } +val outputFilterFunction = newPredicate(condition.getOrElse(Literal(true)), output).eval _ + +val filteredInnerOutputIter = (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction) + +val outputIter: Iterator[InternalRow] = joinType match { + case Inner => +filteredInnerOutputIter + case LeftOuter => +val nullRight = new GenericInternalRow(right.output.map(_.withNullability(true)).length) +filteredInnerOutputIter ++ + leftSideJoiner +.removeOldState() +.filterNot { case (key, value) => rightSideJoiner.containsKey(key) } +.map { case (key, value) => joinedRow.withLeft(value).withRight(nullRight) } + case RightOuter => +val nullLeft = new GenericInternalRow(left.output.map(_.withNullability(true)).length) +filteredInnerOutputIter ++ + rightSideJoiner --- End diff -- nit: split this into two statements with an intermediate variable `removedRowsIter`. and docs on what this does. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140614146 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager( /** * Remove using a predicate on keys. See class docs for more context and implement details. */ - def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator - -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - if (condition(keyToNumValue.key)) { -keyToNumValues.remove(keyToNumValue.key) -keyWithIndexToValue.removeAllValues(keyToNumValue.key, keyToNumValue.numValue) + def removeByKeyCondition(condition: UnsafeRow => Boolean): Iterator[(UnsafeRow, UnsafeRow)] = { +new NextIterator[(UnsafeRow, UnsafeRow)] { + + private val allKeyToNumValues = keyToNumValues.iterator + + private var currentKeyToNumValue: Option[KeyAndNumValues] = None + private var currentValues: Option[Iterator[(UnsafeRow, Long)]] = None + + private def currentKey = currentKeyToNumValue.get.key + + private def getAndRemoveValue() = { +val (current, index) = currentValues.get.next() +keyWithIndexToValue.remove(currentKey, index) +(currentKey, current) + } + + override def getNext(): (UnsafeRow, UnsafeRow) = { +if (currentValues.nonEmpty && currentValues.get.hasNext) { + return getAndRemoveValue() +} else { + while (allKeyToNumValues.hasNext) { +currentKeyToNumValue = Some(allKeyToNumValues.next()) +if (condition(currentKey)) { + currentValues = Some(keyWithIndexToValue.getAllWithIndex( +currentKey, currentKeyToNumValue.get.numValue)) + keyToNumValues.remove(currentKey) + + if (currentValues.nonEmpty && currentValues.get.hasNext) { +return getAndRemoveValue() + } +} + } +} + +finished = true +null } + + override def close: Unit = {} } } /** * Remove using a predicate on values. 
See class docs for more context and implementation details. */ - def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator + def removeByValueCondition(condition: UnsafeRow => Boolean): Iterator[(UnsafeRow, UnsafeRow)] = { +new NextIterator[(UnsafeRow, UnsafeRow)] { + + private val allKeyToNumValues = keyToNumValues.iterator -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - val key = keyToNumValue.key + private var currentKeyToNumValue: Option[KeyAndNumValues] = None - var numValues: Long = keyToNumValue.numValue + private def currentKey = currentKeyToNumValue.get.key + + var numValues: Long = 0L var index: Long = 0L var valueRemoved: Boolean = false var valueForIndex: UnsafeRow = null - while (index < numValues) { -if (valueForIndex == null) { - valueForIndex = keyWithIndexToValue.get(key, index) + private def cleanupCurrentKey(): Unit = { +if (valueRemoved) { + if (numValues >= 1) { +keyToNumValues.put(currentKey, numValues) + } else { +keyToNumValues.remove(currentKey) + } +} + +numValues = 0 +index = 0 +valueRemoved = false +valueForIndex = null + } + + override def getNext(): (UnsafeRow, UnsafeRow) = { +// TODO: there has to be a better way to express this but I don't know what it is +while (valueForIndex == null && (index < numValues || allKeyToNumValues.hasNext)) { --- End diff -- maybe rename`valueForIndex -> currentValue` and `index -> nextValueIndex ` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140605854 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -324,17 +367,34 @@ case class StreamingSymmetricHashJoinExec( } } -/** Remove old buffered state rows using watermarks for state keys and values */ -def removeOldState(): Unit = { +/** + * Builds an iterator over old state key-value pairs, removing them lazily as they're produced. + * + * This iterator is dangerous! It must be consumed fully before any other operations are made --- End diff -- Maybe tone down the language a little :) Rather than saying "dangerous", use `@note`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140617927 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -216,22 +229,51 @@ case class StreamingSymmetricHashJoinExec( } // Filter the joined rows based on the given condition. -val outputFilterFunction = - newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output).eval _ -val filteredOutputIter = - (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction).map { row => -numOutputRows += 1 -row - } +val outputFilterFunction = newPredicate(condition.getOrElse(Literal(true)), output).eval _ + +val filteredInnerOutputIter = (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction) + +val outputIter: Iterator[InternalRow] = joinType match { + case Inner => +filteredInnerOutputIter + case LeftOuter => +val nullRight = new GenericInternalRow(right.output.map(_.withNullability(true)).length) +filteredInnerOutputIter ++ + leftSideJoiner +.removeOldState() +.filterNot { case (key, value) => rightSideJoiner.containsKey(key) } +.map { case (key, value) => joinedRow.withLeft(value).withRight(nullRight) } + case RightOuter => +val nullLeft = new GenericInternalRow(left.output.map(_.withNullability(true)).length) +filteredInnerOutputIter ++ + rightSideJoiner +.removeOldState() +.filterNot { case (key, value) => leftSideJoiner.containsKey(key) } +.map { case (key, value) => joinedRow.withLeft(nullLeft).withRight(value) } + case _ => throw badJoinTypeException +} + +val outputIterWithMetrics = outputIter.map { row => + numOutputRows += 1 + row +} + +// Iterator which must be consumed after output completion before committing. 
+val cleanupIter = joinType match { + case Inner => +leftSideJoiner.removeOldState() ++ rightSideJoiner.removeOldState() + case LeftOuter => rightSideJoiner.removeOldState() + case RightOuter => leftSideJoiner.removeOldState() + case _ => throw badJoinTypeException +} // Function to remove old state after all the input has been consumed and output generated def onOutputCompletion = { allUpdatesTimeMs += math.max(NANOSECONDS.toMillis(System.nanoTime - updateStartTimeNs), 0) - // Remove old state if needed + // TODO: how to get this for removals as part of outer join? allRemovalsTimeMs += timeTakenMs { -leftSideJoiner.removeOldState() -rightSideJoiner.removeOldState() +cleanupIter.foreach(_ => ()) --- End diff -- Don't use foreach. Scala's foreach is pretty inefficient. Use a while loop. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140614325 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager( /** * Remove using a predicate on keys. See class docs for more context and implement details. */ - def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator - -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - if (condition(keyToNumValue.key)) { -keyToNumValues.remove(keyToNumValue.key) -keyWithIndexToValue.removeAllValues(keyToNumValue.key, keyToNumValue.numValue) + def removeByKeyCondition(condition: UnsafeRow => Boolean): Iterator[(UnsafeRow, UnsafeRow)] = { +new NextIterator[(UnsafeRow, UnsafeRow)] { + + private val allKeyToNumValues = keyToNumValues.iterator + + private var currentKeyToNumValue: Option[KeyAndNumValues] = None + private var currentValues: Option[Iterator[(UnsafeRow, Long)]] = None + + private def currentKey = currentKeyToNumValue.get.key + + private def getAndRemoveValue() = { +val (current, index) = currentValues.get.next() +keyWithIndexToValue.remove(currentKey, index) +(currentKey, current) + } + + override def getNext(): (UnsafeRow, UnsafeRow) = { +if (currentValues.nonEmpty && currentValues.get.hasNext) { + return getAndRemoveValue() +} else { + while (allKeyToNumValues.hasNext) { +currentKeyToNumValue = Some(allKeyToNumValues.next()) +if (condition(currentKey)) { + currentValues = Some(keyWithIndexToValue.getAllWithIndex( +currentKey, currentKeyToNumValue.get.numValue)) + keyToNumValues.remove(currentKey) + + if (currentValues.nonEmpty && currentValues.get.hasNext) { +return getAndRemoveValue() + } +} + } +} + +finished = true +null } + + override def close: Unit = {} } } /** * Remove using a predicate on values. 
See class docs for more context and implementation details. */ - def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator + def removeByValueCondition(condition: UnsafeRow => Boolean): Iterator[(UnsafeRow, UnsafeRow)] = { +new NextIterator[(UnsafeRow, UnsafeRow)] { + + private val allKeyToNumValues = keyToNumValues.iterator -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - val key = keyToNumValue.key + private var currentKeyToNumValue: Option[KeyAndNumValues] = None - var numValues: Long = keyToNumValue.numValue + private def currentKey = currentKeyToNumValue.get.key + + var numValues: Long = 0L var index: Long = 0L var valueRemoved: Boolean = false var valueForIndex: UnsafeRow = null - while (index < numValues) { -if (valueForIndex == null) { - valueForIndex = keyWithIndexToValue.get(key, index) + private def cleanupCurrentKey(): Unit = { +if (valueRemoved) { + if (numValues >= 1) { +keyToNumValues.put(currentKey, numValues) + } else { +keyToNumValues.remove(currentKey) + } +} + +numValues = 0 +index = 0 +valueRemoved = false +valueForIndex = null + } + + override def getNext(): (UnsafeRow, UnsafeRow) = { +// TODO: there has to be a better way to express this but I don't know what it is +while (valueForIndex == null && (index < numValues || allKeyToNumValues.hasNext)) { + if (index < numValues) { --- End diff -- add comments for each conditional branch to explain this. in retrospect, probably I should have done that too :) but now the code is an order of magnitude more complicated now, so requires more docs. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140608463 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager( /** * Remove using a predicate on keys. See class docs for more context and implement details. */ - def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator - -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - if (condition(keyToNumValue.key)) { -keyToNumValues.remove(keyToNumValue.key) -keyWithIndexToValue.removeAllValues(keyToNumValue.key, keyToNumValue.numValue) + def removeByKeyCondition(condition: UnsafeRow => Boolean): Iterator[(UnsafeRow, UnsafeRow)] = { +new NextIterator[(UnsafeRow, UnsafeRow)] { + + private val allKeyToNumValues = keyToNumValues.iterator + + private var currentKeyToNumValue: Option[KeyAndNumValues] = None + private var currentValues: Option[Iterator[(UnsafeRow, Long)]] = None + + private def currentKey = currentKeyToNumValue.get.key + + private def getAndRemoveValue() = { +val (current, index) = currentValues.get.next() +keyWithIndexToValue.remove(currentKey, index) +(currentKey, current) --- End diff -- dont use tuples. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140612077 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -329,6 +392,27 @@ class SymmetricHashJoinStateManager( } } +/** Get all the values for key and all indices, in a (value, index) tuple. */ +def getAllWithIndex(key: UnsafeRow, numValues: Long): Iterator[(UnsafeRow, Long)] = { --- End diff -- We can probably convert getAll to this. It does not make sense to have both, especially since both are equally efficient if you return Iterator[KeyWithIndexAndValue]. Also, I think the iterator() method can be removed. It's effectively not being used (it is used only in StateManager.iterator, which is not really being used). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140607924 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager( /** * Remove using a predicate on keys. See class docs for more context and implement details. */ - def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator - -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - if (condition(keyToNumValue.key)) { -keyToNumValues.remove(keyToNumValue.key) -keyWithIndexToValue.removeAllValues(keyToNumValue.key, keyToNumValue.numValue) + def removeByKeyCondition(condition: UnsafeRow => Boolean): Iterator[(UnsafeRow, UnsafeRow)] = { --- End diff -- Use UnsafeRowPair for return. Same reason as above. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140611178 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -329,6 +392,27 @@ class SymmetricHashJoinStateManager( } } +/** Get all the values for key and all indices, in a (value, index) tuple. */ +def getAllWithIndex(key: UnsafeRow, numValues: Long): Iterator[(UnsafeRow, Long)] = { --- End diff -- use `KeyWithIndexAndValue` for returning. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140612202 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager( /** * Remove using a predicate on keys. See class docs for more context and implement details. */ - def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator - -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - if (condition(keyToNumValue.key)) { -keyToNumValues.remove(keyToNumValue.key) -keyWithIndexToValue.removeAllValues(keyToNumValue.key, keyToNumValue.numValue) + def removeByKeyCondition(condition: UnsafeRow => Boolean): Iterator[(UnsafeRow, UnsafeRow)] = { +new NextIterator[(UnsafeRow, UnsafeRow)] { + + private val allKeyToNumValues = keyToNumValues.iterator + + private var currentKeyToNumValue: Option[KeyAndNumValues] = None + private var currentValues: Option[Iterator[(UnsafeRow, Long)]] = None + + private def currentKey = currentKeyToNumValue.get.key + + private def getAndRemoveValue() = { +val (current, index) = currentValues.get.next() +keyWithIndexToValue.remove(currentKey, index) +(currentKey, current) + } + + override def getNext(): (UnsafeRow, UnsafeRow) = { +if (currentValues.nonEmpty && currentValues.get.hasNext) { + return getAndRemoveValue() +} else { + while (allKeyToNumValues.hasNext) { +currentKeyToNumValue = Some(allKeyToNumValues.next()) +if (condition(currentKey)) { + currentValues = Some(keyWithIndexToValue.getAllWithIndex( +currentKey, currentKeyToNumValue.get.numValue)) + keyToNumValues.remove(currentKey) + + if (currentValues.nonEmpty && currentValues.get.hasNext) { +return getAndRemoveValue() + } +} + } +} + +finished = true +null } + + override def close: Unit = {} } } /** * Remove using a predicate on values. 
See class docs for more context and implementation details. */ - def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator + def removeByValueCondition(condition: UnsafeRow => Boolean): Iterator[(UnsafeRow, UnsafeRow)] = { +new NextIterator[(UnsafeRow, UnsafeRow)] { + + private val allKeyToNumValues = keyToNumValues.iterator -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - val key = keyToNumValue.key + private var currentKeyToNumValue: Option[KeyAndNumValues] = None - var numValues: Long = keyToNumValue.numValue + private def currentKey = currentKeyToNumValue.get.key + + var numValues: Long = 0L var index: Long = 0L var valueRemoved: Boolean = false var valueForIndex: UnsafeRow = null - while (index < numValues) { -if (valueForIndex == null) { - valueForIndex = keyWithIndexToValue.get(key, index) + private def cleanupCurrentKey(): Unit = { --- End diff -- its not really cleanup, maybe "update"? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140605588 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -324,17 +367,34 @@ case class StreamingSymmetricHashJoinExec( } } -/** Remove old buffered state rows using watermarks for state keys and values */ -def removeOldState(): Unit = { +/** + * Builds an iterator over old state key-value pairs, removing them lazily as they're produced. + * + * This iterator is dangerous! It must be consumed fully before any other operations are made + * against this joiner's join state manager, and in particular commits must not happen while + * this iterator is ongoing. The intermediate states of the iterator leave the state manager in + * an invalid configuration. + * + * We do this unsafe thing to avoid requiring either two passes or full materialization when + * processing the rows for outer join. + */ +def removeOldState(): Iterator[(UnsafeRow, UnsafeRow)] = { --- End diff -- Use `UnsafeRowPair` instead of Tuple2 (i.e. () is shorthand for scala.Tuple2). It reuses the tuple (or equivalent object), which avoids creating a lot of short-term objects and thus reduces GC pressure. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140614816 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager( /** * Remove using a predicate on keys. See class docs for more context and implement details. */ - def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator - -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - if (condition(keyToNumValue.key)) { -keyToNumValues.remove(keyToNumValue.key) -keyWithIndexToValue.removeAllValues(keyToNumValue.key, keyToNumValue.numValue) + def removeByKeyCondition(condition: UnsafeRow => Boolean): Iterator[(UnsafeRow, UnsafeRow)] = { +new NextIterator[(UnsafeRow, UnsafeRow)] { + + private val allKeyToNumValues = keyToNumValues.iterator + + private var currentKeyToNumValue: Option[KeyAndNumValues] = None + private var currentValues: Option[Iterator[(UnsafeRow, Long)]] = None + + private def currentKey = currentKeyToNumValue.get.key + + private def getAndRemoveValue() = { +val (current, index) = currentValues.get.next() +keyWithIndexToValue.remove(currentKey, index) +(currentKey, current) + } + + override def getNext(): (UnsafeRow, UnsafeRow) = { +if (currentValues.nonEmpty && currentValues.get.hasNext) { + return getAndRemoveValue() +} else { + while (allKeyToNumValues.hasNext) { +currentKeyToNumValue = Some(allKeyToNumValues.next()) +if (condition(currentKey)) { + currentValues = Some(keyWithIndexToValue.getAllWithIndex( +currentKey, currentKeyToNumValue.get.numValue)) + keyToNumValues.remove(currentKey) + + if (currentValues.nonEmpty && currentValues.get.hasNext) { +return getAndRemoveValue() + } +} + } +} + +finished = true +null } + + override def close: Unit = {} } } /** * Remove using a predicate on values. 
See class docs for more context and implementation details. */ - def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator + def removeByValueCondition(condition: UnsafeRow => Boolean): Iterator[(UnsafeRow, UnsafeRow)] = { +new NextIterator[(UnsafeRow, UnsafeRow)] { + + private val allKeyToNumValues = keyToNumValues.iterator -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - val key = keyToNumValue.key + private var currentKeyToNumValue: Option[KeyAndNumValues] = None - var numValues: Long = keyToNumValue.numValue + private def currentKey = currentKeyToNumValue.get.key + + var numValues: Long = 0L var index: Long = 0L var valueRemoved: Boolean = false var valueForIndex: UnsafeRow = null - while (index < numValues) { -if (valueForIndex == null) { - valueForIndex = keyWithIndexToValue.get(key, index) + private def cleanupCurrentKey(): Unit = { +if (valueRemoved) { + if (numValues >= 1) { +keyToNumValues.put(currentKey, numValues) + } else { +keyToNumValues.remove(currentKey) + } +} + +numValues = 0 +index = 0 +valueRemoved = false +valueForIndex = null + } + + override def getNext(): (UnsafeRow, UnsafeRow) = { +// TODO: there has to be a better way to express this but I don't know what it is +while (valueForIndex == null && (index < numValues || allKeyToNumValues.hasNext)) { + if (index < numValues) { +val current = keyWithIndexToValue.get(currentKey, index) +if (condition(current)) { + valueForIndex = current +} else { + index += 1 +} + } else { +cleanupCurrentKey() + +currentKeyToNumValue = Some(allKeyToNumValues.next()) +numValues = currentKeyToNumValue.get.numValue --- End diff -- numValues -> currentKeyNumValues --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140617841 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -216,22 +229,51 @@ case class StreamingSymmetricHashJoinExec( } // Filter the joined rows based on the given condition. -val outputFilterFunction = - newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output).eval _ -val filteredOutputIter = - (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction).map { row => -numOutputRows += 1 -row - } +val outputFilterFunction = newPredicate(condition.getOrElse(Literal(true)), output).eval _ + +val filteredInnerOutputIter = (leftOutputIter ++ rightOutputIter).filter(outputFilterFunction) + +val outputIter: Iterator[InternalRow] = joinType match { + case Inner => +filteredInnerOutputIter + case LeftOuter => +val nullRight = new GenericInternalRow(right.output.map(_.withNullability(true)).length) +filteredInnerOutputIter ++ + leftSideJoiner +.removeOldState() +.filterNot { case (key, value) => rightSideJoiner.containsKey(key) } +.map { case (key, value) => joinedRow.withLeft(value).withRight(nullRight) } + case RightOuter => +val nullLeft = new GenericInternalRow(left.output.map(_.withNullability(true)).length) +filteredInnerOutputIter ++ + rightSideJoiner +.removeOldState() +.filterNot { case (key, value) => leftSideJoiner.containsKey(key) } +.map { case (key, value) => joinedRow.withLeft(nullLeft).withRight(value) } + case _ => throw badJoinTypeException +} + +val outputIterWithMetrics = outputIter.map { row => + numOutputRows += 1 + row +} + +// Iterator which must be consumed after output completion before committing. 
+val cleanupIter = joinType match { + case Inner => +leftSideJoiner.removeOldState() ++ rightSideJoiner.removeOldState() + case LeftOuter => rightSideJoiner.removeOldState() + case RightOuter => leftSideJoiner.removeOldState() + case _ => throw badJoinTypeException +} --- End diff -- This confused me a lot, but then I understood why you need to call removeOldState once again. Can you add a comment explaining that you have to clean the side that has not already been cleaned? Also, this can be moved into "onOutputCompletion", because that is where it is needed, not before. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140606013 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager( /** * Remove using a predicate on keys. See class docs for more context and implement details. */ - def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator - -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - if (condition(keyToNumValue.key)) { -keyToNumValues.remove(keyToNumValue.key) -keyWithIndexToValue.removeAllValues(keyToNumValue.key, keyToNumValue.numValue) + def removeByKeyCondition(condition: UnsafeRow => Boolean): Iterator[(UnsafeRow, UnsafeRow)] = { +new NextIterator[(UnsafeRow, UnsafeRow)] { + + private val allKeyToNumValues = keyToNumValues.iterator + + private var currentKeyToNumValue: Option[KeyAndNumValues] = None --- End diff -- Don't use Option/Some inside tight, performance-sensitive loops. It creates too many garbage objects very quickly. It's fine to use null here for performance reasons. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140615574 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala --- @@ -116,6 +116,26 @@ class StreamingJoinSuite extends StreamTest with StateStoreMetricsTest with Befo ) } + private def setupJoin(joinType: String = "inner") = { --- End diff -- not used anywhere. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140608032 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager( /** * Remove using a predicate on keys. See class docs for more context and implement details. */ - def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator - -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - if (condition(keyToNumValue.key)) { -keyToNumValues.remove(keyToNumValue.key) -keyWithIndexToValue.removeAllValues(keyToNumValue.key, keyToNumValue.numValue) + def removeByKeyCondition(condition: UnsafeRow => Boolean): Iterator[(UnsafeRow, UnsafeRow)] = { +new NextIterator[(UnsafeRow, UnsafeRow)] { + + private val allKeyToNumValues = keyToNumValues.iterator + + private var currentKeyToNumValue: Option[KeyAndNumValues] = None + private var currentValues: Option[Iterator[(UnsafeRow, Long)]] = None + + private def currentKey = currentKeyToNumValue.get.key + + private def getAndRemoveValue() = { +val (current, index) = currentValues.get.next() +keyWithIndexToValue.remove(currentKey, index) +(currentKey, current) + } + + override def getNext(): (UnsafeRow, UnsafeRow) = { --- End diff -- Use UnsafeRowPair here instead of a (UnsafeRow, UnsafeRow) tuple. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140614563 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager( /** * Remove using a predicate on keys. See class docs for more context and implement details. */ - def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator - -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - if (condition(keyToNumValue.key)) { -keyToNumValues.remove(keyToNumValue.key) -keyWithIndexToValue.removeAllValues(keyToNumValue.key, keyToNumValue.numValue) + def removeByKeyCondition(condition: UnsafeRow => Boolean): Iterator[(UnsafeRow, UnsafeRow)] = { +new NextIterator[(UnsafeRow, UnsafeRow)] { + + private val allKeyToNumValues = keyToNumValues.iterator + + private var currentKeyToNumValue: Option[KeyAndNumValues] = None + private var currentValues: Option[Iterator[(UnsafeRow, Long)]] = None + + private def currentKey = currentKeyToNumValue.get.key + + private def getAndRemoveValue() = { +val (current, index) = currentValues.get.next() +keyWithIndexToValue.remove(currentKey, index) +(currentKey, current) + } + + override def getNext(): (UnsafeRow, UnsafeRow) = { +if (currentValues.nonEmpty && currentValues.get.hasNext) { + return getAndRemoveValue() +} else { + while (allKeyToNumValues.hasNext) { +currentKeyToNumValue = Some(allKeyToNumValues.next()) +if (condition(currentKey)) { + currentValues = Some(keyWithIndexToValue.getAllWithIndex( +currentKey, currentKeyToNumValue.get.numValue)) + keyToNumValues.remove(currentKey) + + if (currentValues.nonEmpty && currentValues.get.hasNext) { +return getAndRemoveValue() + } +} + } +} + +finished = true +null } + + override def close: Unit = {} } } /** * Remove using a predicate on values. 
See class docs for more context and implementation details. */ - def removeByValueCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator + def removeByValueCondition(condition: UnsafeRow => Boolean): Iterator[(UnsafeRow, UnsafeRow)] = { +new NextIterator[(UnsafeRow, UnsafeRow)] { + + private val allKeyToNumValues = keyToNumValues.iterator -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - val key = keyToNumValue.key + private var currentKeyToNumValue: Option[KeyAndNumValues] = None - var numValues: Long = keyToNumValue.numValue + private def currentKey = currentKeyToNumValue.get.key + + var numValues: Long = 0L var index: Long = 0L var valueRemoved: Boolean = false var valueForIndex: UnsafeRow = null - while (index < numValues) { -if (valueForIndex == null) { - valueForIndex = keyWithIndexToValue.get(key, index) + private def cleanupCurrentKey(): Unit = { +if (valueRemoved) { + if (numValues >= 1) { +keyToNumValues.put(currentKey, numValues) + } else { +keyToNumValues.remove(currentKey) + } +} + +numValues = 0 +index = 0 +valueRemoved = false +valueForIndex = null + } + + override def getNext(): (UnsafeRow, UnsafeRow) = { +// TODO: there has to be a better way to express this but I don't know what it is +while (valueForIndex == null && (index < numValues || allKeyToNumValues.hasNext)) { + if (index < numValues) { +val current = keyWithIndexToValue.get(currentKey, index) --- End diff -- current -> nextValue, to avoid confusion with valueForIndex (assuming that gets renamed to currentValue) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19327#discussion_r140614721 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala --- @@ -89,61 +89,124 @@ class SymmetricHashJoinStateManager( /** * Remove using a predicate on keys. See class docs for more context and implement details. */ - def removeByKeyCondition(condition: UnsafeRow => Boolean): Unit = { -val allKeyToNumValues = keyToNumValues.iterator - -while (allKeyToNumValues.hasNext) { - val keyToNumValue = allKeyToNumValues.next - if (condition(keyToNumValue.key)) { -keyToNumValues.remove(keyToNumValue.key) -keyWithIndexToValue.removeAllValues(keyToNumValue.key, keyToNumValue.numValue) + def removeByKeyCondition(condition: UnsafeRow => Boolean): Iterator[(UnsafeRow, UnsafeRow)] = { +new NextIterator[(UnsafeRow, UnsafeRow)] { + + private val allKeyToNumValues = keyToNumValues.iterator + + private var currentKeyToNumValue: Option[KeyAndNumValues] = None + private var currentValues: Option[Iterator[(UnsafeRow, Long)]] = None + + private def currentKey = currentKeyToNumValue.get.key + + private def getAndRemoveValue() = { +val (current, index) = currentValues.get.next() +keyWithIndexToValue.remove(currentKey, index) +(currentKey, current) + } + + override def getNext(): (UnsafeRow, UnsafeRow) = { +if (currentValues.nonEmpty && currentValues.get.hasNext) { + return getAndRemoveValue() +} else { + while (allKeyToNumValues.hasNext) { +currentKeyToNumValue = Some(allKeyToNumValues.next()) +if (condition(currentKey)) { + currentValues = Some(keyWithIndexToValue.getAllWithIndex( +currentKey, currentKeyToNumValue.get.numValue)) + keyToNumValues.remove(currentKey) + + if (currentValues.nonEmpty && currentValues.get.hasNext) { +return getAndRemoveValue() + } +} + } +} + +finished = true +null } + + override def close: Unit = {} } } /** * Remove using a predicate on values. 
See class docs for more context and implementation details. */ --- End diff -- Add high-level docs on how this works. I think I should have added this in my PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19327: [WIP] Implement stream-stream outer joins.
GitHub user joseph-torres opened a pull request: https://github.com/apache/spark/pull/19327 [WIP] Implement stream-stream outer joins. ## What changes were proposed in this pull request? Allow one-sided outer joins between two streams when a watermark is defined. ## How was this patch tested? New unit tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/joseph-torres/spark outerjoin Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19327.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #19327 commit 1dfb95d5b7dee86b45aa831278d3fa7a7dc1917f Author: Jose Torres Date: 2017-09-22T20:36:50Z Implement stream-stream outer joins. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org