[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139833470

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
+
+
+/**
+ * Helper object for [[StreamingSymmetricHashJoinExec]].
+ */
+object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with Logging {
+
+  sealed trait JoinSide
+  case object LeftSide extends JoinSide { override def toString(): String = "left" }
+  case object RightSide extends JoinSide { override def toString(): String = "right" }
+
+  sealed trait JoinStateWatermarkPredicate
+  case class JoinStateKeyWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate
+  case class JoinStateValueWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate
+
+  case class JoinStateWatermarkPredicates(
+      left: Option[JoinStateWatermarkPredicate] = None,
+      right: Option[JoinStateWatermarkPredicate] = None)
+
+  def getStateWatermarkPredicates(
+      leftAttributes: Seq[Attribute],
+      rightAttributes: Seq[Attribute],
+      leftKeys: Seq[Expression],
+      rightKeys: Seq[Expression],
+      condition: Option[Expression],
+      eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = {
+    val joinKeyOrdinalForWatermark: Option[Int] = {
+      leftKeys.zipWithIndex.collectFirst {
+        case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index
+      } orElse {
+        rightKeys.zipWithIndex.collectFirst {
+          case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index
+        }
+      }
+    }
+
+    def getOneSideStateWatermarkPredicate(
+        oneSideInputAttributes: Seq[Attribute],
+        oneSideJoinKeys: Seq[Expression],
+        otherSideInputAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = {
+      val isWatermarkDefinedOnInput = oneSideInputAttributes.exists(_.metadata.contains(delayKey))
+      val isWatermarkDefinedOnJoinKey = joinKeyOrdinalForWatermark.isDefined
+
+      if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the class docs
--- End diff --

done.
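The collectFirst/orElse logic quoted above picks the first join-key ordinal (left side first, then right) whose expression carries the event-time watermark metadata. Below is a minimal stand-alone sketch of just that selection step, with the hypothetical KeyDesc.hasDelayMetadata standing in for ne.metadata.contains(delayKey); none of these names are the real Spark types.

object WatermarkKeySketch {
  // Hypothetical stand-in for a NamedExpression that may carry delayKey metadata.
  case class KeyDesc(name: String, hasDelayMetadata: Boolean)

  // Mirrors joinKeyOrdinalForWatermark above: scan the left keys first, then the
  // right keys, and take the first ordinal whose key is watermarked.
  def joinKeyOrdinalForWatermark(
      leftKeys: Seq[KeyDesc],
      rightKeys: Seq[KeyDesc]): Option[Int] = {
    leftKeys.zipWithIndex.collectFirst {
      case (k, index) if k.hasDelayMetadata => index
    } orElse {
      rightKeys.zipWithIndex.collectFirst {
        case (k, index) if k.hasDelayMetadata => index
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Example: the window(leftTime, "1 hour") join key at ordinal 1 carries the
    // watermark, so a state key watermark predicate would target ordinal 1.
    val left = Seq(KeyDesc("leftKey", hasDelayMetadata = false),
      KeyDesc("leftWindow", hasDelayMetadata = true))
    val right = Seq(KeyDesc("rightKey", hasDelayMetadata = false),
      KeyDesc("rightWindow", hasDelayMetadata = true))
    assert(joinKeyOrdinalForWatermark(left, right).contains(1))
  }
}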
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139833312

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
+
+
+/**
+ * Helper object for [[StreamingSymmetricHashJoinExec]].
+ */
+object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with Logging {
+
+  sealed trait JoinSide
+  case object LeftSide extends JoinSide { override def toString(): String = "left" }
+  case object RightSide extends JoinSide { override def toString(): String = "right" }
+
+  sealed trait JoinStateWatermarkPredicate
+  case class JoinStateKeyWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate
+  case class JoinStateValueWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate
+
+  case class JoinStateWatermarkPredicates(
+      left: Option[JoinStateWatermarkPredicate] = None,
+      right: Option[JoinStateWatermarkPredicate] = None)
+
+  def getStateWatermarkPredicates(
+      leftAttributes: Seq[Attribute],
+      rightAttributes: Seq[Attribute],
+      leftKeys: Seq[Expression],
+      rightKeys: Seq[Expression],
+      condition: Option[Expression],
+      eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = {
+    val joinKeyOrdinalForWatermark: Option[Int] = {
--- End diff --

removed the other one. not needed. copied the docs to this location.
[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/19269#discussion_r139832973

--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A data source writer that is returned by
+ * {@link WriteSupport#createWriter(StructType, SaveMode, DataSourceV2Options)}.
+ * It can mix in various writing optimization interfaces to speed up the data saving. The actual
+ * writing logic is delegated to {@link WriteTask} that is returned by {@link #createWriteTask()}.
+ *
+ * The writing procedure is:
+ * 1. Create a write task by {@link #createWriteTask()}, serialize and send it to all the
+ *    partitions of the input data(RDD).
+ * 2. For each partition, create a data writer with the write task, and write the data of the
+ *    partition with this writer. If all the data are written successfully, call
+ *    {@link DataWriter#commit()}. If exception happens during the writing, call
+ *    {@link DataWriter#abort()}. This step may repeat several times as Spark will retry failed
+ *    tasks.
+ * 3. Wait until all the writers/partitions are finished, i.e., either committed or aborted. If
+ *    all partitions are written successfully, call {@link #commit(WriterCommitMessage[])}. If
+ *    some partitions failed and aborted, call {@link #abort()}.
--- End diff --

The main reason why I wanted a separate SPIP for the write path was this point in the doc:

> Ideally partitioning/bucketing concept should not be exposed in the Data Source API V2, because they are just techniques for data skipping and pre-partitioning. However, these 2 concepts are already widely used in Spark, e.g. DataFrameWriter.partitionBy and DDL syntax like ADD PARTITION. To be consistent, we need to add partitioning/bucketing to Data Source V2, so that the implementations can be able to specify partitioning/bucketing for read/write.

There's a lot in there that's worth thinking about and possibly changing:

1. Ideally, the DataSourceV2 API wouldn't support bucketing/partitioning
2. The current DataFrameWriter API is what we should continue to support
3. Implementations should supply bucketing and partitioning for writes because of 2

**Bucketing/partitioning**: It comes down to the level at which this API is going to be used. It looks like this API currently ignores bucketing and partitioning (unless my read through was too quick). I think I agree that in the long term that's a good thing, but we need ways for a data source to tell Spark about its requirements for incoming data. In the current version, it looks like Spark would know how to prepare data for writers outside of this API (rather than including support as suggested by point 3). When writing a partitioned table, Spark would get the partitioning from the table definition in the metastore and automatically sort by partition columns. Is that right?

I'd like to move the data store's requirements behind this API. For example, writing to HBase files directly requires sorting by key first. We don't want to do the sort in the writer because it may duplicate work (and isn't captured in the physical plan), and we also don't want to require Spark to know about the requirements of the HBase data store, or any other specific implementation.

**DataFrameWriter API**: I'd like to talk about separating the API for table definitions and writes, but not necessarily as part of this work. The SPIP should clearly state whether that's part of th
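The three-step procedure in the quoted Javadoc is easy to model. The following is a stand-alone Scala toy of that driver/executor split, not the real interfaces: every Sketch* name, the String row type, and the sequential loop (standing in for Spark's task scheduling and retries) are illustrative assumptions.

import scala.util.control.NonFatal

object WriteProtocolSketch {
  trait SketchCommitMessage
  trait SketchDataWriter {
    def write(row: String): Unit
    def commit(): SketchCommitMessage  // per-partition commit (step 2, success)
    def abort(): Unit                  // per-partition rollback (step 2, failure)
  }
  trait SketchWriteTask extends Serializable {
    def createWriter(partitionId: Int): SketchDataWriter
  }
  trait SketchWriter {
    def createWriteTask(): SketchWriteTask                // step 1
    def commit(messages: Seq[SketchCommitMessage]): Unit  // step 3, all succeeded
    def abort(): Unit                                     // step 3, some failed
  }

  // Step 2, executor side: one data writer per partition; commit on success,
  // abort and rethrow on any failure.
  def writePartition(task: SketchWriteTask, pid: Int,
      rows: Iterator[String]): SketchCommitMessage = {
    val writer = task.createWriter(pid)
    try {
      rows.foreach(writer.write)
      writer.commit()
    } catch {
      case NonFatal(e) =>
        writer.abort()
        throw e
    }
  }

  // Steps 1 and 3, driver side: create the task once, fan it out, and commit
  // the job only after every partition has committed.
  def writeAll(writer: SketchWriter, partitions: Seq[Iterator[String]]): Unit = {
    val task = writer.createWriteTask()
    try {
      val messages = partitions.zipWithIndex.map { case (rows, pid) =>
        writePartition(task, pid, rows)
      }
      writer.commit(messages)
    } catch {
      case NonFatal(e) =>
        writer.abort()
        throw e
    }
  }
}

Under this framing, rdblue's ask is that the source be able to declare requirements on the incoming rows (e.g. "sorted by key" for HBase files) that Spark satisfies inside the physical plan, before step 2 runs.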
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139832926

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.TimeUnit.NANOSECONDS
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
+import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.util.{CompletionIterator, SerializableConfiguration}
+
+
+/**
+ * Performs stream-stream join using symmetric hash join algorithm. It works as follows.
+ *
+ *                          /--------------------\
+ *   left side input ------>|  left side state   |------\
+ *                          \--------------------/      |
+ *                                                      |-----> joined output
+ *                          /--------------------\      |
+ *   right side input ----->|  right side state  |------/
+ *                          \--------------------/
+ *
+ * Each join side buffers past input rows as streaming state so that the past input can be joined
+ * with future input on the other side. This buffer state is effectively a multi-map:
+ *    equi-join key -> list of past input rows received with the join key
+ *
+ * For each input row in each side, the following operations take place.
+ * - Calculate join key from the row.
+ * - Use the join key to append the row to the buffer state of the side that the row came from.
+ * - Find past buffered values for the key from the other side. For each such value, emit the
+ *   "joined row" (left-row, right-row)
+ * - Apply the optional condition to filter the joined rows as the final output.
+ *
+ * If a timestamp column with event time watermark is present in the join keys or in the input
+ * data, then it uses the watermark to figure out which rows in the buffer will not join with
+ * the new data, and therefore can be discarded. Depending on the provided query conditions, we
+ * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows).
+ * There are three kinds of queries possible regarding this as explained below.
+ * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below.
+ *
+ * 1. When timestamp/time-window + watermark is in the join keys. Example (pseudo-SQL):
+ *
+ *      SELECT * FROM leftTable, rightTable
+ *      ON
+ *        leftKey = rightKey AND
+ *        window(leftTime, "1 hour") = window(rightTime, "1 hour")    // 1hr tumbling windows
+ *
+ *    In this case, this operator will join rows newer than the watermark which fall in the same
+ *    1 hour window. Say the event-time watermark is "12:34" (both left and right input).
+ *    Then input rows can only have time > 12:34. Hence, they can only join with buffered rows
+ *    where window >= 12:00 - 1:00, and all buffered rows with join window < 12:00 can be
+ *    discarded. In other words, the operator will discard all state where
+ *    window in state key (i.e. join key) < event time watermark. This threshold is called
+ *    the State Key Watermark.
+ *
+ * 2. When timestamp range conditions a
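For reference, here is a hedged usage sketch of case 1 above in the DataFrame API that this PR targets (paste into spark-shell); the rate source, column names, and durations are illustrative, not taken from the PR's tests.

import org.apache.spark.sql.functions.{expr, window}
import spark.implicits._

// Two watermarked streaming inputs; the built-in "rate" source is just a stand-in.
val left = spark.readStream.format("rate").load()
  .selectExpr("value AS leftKey", "timestamp AS leftTime")
  .withWatermark("leftTime", "10 minutes")
val right = spark.readStream.format("rate").load()
  .selectExpr("value AS rightKey", "timestamp AS rightTime")
  .withWatermark("rightTime", "10 minutes")

// Case 1: the 1-hour tumbling window is part of the equi-join key, so buffered
// state whose window falls entirely below the watermark can be dropped (the
// "state key watermark" described in the class doc).
val joined = left.join(
  right,
  expr("leftKey = rightKey") &&
    (window($"leftTime", "1 hour") === window($"rightTime", "1 hour")))

val query = joined.writeStream.format("console").start()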
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139832660 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then the it uses the watermark figure out which rows in the buffer will not join with + * and new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. 
Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34. Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions a
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139832320 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then the it uses the watermark figure out which rows in the buffer will not join with + * and new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. 
Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34. Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions a
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139832253 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then the it uses the watermark figure out which rows in the buffer will not join with + * and new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. 
Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34. Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions a
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139832098 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then the it uses the watermark figure out which rows in the buffer will not join with + * and new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. 
Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34. Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions a
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139832036 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then the it uses the watermark figure out which rows in the buffer will not join with + * and new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. 
Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34. Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions a
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139831879 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then the it uses the watermark figure out which rows in the buffer will not join with + * and new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. 
Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34. Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions a
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139831615 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then the it uses the watermark figure out which rows in the buffer will not join with + * and new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. 
Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34. Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions a
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139831226 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then the it uses the watermark figure out which rows in the buffer will not join with + * and new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. 
Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34. Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions a
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139831209 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then the it uses the watermark figure out which rows in the buffer will not join with + * and new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. 
Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34. Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions a
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139831092 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then the it uses the watermark figure out which rows in the buffer will not join with + * and new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. 
Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34. Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions a
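To make the buffering scheme described above concrete, here is a minimal single-process sketch of the join loop, with a hypothetical `Event` type and plain in-memory maps standing in for the per-side state (the real operator keys its state by `UnsafeRow` and persists it in a versioned state store, not on the heap):

```scala
import scala.collection.mutable

object SymmetricHashJoinSketch {
  case class Event(key: String, payload: String)

  // One buffer per side: equi-join key -> list of past input rows with that key.
  private val leftState  = mutable.Map.empty[String, mutable.Buffer[Event]]
  private val rightState = mutable.Map.empty[String, mutable.Buffer[Event]]

  // Buffer the new row on its own side, then join it against the other side's buffer.
  private def process(
      row: Event,
      ownState: mutable.Map[String, mutable.Buffer[Event]],
      otherState: mutable.Map[String, mutable.Buffer[Event]]): Seq[(Event, Event)] = {
    ownState.getOrElseUpdate(row.key, mutable.Buffer.empty) += row
    otherState.getOrElse(row.key, mutable.Buffer.empty).map(other => (row, other)).toSeq
  }

  def onLeftInput(row: Event): Seq[(Event, Event)] = process(row, leftState, rightState)

  def onRightInput(row: Event): Seq[(Event, Event)] =
    process(row, rightState, leftState).map(_.swap) // keep (left-row, right-row) order
}
```

The optional join condition from the description would simply filter the emitted pairs; the watermark thresholds discussed in the class docs decide when entries may be dropped from the two maps.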
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139830996 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then the it uses the watermark figure out which rows in the buffer will not join with + * and new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. 
Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34. Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions a
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139830591 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then the it uses the watermark figure out which rows in the buffer will not join with + * and new data, and therefore can be discarded. Depending on the provided query conditions, we --- End diff -- done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139830633 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then the it uses the watermark figure out which rows in the buffer will not join with + * and new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. 
Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34. Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions a
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139830482 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala --- @@ -114,6 +115,16 @@ class IncrementalExecution( stateInfo = Some(nextStatefulOperationStateInfo), batchTimestampMs = Some(offsetSeqMetadata.batchTimestampMs), eventTimeWatermark = Some(offsetSeqMetadata.batchWatermarkMs)) + + case j @ StreamingSymmetricHashJoinExec(lKeys, rKeys, _, cond, _, _, _, left, right) => +j.copy( + stateInfo = Some(nextStatefulOperationStateInfo), --- End diff -- Whatever optimization takes place, the same optimizations will occur in EVERY batch. So if aggregation is pushed below join, then all the batches will have that. What we have to guard against is cost-based optimization that can reorder things differently in different batches. That is why I have disabled cost-based join optimization. And when adding such optimizations in the future, we have to be cautious for the streaming case and disable them. Also, this is a general concern with other stateful ops as well, not something that this PR would address. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
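For reference, the cost-based join reordering mentioned here is gated behind real flags that already default to off in this version of Spark; a streaming job relies on them staying that way so the plan shape, and hence the stateful operator numbering, is identical in every batch. Assuming a `spark` SparkSession in scope:

```scala
// Both settings default to "false"; shown explicitly only to illustrate which
// knobs control the batch-dependent reordering being guarded against here.
spark.conf.set("spark.sql.cbo.enabled", "false")
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "false")
```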
[GitHub] spark issue #19170: [SPARK-21961][Core] Filter out BlockStatuses Accumulator...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19170 **[Test build #81951 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81951/testReport)** for PR 19170 at commit [`04c1e2a`](https://github.com/apache/spark/commit/04c1e2aa24c61f13f1df5148416bb00f0649fcaf). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19170: [SPARK-21961][Core] Filter out BlockStatuses Accumulator...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/19170 cc @vanzin --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19170: [SPARK-21961][Core] Filter out BlockStatuses Accumulator...
Github user zsxwing commented on the issue: https://github.com/apache/spark/pull/19170 ok to test --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDFs
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18659 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81945/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDFs
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18659 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDFs
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18659 **[Test build #81945 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81945/testReport)** for PR 18659 at commit [`69112a5`](https://github.com/apache/spark/commit/69112a5a771bc3c98a7cd0b21ffda883d86c41a4). * This patch **fails PySpark unit tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `// enable memo iff we serialize the row with schema (schema and class should be memorized)` * `abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], child: SparkPlan)` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #13477: [SPARK-15739][GraphX] Expose aggregateMessagesWithActive...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/13477 Can one of the admins verify this patch? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19194#discussion_r139819360 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala --- @@ -1255,6 +1255,97 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(taskOption4.get.addedJars === addedJarsMidTaskSet) } + test("limit max concurrent running tasks in a job group when configured ") { +val conf = new SparkConf(). + set(config.BLACKLIST_ENABLED, true). + set("spark.job.testJobGroup.maxConcurrentTasks", "2") // Limit max concurrent tasks to 2 + +sc = new SparkContext("local", "test", conf) +sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) +val props = new Properties(); +props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "testJobGroup") // set the job group + +val tasks = Array.tabulate[Task[_]](10) { i => + new FakeTask(0, i, Nil) +} +val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, props), 2) + +// make some offers to our taskset +var taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host1" +).flatMap { case (exec, host) => + // offer each executor twice (simulating 2 cores per executor) + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} +} +assert(taskDescs.size === 2) // Out of the 4 offers, tsm can accept up to maxConcurrentTasks. + +// make 4 more offers +val taskDescs2 = Seq( + "exec1" -> "host1", + "exec2" -> "host1" +).flatMap { case (exec, host) => + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} +} +assert(taskDescs2.size === 0) // tsm doesn't accept any as it is already running at max tasks + +// inform tsm that one task has completed +val directTaskResult = createTaskResult(0) +tsm.handleSuccessfulTask(taskDescs(0).taskId, directTaskResult) + +// make 4 more offers after previous task completed +taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host1" +).flatMap { case (exec, host) => + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} +} +assert(taskDescs.size === 1) // tsm accepts one as it can run one more task + } + + test("do not limit max concurrent running tasks in a job group by default") { +val conf = new SparkConf(). + set(config.BLACKLIST_ENABLED, true) + +sc = new SparkContext("local", "test", conf) +sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) + +val tasks = Array.tabulate[Task[_]](10) { i => + new FakeTask(0, i, Nil) +} +val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, null), 2) + +// make 5 offers to our taskset +var taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host2" +).flatMap { case (exec, host) => + // offer each executor twice (simulating 3 cores per executor) --- End diff -- update comments 5 offers -> 6 offers twice -> three times --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19194#discussion_r139819793 --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala --- @@ -1255,6 +1255,97 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg assert(taskOption4.get.addedJars === addedJarsMidTaskSet) } + test("limit max concurrent running tasks in a job group when configured ") { +val conf = new SparkConf(). + set(config.BLACKLIST_ENABLED, true). + set("spark.job.testJobGroup.maxConcurrentTasks", "2") // Limit max concurrent tasks to 2 + +sc = new SparkContext("local", "test", conf) +sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) +val props = new Properties(); +props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "testJobGroup") // set the job group + +val tasks = Array.tabulate[Task[_]](10) { i => + new FakeTask(0, i, Nil) +} +val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, props), 2) + +// make some offers to our taskset +var taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host1" +).flatMap { case (exec, host) => + // offer each executor twice (simulating 2 cores per executor) + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} +} +assert(taskDescs.size === 2) // Out of the 4 offers, tsm can accept up to maxConcurrentTasks. + +// make 4 more offers +val taskDescs2 = Seq( + "exec1" -> "host1", + "exec2" -> "host1" +).flatMap { case (exec, host) => + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} +} +assert(taskDescs2.size === 0) // tsm doesn't accept any as it is already running at max tasks + +// inform tsm that one task has completed +val directTaskResult = createTaskResult(0) +tsm.handleSuccessfulTask(taskDescs(0).taskId, directTaskResult) + +// make 4 more offers after previous task completed +taskDescs = Seq( + "exec1" -> "host1", + "exec2" -> "host1" +).flatMap { case (exec, host) => + (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} +} +assert(taskDescs.size === 1) // tsm accepts one as it can run one more task + } + + test("do not limit max concurrent running tasks in a job group by default") { --- End diff -- I don't think this test really adds anything beyond other tests in this suite. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19194#discussion_r139821899 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager( // place the executors. private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, (Int, Map[String, Int])] +override def onJobStart(jobStart: SparkListenerJobStart): Unit = { + jobStart.stageInfos.foreach(stageInfo => stageIdToJobId(stageInfo.stageId) = jobStart.jobId) + + var jobGroupId = if (jobStart.properties != null) { +jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID) + } else { +null + } + + val maxConTasks = if (jobGroupId != null && +conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) { +conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt + } else { +Int.MaxValue + } + + if (maxConTasks <= 0) { +throw new IllegalArgumentException( + "Maximum Concurrent Tasks should be set greater than 0 for the job to progress.") + } + + if (jobGroupId == null || !conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) { +jobGroupId = DEFAULT_JOB_GROUP + } + + jobIdToJobGroup(jobStart.jobId) = jobGroupId + if (!jobGroupToMaxConTasks.contains(jobGroupId)) { --- End diff -- this is probably a weird / unusual situation, but is this really the behavior you want if there are multiple jobs submitted for the same job group? Wouldn't you just take the conf for the job group at the time each job was submitted? Worst case with this approach: say you are *always* submitting multiple jobs for each job group; when one finishes, you immediately start another one, so that the new one partially overlaps the old one. Then even if you change the conf, all jobs will keep using the old value forever. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
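A sketch of the alternative being suggested here, reusing the names from the diff above (the surrounding listener plumbing is elided, and `DEFAULT_JOB_GROUP` is assumed to exist as in the patch). Re-reading the conf on every job start means a changed limit applies to each newly submitted job, instead of being pinned to whatever value the first overlapping job in the group happened to see:

```scala
override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
  jobStart.stageInfos.foreach(stageInfo => stageIdToJobId(stageInfo.stageId) = jobStart.jobId)

  // Fall back to the default group when no per-group limit is configured.
  val jobGroupId = Option(jobStart.properties)
    .flatMap(p => Option(p.getProperty(SparkContext.SPARK_JOB_GROUP_ID)))
    .filter(id => conf.contains(s"spark.job.$id.maxConcurrentTasks"))
    .getOrElse(DEFAULT_JOB_GROUP)

  val maxConTasks = conf.getOption(s"spark.job.$jobGroupId.maxConcurrentTasks")
    .map(_.toInt)
    .getOrElse(Int.MaxValue)

  require(maxConTasks > 0,
    "Maximum Concurrent Tasks should be set greater than 0 for the job to progress.")

  jobIdToJobGroup(jobStart.jobId) = jobGroupId
  // Unconditionally overwrite: the limit in force is the one read when this job started.
  jobGroupToMaxConTasks(jobGroupId) = maxConTasks
}
```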
[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19194#discussion_r139818031 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -758,11 +825,52 @@ private[spark] class ExecutorAllocationManager( allocationManager.synchronized { stageIdToNumSpeculativeTasks(stageId) = stageIdToNumSpeculativeTasks.getOrElse(stageId, 0) + 1 +maxConcurrentTasks = getMaxConTasks +logDebug(s"Setting max concurrent tasks to $maxConcurrentTasks on spec. task submitted.") allocationManager.onSchedulerBacklogged() } } /** + * Calculate the maximum no. of concurrent tasks that can run currently. + */ +def getMaxConTasks(): Int = { + // We can limit the no. of concurrent tasks by a job group. A job group can have multiple jobs + // with multiple stages. We need to get all the active stages belonging to a job group to + // calculate the total no. of pending + running tasks to decide the maximum no. of executors + // we need at that time to serve the outstanding tasks. This is capped by the minimum no. of + // outstanding tasks and the max concurrent limit specified for the job group if any. + + def getIncompleteTasksForStage(stageId: Int, numTasks: Int): Int = { +totalPendingTasks(stageId) + totalRunningTasks(stageId) + } + + def sumIncompleteTasksForStages: (Int, (Int, Int)) => Int = (totalTasks, stageToNumTasks) => { +val activeTasks = getIncompleteTasksForStage(stageToNumTasks._1, stageToNumTasks._2) +sumOrMax(totalTasks, activeTasks) + } + // Get the total running & pending tasks for all stages in a job group. + def getIncompleteTasksForJobGroup(stagesItr: mutable.HashMap[Int, Int]): Int = { +stagesItr.foldLeft(0)(sumIncompleteTasksForStages) + } + + def sumIncompleteTasksForJobGroup: (Int, (String, mutable.HashMap[Int, Int])) => Int = { +(maxConTasks, x) => { + val totalIncompleteTasksForJobGroup = getIncompleteTasksForJobGroup(x._2) + val maxTasks = Math.min(jobGroupToMaxConTasks(x._1), totalIncompleteTasksForJobGroup) + sumOrMax(maxConTasks, maxTasks) +} + } + + def sumOrMax(a: Int, b: Int): Int = if (doesSumOverflow(a, b)) Int.MaxValue else (a + b) + + def doesSumOverflow(a: Int, b: Int): Boolean = b > (Int.MaxValue - a) + + val stagesByJobGroup = stageIdToNumTasks.groupBy(x => jobIdToJobGroup(stageIdToJobId(x._1))) --- End diff -- you could just store `stageIdToJobGroupId`. Simplifies this a bit, and then you don't need to store `jobIdToJobGroup` at all, I think --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
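Spelled out, the simplification reads roughly like this (a sketch under the same assumptions as the diff; `resolveJobGroupId` is a hypothetical helper wrapping the property lookup shown earlier):

```scala
// Map each stage straight to its job group, making jobIdToJobGroup unnecessary.
private val stageIdToJobGroupId = new mutable.HashMap[Int, String]

override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
  val jobGroupId = resolveJobGroupId(jobStart) // hypothetical: derived from jobStart.properties
  jobStart.stageInfos.foreach(stageInfo => stageIdToJobGroupId(stageInfo.stageId) = jobGroupId)
}

// getMaxConTasks can then group stages without the two-hop lookup:
val stagesByJobGroup = stageIdToNumTasks.groupBy { case (stageId, _) => stageIdToJobGroupId(stageId) }
```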
[GitHub] spark pull request #18853: [SPARK-21646][SQL] CommonType for binary comparis...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18853#discussion_r139821887 --- Diff: docs/sql-programming-guide.md --- @@ -968,6 +968,13 @@ Configuration of Parquet can be done using the `setConf` method on `SparkSession + + spark.sql.autoTypeCastingCompatibility + false --- End diff -- `hive, default` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19196 **[Test build #81950 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81950/testReport)** for PR 19196 at commit [`8a6eafe`](https://github.com/apache/spark/commit/8a6eafef056b2a64ee0be07ce886ad69dc295537). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19252: [SPARK-21969][SQL] CommandUtils.updateTableStats ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19252 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #18853: [SPARK-21646][SQL] CommonType for binary comparis...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/18853#discussion_r139821539 --- Diff: docs/sql-programming-guide.md --- @@ -968,6 +968,13 @@ Configuration of Parquet can be done using the `setConf` method on `SparkSession + + spark.sql.autoTypeCastingCompatibility --- End diff -- `spark.sql.typeCoercion.mode` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
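Taken together with the `hive, default` values suggested in the earlier comment on this doc page, usage under the proposed name would look as follows. Note the key shown is only the reviewer's proposal in this thread, not a committed setting:

```scala
// Hypothetical key and values, exactly as proposed in the review comments above.
spark.conf.set("spark.sql.typeCoercion.mode", "hive")    // Hive-compatible type casting
spark.conf.set("spark.sql.typeCoercion.mode", "default") // Spark's existing behavior
```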
[GitHub] spark issue #19252: [SPARK-21969][SQL] CommandUtils.updateTableStats should ...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19252 Thanks! Merged to master. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calc...
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/19281#discussion_r139820801 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala --- @@ -396,6 +396,26 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ object SparkPlan { private[execution] val subqueryExecutionContext = ExecutionContext.fromExecutorService( ThreadUtils.newDaemonCachedThreadPool("subquery", 16)) + + /** + * Returns if the actual ordering satisfies the required ordering. + * + * Ordering A satisfies ordering B if and only if B is an equivalent of A or of A's prefix. + */ + def orderingSatisfies(actual: Seq[SortOrder], required: Seq[SortOrder]): Boolean = { --- End diff -- how about moving this to `SortOrder` object : https://github.com/apache/spark/blob/e9c91badce64731ffd3e53cbcd9f044a7593e6b8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala#L92 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
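For context, the helper being moved is small, so the relocation is cheap; its contract can be sketched in a few lines. Hedged: `semanticEquals` is used here as an approximation of "equivalent", and the body mirrors the diff's documented semantics rather than quoting the final patch:

```scala
// required is satisfied iff it is (semantically) a prefix of actual:
// an empty requirement holds trivially, a longer-than-actual requirement never does,
// and otherwise each required SortOrder must match the actual one at the same position.
def orderingSatisfies(actual: Seq[SortOrder], required: Seq[SortOrder]): Boolean = {
  if (required.isEmpty) {
    true
  } else if (required.length > actual.length) {
    false
  } else {
    required.zip(actual).forall { case (req, act) => act.semanticEquals(req) }
  }
}
```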
[GitHub] spark issue #19252: [SPARK-21969][SQL] CommandUtils.updateTableStats should ...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19252 LGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19278: [SPARK-22060][ML] Fix CrossValidator/TrainValidat...
Github user smurching commented on a diff in the pull request: https://github.com/apache/spark/pull/19278#discussion_r139816308 --- Diff: mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala --- @@ -399,14 +399,17 @@ private[ml] object DefaultParamsReader { * This works if all Params implement [[org.apache.spark.ml.param.Param.jsonDecode()]]. --- End diff -- Update the docstring to state that params included in `skipParams` aren't set. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19278: [SPARK-22060][ML] Fix CrossValidator/TrainValidat...
Github user smurching commented on a diff in the pull request: https://github.com/apache/spark/pull/19278#discussion_r139809836 --- Diff: mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala --- @@ -399,14 +399,17 @@ private[ml] object DefaultParamsReader { * This works if all Params implement [[org.apache.spark.ml.param.Param.jsonDecode()]]. * TODO: Move to [[Metadata]] method */ - def getAndSetParams(instance: Params, metadata: Metadata): Unit = { + def getAndSetParams(instance: Params, metadata: Metadata, + skipParams: List[String] = null): Unit = { --- End diff -- Use an `Option[List[String]]` that defaults to `None` instead of a `List[String]` that defaults to null? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
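A sketch of the suggested signature, folding in the docstring update requested in the previous comment; the JSON-walking body paraphrases what the diff implies rather than quoting the final patch:

```scala
import org.json4s.JsonAST.JObject
import org.json4s.jackson.JsonMethods.{compact, render}

/**
 * Extract Params from metadata and set them in the instance.
 * @param skipParams Param names whose values appear in `metadata` but should NOT be
 *                   set on `instance` (e.g. because the caller handles them itself).
 */
def getAndSetParams(
    instance: Params,
    metadata: Metadata,
    skipParams: Option[List[String]] = None): Unit = {
  metadata.params match {
    case JObject(pairs) =>
      pairs.foreach { case (paramName, jsonValue) =>
        // Option#exists replaces the null check a raw List default would require.
        if (!skipParams.exists(_.contains(paramName))) {
          val param = instance.getParam(paramName)
          val value = param.jsonDecode(compact(render(jsonValue)))
          instance.set(param, value)
        }
      }
    case _ =>
      throw new IllegalArgumentException(s"Cannot recognize JSON metadata: ${metadata.metadataJson}.")
  }
}
```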
[GitHub] spark pull request #19278: [SPARK-22060][ML] Fix CrossValidator/TrainValidat...
Github user smurching commented on a diff in the pull request: https://github.com/apache/spark/pull/19278#discussion_r139817121 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala --- @@ -17,6 +17,7 @@ package org.apache.spark.ml.tuning +import java.io.IOException --- End diff -- This import is unused & can be removed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19250 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19250 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81943/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19250 **[Test build #81943 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81943/testReport)** for PR 19250 at commit [`515b38b`](https://github.com/apache/spark/commit/515b38b7b0a1efc0de4a92479c1e5a479b872146). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139815599 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus} +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + + +/** + * Helper object for [[StreamingSymmetricHashJoinExec]]. 
+ */ +object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with Logging { + + sealed trait JoinSide + case object LeftSide extends JoinSide { override def toString(): String = "left" } + case object RightSide extends JoinSide { override def toString(): String = "right" } + + sealed trait JoinStateWatermarkPredicate + case class JoinStateKeyWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + case class JoinStateValueWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + + case class JoinStateWatermarkPredicates( +left: Option[JoinStateWatermarkPredicate] = None, +right: Option[JoinStateWatermarkPredicate] = None) + + def getStateWatermarkPredicates( + leftAttributes: Seq[Attribute], + rightAttributes: Seq[Attribute], + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + condition: Option[Expression], + eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = { +val joinKeyOrdinalForWatermark: Option[Int] = { + leftKeys.zipWithIndex.collectFirst { +case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index + } orElse { +rightKeys.zipWithIndex.collectFirst { + case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index +} + } +} + +def getOneSideStateWatermarkPredicate( +oneSideInputAttributes: Seq[Attribute], +oneSideJoinKeys: Seq[Expression], +otherSideInputAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = { + val isWatermarkDefinedOnInput = oneSideInputAttributes.exists(_.metadata.contains(delayKey)) + val isWatermarkDefinedOnJoinKey = joinKeyOrdinalForWatermark.isDefined + + if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the class docs +val keyExprWithWatermark = BoundReference( + joinKeyOrdinalForWatermark.get, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable) +val expr = watermarkExpression(Some(keyExprWithWatermark), eventTimeWatermark) +expr.map(JoinStateKeyWatermarkPredicate) + + } else if (isWatermarkDefinedOnInput) { // case 2 explained in the class docs +val stateValueWatermark = getStateValueWatermark( + attributesToFindStateWatemarkFor = oneSideInputAttributes, + attributesWithEventWatermark = otherSideInputAttributes, + condition, + eventTimeWatermark) +val inputAttributeWithWatermark = oneSideInputAttributes.find(_.metadata.contains(delayKey)) +val expr = watermarkExpression(inputAttributeWithWatermark, stateValueWatermark) +expr.map(JoinSt
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139804436 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then the it uses the watermark figure out which rows in the buffer will not join with + * and new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. 
Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34. Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139811883 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then the it uses the watermark figure out which rows in the buffer will not join with + * and new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. 
Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34. Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139808731 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala --- @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import java.util.concurrent.TimeUnit.NANOSECONDS + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan} +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._ +import org.apache.spark.sql.execution.streaming.state._ +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration} + + +/** + * Performs stream-stream join using symmetric hash join algorithm. It works as follows. + * + * /---\ + * left side input ->|left side state|--\ + * \---/ | + *|> joined output + * /---\ | + * right side input >|right side state |--/ + * \---/ + * + * Each join side buffers past input rows as streaming state so that the past input can be joined + * with future input on the other side. This buffer state is effectively a multi-map: + *equi-join key -> list of past input rows received with the join key + * + * For each input row in each side, the following operations take place. + * - Calculate join key from the row. + * - Use the join key to append the row to the buffer state of the side that the row came from. + * - Find past buffered values for the key from the other side. For each such value, emit the + * "joined row" (left-row, right-row) + * - Apply the optional condition to filter the joined rows as the final output. + * + * If a timestamp column with event time watermark is present in the join keys or in the input + * data, then the it uses the watermark figure out which rows in the buffer will not join with + * and new data, and therefore can be discarded. Depending on the provided query conditions, we + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows). + * There are three kinds of queries possible regarding this as explained below. + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below. + * + * 1. When timestamp/time-window + watermark is in the join keys. 
Example (pseudo-SQL): + * + * SELECT * FROM leftTable, rightTable + * ON + *leftKey = rightKey AND + *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 1hr tumbling windows + * + *In this case, this operator will join rows newer than watermark which fall in the same + *1 hour window. Say the event-time watermark is "12:34" (both left and right input). + *Then input rows can only have time > 12:34. Hence, they can only join with buffered rows + *where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be + *discarded. In other words, the operator will discard all state where + *window in state key (i.e. join key) < event time watermark. This threshold is called + *State Key Watermark. + * + * 2. When timestamp range conditions
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139803500 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala --- @@ -114,6 +115,16 @@ class IncrementalExecution( stateInfo = Some(nextStatefulOperationStateInfo), batchTimestampMs = Some(offsetSeqMetadata.batchTimestampMs), eventTimeWatermark = Some(offsetSeqMetadata.batchWatermarkMs)) + + case j @ StreamingSymmetricHashJoinExec(lKeys, rKeys, _, cond, _, _, _, left, right) => +j.copy( + stateInfo = Some(nextStatefulOperationStateInfo), --- End diff -- So, I think this may not be robust. At each trigger, we create a new IncrementalExecution, so the `statefulOperatorId` we hope gets incremented in a deterministic manner. I can imagine adding things to the Optimizer in the future which may move an `EquiJoin` before an aggregation. In this case, the state store id's of the aggregation and join may switch. I'm not sure if we're protected against that somehow, so just wanted to bring it up. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
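Schematically, the fragility described here comes down to how state IDs are handed out each trigger. A hypothetical simplification (the real `nextStatefulOperationStateInfo` returns a richer `StatefulOperatorStateInfo`, not an Int):

```scala
import java.util.concurrent.atomic.AtomicInteger

// A fresh counter per trigger: each stateful operator receives the next ID in
// physical-plan traversal order. If a future optimizer rule reordered, say, an
// aggregation and a join between two triggers, the operators would silently swap
// state stores -- which is why batch-dependent (cost-based) reordering must stay
// disabled for streaming plans.
private val statefulOperatorId = new AtomicInteger(0)
private def nextStatefulOperationStateInfo(): Int = statefulOperatorId.getAndIncrement()
```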
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139816392 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming + +import scala.util.control.NonFatal + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus} +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._ +import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + + +/** + * Helper object for [[StreamingSymmetricHashJoinExec]]. 
+ */ +object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with Logging { + + sealed trait JoinSide + case object LeftSide extends JoinSide { override def toString(): String = "left" } + case object RightSide extends JoinSide { override def toString(): String = "right" } + + sealed trait JoinStateWatermarkPredicate + case class JoinStateKeyWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + case class JoinStateValueWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate + + case class JoinStateWatermarkPredicates( +left: Option[JoinStateWatermarkPredicate] = None, +right: Option[JoinStateWatermarkPredicate] = None) + + def getStateWatermarkPredicates( + leftAttributes: Seq[Attribute], + rightAttributes: Seq[Attribute], + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + condition: Option[Expression], + eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = { +val joinKeyOrdinalForWatermark: Option[Int] = { + leftKeys.zipWithIndex.collectFirst { +case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index + } orElse { +rightKeys.zipWithIndex.collectFirst { + case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index +} + } +} + +def getOneSideStateWatermarkPredicate( +oneSideInputAttributes: Seq[Attribute], +oneSideJoinKeys: Seq[Expression], +otherSideInputAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = { + val isWatermarkDefinedOnInput = oneSideInputAttributes.exists(_.metadata.contains(delayKey)) + val isWatermarkDefinedOnJoinKey = joinKeyOrdinalForWatermark.isDefined + + if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the class docs +val keyExprWithWatermark = BoundReference( + joinKeyOrdinalForWatermark.get, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType, + oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable) +val expr = watermarkExpression(Some(keyExprWithWatermark), eventTimeWatermark) +expr.map(JoinStateKeyWatermarkPredicate) + + } else if (isWatermarkDefinedOnInput) { // case 2 explained in the class docs +val stateValueWatermark = getStateValueWatermark( + attributesToFindStateWatemarkFor = oneSideInputAttributes, + attributesWithEventWatermark = otherSideInputAttributes, + condition, + eventTimeWatermark) +val inputAttributeWithWatermark = oneSideInputAttributes.find(_.metadata.contains(delayKey)) +val expr = watermarkExpression(inputAttributeWithWatermark, stateValueWatermark) +expr.map(JoinSt
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19271#discussion_r139804087

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---

    + * If a timestamp column with event time watermark is present in the join keys or in the input
    + * data, then the it uses the watermark figure out which rows in the buffer will not join with
    + * and new data, and therefore can be discarded. Depending on the provided query conditions, we

--- End diff --

nit: `with the new data`

---

- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
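The paragraph under review says the watermark determines which buffered rows can no longer join with new input. For the tumbling-window case that boils down to a comparison like the following back-of-envelope model; the `Window` type and millisecond representation are assumptions for illustration, not code from the PR.

    // Windows modelled as [startMs, endMs) in epoch milliseconds (assumption).
    case class Window(startMs: Long, endMs: Long)

    // A buffered entry is safe to evict once its window lies entirely at or before
    // the event-time watermark: any future input has time > watermark, so it can
    // never fall into that window and never join with the entry.
    def canEvict(stateKeyWindow: Window, watermarkMs: Long): Boolean =
      stateKeyWindow.endMs <= watermarkMs

For example, with 1-hour tumbling windows and the watermark at 12:34, the window [11:00, 12:00) is evicted (it ends at or before 12:34) while [12:00, 13:00) is kept.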
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19271#discussion_r139810069

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---

    @@ -0,0 +1,330 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming
    +
    +import java.util.concurrent.TimeUnit.NANOSECONDS
    +
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow}
    +import org.apache.spark.sql.catalyst.plans._
    +import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
    +import org.apache.spark.sql.catalyst.plans.physical._
    +import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
    +import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._
    +import org.apache.spark.sql.execution.streaming.state._
    +import org.apache.spark.sql.internal.SessionState
    +import org.apache.spark.util.{CompletionIterator, SerializableConfiguration}
    +
    +
    +/**
    + * Performs stream-stream join using symmetric hash join algorithm. It works as follows.
    + *
    + *                          /--------------------\
    + *   left side input ------>|  left side state   |------\
    + *                          \--------------------/      |
    + *                                                      |-----> joined output
    + *                          /--------------------\      |
    + *   right side input ----->|  right side state  |------/
    + *                          \--------------------/
    + *
    + * Each join side buffers past input rows as streaming state so that the past input can be joined
    + * with future input on the other side. This buffer state is effectively a multi-map:
    + *    equi-join key -> list of past input rows received with the join key
    + *
    + * For each input row in each side, the following operations take place.
    + * - Calculate join key from the row.
    + * - Use the join key to append the row to the buffer state of the side that the row came from.
    + * - Find past buffered values for the key from the other side. For each such value, emit the
    + *   "joined row" (left-row, right-row)
    + * - Apply the optional condition to filter the joined rows as the final output.
    + *
    + * If a timestamp column with event time watermark is present in the join keys or in the input
    + * data, then the it uses the watermark figure out which rows in the buffer will not join with
    + * and new data, and therefore can be discarded. Depending on the provided query conditions, we
    + * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows).
    + * There are three kinds of queries possible regarding this as explained below.
    + * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below.
    + *
    + * 1. When timestamp/time-window + watermark is in the join keys. Example (pseudo-SQL):
    + *
    + *      SELECT * FROM leftTable, rightTable
    + *      ON
    + *        leftKey = rightKey AND
    + *        window(leftTime, "1 hour") = window(rightTime, "1 hour")    // 1hr tumbling windows
    + *
    + *    In this case, this operator will join rows newer than watermark which fall in the same
    + *    1 hour window. Say the event-time watermark is "12:34" (both left and right input).
    + *    Then input rows can only have time > 12:34. Hence, they can only join with buffered rows
    + *    where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be
    + *    discarded. In other words, the operator will discard all state where
    + *    window in state key (i.e. join key) < event time watermark. This threshold is called
    + *    State Key Watermark.
    + *
    + * 2. When timestamp range conditions
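The buffering scheme in the class doc above (a per-side multi-map from equi-join key to past rows, probed symmetrically) can be illustrated with a toy single-process sketch. The `Row` alias, `OneSideBuffer`, and `processLeftRow` below are hypothetical stand-ins; the real operator keeps this state in a fault-tolerant state store and processes right-side rows symmetrically.

    import scala.collection.mutable

    object SymmetricJoinSketch {
      type Row = Map[String, Any] // toy row representation (assumption)

      // equi-join key -> list of past input rows received with that key
      final class OneSideBuffer {
        private val buffer = mutable.Map.empty[Any, mutable.ArrayBuffer[Row]]
        def append(key: Any, row: Row): Unit = {
          buffer.getOrElseUpdate(key, mutable.ArrayBuffer.empty[Row]) += row
        }
        def get(key: Any): Seq[Row] = buffer.getOrElse(key, Seq.empty[Row]).toSeq
      }

      // One step of the algorithm for a left-side row: buffer it, then join it
      // against everything already buffered on the right side for the same key,
      // applying the optional extra condition to each candidate pair.
      def processLeftRow(
          row: Row,
          joinKey: Row => Any,
          leftState: OneSideBuffer,
          rightState: OneSideBuffer,
          condition: (Row, Row) => Boolean): Seq[(Row, Row)] = {
        val key = joinKey(row)
        leftState.append(key, row) // kept so future right-side rows can match it
        rightState.get(key).collect {
          case other if condition(row, other) => (row, other)
        }
      }
    }

The symmetry is the point of the design: because both sides buffer, a new row on either side only has to probe the other side's state, and joined output is produced incrementally as input arrives.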
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19271#discussion_r139806969

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19271#discussion_r139807021

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19271#discussion_r139816206

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala ---
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19271#discussion_r139812580

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala ---

    +      if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the class docs

--- End diff --

specify which class docs

---

- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
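The branch flagged here ("case 1 and 3") is taken when the watermarked timestamp or time-window column is part of the equi-join key. A query of roughly the following shape would exercise it — a minimal DataFrame API sketch, with hypothetical rate-source inputs and column names (assumptions for illustration, not taken from the PR):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, window}

    val spark = SparkSession.builder.appName("case1-sketch").getOrCreate()

    // Two hypothetical streaming inputs, each with an event-time column and watermark.
    val left = spark.readStream.format("rate").load()
      .withColumnRenamed("value", "leftKey")
      .withColumnRenamed("timestamp", "leftTime")
      .withWatermark("leftTime", "10 minutes")
    val right = spark.readStream.format("rate").load()
      .withColumnRenamed("value", "rightKey")
      .withColumnRenamed("timestamp", "rightTime")
      .withWatermark("rightTime", "10 minutes")

    // The 1-hour tumbling window is part of the equi-join key, so the operator
    // can evict all buffered state whose window closed before the watermark.
    val joined = left.join(
      right,
      col("leftKey") === col("rightKey") &&
        window(col("leftTime"), "1 hour") === window(col("rightTime"), "1 hour"))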
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19271#discussion_r139812749

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala ---
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19271#discussion_r139814640

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala ---
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19271#discussion_r139813860

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala ---
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19271#discussion_r139813588

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala ---
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19271#discussion_r139814268

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala ---
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19271#discussion_r139806701

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19271#discussion_r139815407

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala ---
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139806782
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
@@ -0,0 +1,330 @@
+[Apache License 2.0 header]
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.TimeUnit.NANOSECONDS
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
+import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.util.{CompletionIterator, SerializableConfiguration}
+
+/**
+ * Performs a stream-stream join using the symmetric hash join algorithm. It works as follows.
+ *
+ *                        /-----------------------\
+ *   left side input ---->| left side state       |--\
+ *                        \-----------------------/   \
+ *                                                      --> joined output
+ *                        /-----------------------\   /
+ *   right side input --->| right side state      |--/
+ *                        \-----------------------/
+ *
+ * Each join side buffers past input rows as streaming state so that the past input can be joined
+ * with future input on the other side. This buffer state is effectively a multi-map:
+ *     equi-join key -> list of past input rows received with that join key
+ *
+ * For each input row on each side, the following operations take place.
+ * - Calculate the join key from the row.
+ * - Use the join key to append the row to the buffer state of the side that the row came from.
+ * - Find past buffered values for the key from the other side. For each such value, emit the
+ *   "joined row" (left-row, right-row).
+ * - Apply the optional condition to filter the joined rows as the final output.
+ *
+ * If a timestamp column with an event time watermark is present in the join keys or in the input
+ * data, then the operator uses the watermark to figure out which rows in the buffer will not
+ * join with any new data, and can therefore be discarded. Depending on the provided query
+ * conditions, we can define thresholds on both the state key (i.e. joining keys) and the state
+ * value (i.e. input rows). There are three kinds of queries possible regarding this, as
+ * explained below. Assume that a watermark has been defined on both the `leftTime` and
+ * `rightTime` columns used below.
+ *
+ * 1. When a timestamp/time-window + watermark is in the join keys. Example (pseudo-SQL):
+ *
+ *      SELECT * FROM leftTable, rightTable
+ *      ON
+ *        leftKey = rightKey AND
+ *        window(leftTime, "1 hour") = window(rightTime, "1 hour")    // 1hr tumbling windows
+ *
+ *    In this case, this operator will join rows newer than the watermark which fall in the same
+ *    1 hour window. Say the event-time watermark is "12:34" (on both left and right input).
+ *    Then input rows can only have time > 12:34. Hence, they can only join with buffered rows
+ *    whose 1-hour window starts at or after 12:00, and all buffered rows whose join window
+ *    starts before 12:00 can be discarded. In other words, the operator will discard all state
+ *    where the window in the state key (i.e. join key) < event time watermark. This threshold
+ *    is called the State Key Watermark.
+ *
+ * 2. When timestamp range conditions
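To make the row-at-a-time procedure in the excerpt above concrete, here is a deliberately simplified, standalone Scala sketch: plain in-memory multimaps stand in for the operator's keyed state stores, and `Event`, `processRow`, and the emitted tuples are illustrative names, not Spark APIs.

import scala.collection.mutable

// Simplified model of one join side's buffer state: join key -> buffered rows.
case class Event(key: Int, value: String)
type SideState = mutable.Map[Int, mutable.Buffer[Event]]

def processRow(
    row: Event,
    ownState: SideState,
    otherState: SideState,
    condition: (Event, Event) => Boolean): Seq[(Event, Event)] = {
  // 1. Calculate the join key from the row (here simply `row.key`).
  val key = row.key
  // 2. Append the row to the buffer state of the side it came from, so that
  //    future rows arriving on the other side can join with it.
  ownState.getOrElseUpdate(key, mutable.Buffer.empty) += row
  // 3. Join against everything already buffered for the same key on the
  //    other side, then 4. apply the optional condition as a filter.
  otherState.getOrElse(key, mutable.Buffer.empty).toSeq
    .map(buffered => (row, buffered))
    .filter(condition.tupled)
}

// Usage: each arriving row is processed against its own side's state first.
val left: SideState = mutable.Map.empty
val right: SideState = mutable.Map.empty
processRow(Event(1, "a"), left, right, (_, _) => true)  // buffers only, emits nothing
processRow(Event(1, "b"), right, left, (_, _) => true)  // emits the (b, a) pair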
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139809419
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
[diff context identical to the r139806782 excerpt above]
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139815966
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala ---
@@ -0,0 +1,303 @@
+[Apache License 2.0 header]
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
+
+/**
+ * Helper object for [[StreamingSymmetricHashJoinExec]].
+ */
+object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with Logging {
+
+  sealed trait JoinSide
+  case object LeftSide extends JoinSide { override def toString(): String = "left" }
+  case object RightSide extends JoinSide { override def toString(): String = "right" }
+
+  sealed trait JoinStateWatermarkPredicate
+  case class JoinStateKeyWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate
+  case class JoinStateValueWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate
+
+  case class JoinStateWatermarkPredicates(
+      left: Option[JoinStateWatermarkPredicate] = None,
+      right: Option[JoinStateWatermarkPredicate] = None)
+
+  def getStateWatermarkPredicates(
+      leftAttributes: Seq[Attribute],
+      rightAttributes: Seq[Attribute],
+      leftKeys: Seq[Expression],
+      rightKeys: Seq[Expression],
+      condition: Option[Expression],
+      eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = {
+    val joinKeyOrdinalForWatermark: Option[Int] = {
+      leftKeys.zipWithIndex.collectFirst {
+        case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index
+      } orElse {
+        rightKeys.zipWithIndex.collectFirst {
+          case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index
+        }
+      }
+    }
+
+    def getOneSideStateWatermarkPredicate(
+        oneSideInputAttributes: Seq[Attribute],
+        oneSideJoinKeys: Seq[Expression],
+        otherSideInputAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = {
+      val isWatermarkDefinedOnInput = oneSideInputAttributes.exists(_.metadata.contains(delayKey))
+      val isWatermarkDefinedOnJoinKey = joinKeyOrdinalForWatermark.isDefined
+
+      if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the class docs
+        val keyExprWithWatermark = BoundReference(
+          joinKeyOrdinalForWatermark.get,
+          oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType,
+          oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable)
+        val expr = watermarkExpression(Some(keyExprWithWatermark), eventTimeWatermark)
+        expr.map(JoinStateKeyWatermarkPredicate)
+
+      } else if (isWatermarkDefinedOnInput) { // case 2 explained in the class docs
+        val stateValueWatermark = getStateValueWatermark(
+          attributesToFindStateWatemarkFor = oneSideInputAttributes,
+          attributesWithEventWatermark = otherSideInputAttributes,
+          condition,
+          eventTimeWatermark)
+        val inputAttributeWithWatermark = oneSideInputAttributes.find(_.metadata.contains(delayKey))
+        val expr = watermarkExpression(inputAttributeWithWatermark, stateValueWatermark)
+        expr.map(JoinStateValueWatermarkPredicate)
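To make the two predicate kinds concrete, here is a hypothetical sketch of how each kind would drive state cleanup, written against the case classes in the excerpt above; the `stateRowsByKey` map and the `evaluate` callback are illustrative stand-ins, not the operator's actual state store API.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._

// Decide which buffered rows of one join side can be evicted.
def rowsToEvict(
    stateRowsByKey: Map[InternalRow, Seq[InternalRow]],
    predicate: JoinStateWatermarkPredicate,
    evaluate: (Expression, InternalRow) => Boolean): Seq[InternalRow] = predicate match {
  case JoinStateKeyWatermarkPredicate(expr) =>
    // Cases 1 and 3: the threshold is on the join key itself, so every row
    // stored under a too-old key is dropped at once.
    stateRowsByKey.toSeq.collect { case (key, rows) if evaluate(expr, key) => rows }.flatten
  case JoinStateValueWatermarkPredicate(expr) =>
    // Case 2: the threshold applies to the buffered row itself, so rows are
    // tested one by one regardless of their key.
    stateRowsByKey.values.flatten.filter(row => evaluate(expr, row)).toSeq
}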
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139812115
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
[diff context identical to the r139806782 excerpt above]
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139811702
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
[diff context identical to the r139806782 excerpt above]
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139809689
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
[diff context identical to the r139806782 excerpt above]
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139815159
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala ---
[diff context identical to the r139815966 excerpt above]
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139807880
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
[diff context identical to the r139806782 excerpt above]
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...
Github user brkyvz commented on a diff in the pull request: https://github.com/apache/spark/pull/19271#discussion_r139812315
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala ---
[diff context identical to the r139815966 excerpt above, ending at:]
+    val joinKeyOrdinalForWatermark: Option[Int] = {
--- End diff --

I see this code is duplicated above. Maybe you can make it a function?

---
- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
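A minimal sketch of the refactoring suggested here, lifting the two copy-pasted `collectFirst` blocks into one helper (illustrative only, not the code that was ultimately committed):

import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark.delayKey

// Find the ordinal of the first join key that carries watermark metadata.
def findWatermarkKeyOrdinal(keys: Seq[Expression]): Option[Int] =
  keys.zipWithIndex.collectFirst {
    case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index
  }

// Both duplicated call sites then collapse to:
//   val joinKeyOrdinalForWatermark =
//     findWatermarkKeyOrdinal(leftKeys).orElse(findWatermarkKeyOrdinal(rightKeys))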
[GitHub] spark pull request #19160: [SPARK-21934][CORE] Expose Shuffle Netty memory u...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/19160#discussion_r139815684
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java ---
@@ -52,4 +55,13 @@ public abstract void fetchBlocks(
       String[] blockIds,
       BlockFetchingListener listener,
       TempShuffleFileManager tempShuffleFileManager);
+
+  /**
+   * Get the shuffle MetricsSet from ShuffleClient, this will be used used in MetricsSystem to
--- End diff --

nit: `this will be used used` -> `this will be used`

---
- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
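For context, the `MetricSet` in this new hook appears to be Dropwizard's `com.codahale.metrics.MetricSet`, whose single method is `getMetrics()`. A minimal hypothetical implementation of what a `ShuffleClient` could return might look like the following; the metric name is illustrative, not an actual Spark metric.

import java.util.{Collections, Map => JMap}
import com.codahale.metrics.{Counter, Metric, MetricSet}

// Hypothetical MetricSet exposed through shuffleMetrics(); a metrics system
// can register every entry of getMetrics() under the source's namespace.
class ShuffleClientMetrics extends MetricSet {
  val remoteBytesRead = new Counter()

  override def getMetrics: JMap[String, Metric] =
    Collections.singletonMap[String, Metric]("remoteBytesRead", remoteBytesRead)
}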
[GitHub] spark pull request #19160: [SPARK-21934][CORE] Expose Shuffle Netty memory u...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/19160#discussion_r139815983
--- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java ---
@@ -117,6 +118,12 @@ public void fetchBlocks(
     }
   }
+
+  @Override
+  public MetricSet shuffleMetrics() {
+    checkInit();
--- End diff --

not related to the change here -- but should we also checkInit in the `close()` function?

---
- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19145: [spark-21933][yarn] Spark Streaming request more executo...
Github user squito commented on the issue: https://github.com/apache/spark/pull/19145 I'm not sure I totally follow the sequence of events, but I get the feeling this should be handled in YARN, not Spark. Also, I agree with Jerry: it seems like your `completedContainerIdSet` may grow continuously. You'll remove from it *if* you happen to get a duplicate message, but I think in most cases you will not get a duplicate message, if I understand correctly. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
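To illustrate the growth concern with a minimal sketch (the field and method names here are hypothetical, not the PR's actual code): IDs are added on every completion event but removed only when a duplicate completion for the same ID arrives, so if duplicates are rare the set grows for the lifetime of the application.

import scala.collection.mutable

val completedContainerIdSet = mutable.HashSet[String]()

def onContainerCompleted(containerId: String): Unit = {
  // `add` returns false when the ID was already present, i.e. a duplicate.
  if (!completedContainerIdSet.add(containerId)) {
    completedContainerIdSet.remove(containerId) // the only cleanup path
  } else {
    // First report: handle the completion, e.g. request a replacement
    // executor. Without a duplicate, this ID is never removed again.
  }
}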
[GitHub] spark issue #19280: [SPARK-21928][CORE] Set classloader on SerializerManager...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19280 **[Test build #81949 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81949/testReport)** for PR 19280 at commit [`20e3585`](https://github.com/apache/spark/commit/20e3585eac16ef3bfe403ec23f57a2705ff47ecb). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19280: [SPARK-21928][CORE] Set classloader on SerializerManager...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19280 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81944/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19280: [SPARK-21928][CORE] Set classloader on SerializerManager...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19280 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19280: [SPARK-21928][CORE] Set classloader on SerializerManager...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19280 **[Test build #81944 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81944/testReport)** for PR 19280 at commit [`acbaf8b`](https://github.com/apache/spark/commit/acbaf8b65629344d760360b768e89f1712af8942). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19283: Update quickstart python dataset example
Github user srowen commented on the issue: https://github.com/apache/spark/pull/19283 Have a look at http://spark.apache.org/contributing.html -- maybe prefix the title with `[MINOR][DOCS]` for completeness? Are there other instances of this same issue in the PySpark docs? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19252: [SPARK-21969][SQL] CommandUtils.updateTableStats should ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19252 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19252: [SPARK-21969][SQL] CommandUtils.updateTableStats should ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19252 **[Test build #81941 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81941/testReport)** for PR 19252 at commit [`63f9dc2`](https://github.com/apache/spark/commit/63f9dc23f781bdf6e8c891e1a502b91ccdbc0281). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19020: [SPARK-3181] [ML] Implement huber loss for Linear...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/19020#discussion_r139807816
--- Diff: mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala ---
@@ -998,6 +1047,172 @@ class LinearRegressionSuite
       }
     }
   }
+
+  test("linear regression (huber loss) with intercept without regularization") {
--- End diff --

Do you know if these integration tests are in the L1 penalty regime or in the L2 regime? It'd be nice to make sure we're testing both.

---
- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19020: [SPARK-3181] [ML] Implement huber loss for Linear...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/19020#discussion_r139765053
--- Diff: mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala ---
@@ -69,19 +69,57 @@ private[regression] trait LinearRegressionParams extends PredictorParams
     "The solver algorithm for optimization. Supported options: " +
       s"${supportedSolvers.mkString(", ")}. (Default auto)",
     ParamValidators.inArray[String](supportedSolvers))
+
+  /**
+   * The loss function to be optimized.
+   * Supported options: "leastSquares" and "huber".
+   * Default: "leastSquares"
+   *
+   * @group param
+   */
+  @Since("2.3.0")
+  final override val loss: Param[String] = new Param[String](this, "loss", "The loss function to" +
+    s" be optimized. Supported options: ${supportedLosses.mkString(", ")}. (Default leastSquares)",
+    ParamValidators.inArray[String](supportedLosses))
+
+  /**
+   * The shape parameter to control the amount of robustness. Must be > 1.0.
+   * At larger values of epsilon, the huber criterion becomes more similar to least squares
+   * regression; for small values of epsilon, the criterion is more similar to L1 regression.
+   * Default is 1.35 to get as much robustness as possible while retaining
+   * 95% statistical efficiency for normally distributed data.
+   * Only valid when "loss" is "huber".
+   */
+  @Since("2.3.0")
+  final val epsilon = new DoubleParam(this, "epsilon", "The shape parameter to control the " +
+    "amount of robustness. Must be > 1.0.", ParamValidators.gt(1.0))
+
+  /** @group getParam */
+  @Since("2.3.0")
+  def getEpsilon: Double = $(epsilon)
+
+  override protected def validateAndTransformSchema(
+      schema: StructType,
+      fitting: Boolean,
+      featuresDataType: DataType): StructType = {
+    if ($(loss) == Huber) {
+      require($(solver) != Normal, "LinearRegression with huber loss doesn't support " +
+        "normal solver, please change solver to auto or l-bfgs.")
+      require($(elasticNetParam) == 0.0, "LinearRegression with huber loss only supports " +
+        s"L2 regularization, but got elasticNetParam = $getElasticNetParam.")
+    }
+    super.validateAndTransformSchema(schema, fitting, featuresDataType)
+  }
 }

 /**
  * Linear regression.
  *
- * The learning objective is to minimize the squared error, with regularization.
- * The specific squared error loss function used is:
- *
- * $$
- * L = 1/2n ||A coefficients - y||^2^
- * $$
- *
+ * The learning objective is to minimize the specified loss function, with regularization.
+ * This supports two loss functions:
+ *   - leastSquares (a.k.a. squared loss)
--- End diff --

Let's keep exact specifications of the losses being used. This is one of my big annoyances with many ML libraries: it's hard to tell exactly what loss is being used, which makes it hard to compare/validate results across different ML libraries. It'd also be nice to make it clear what we mean by "huber," in particular that we estimate the scale parameter from data.

---
- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
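For reference, the two losses under discussion can be stated exactly. The following restates the squared-error loss from the existing Scaladoc and the huber objective in the Owen (2006) parameterization quoted later in this thread; treat it as a sketch of what the doc could spell out, not settled wording (regularization terms omitted):

$$
L_{\text{squaredError}}(w) = \frac{1}{2n}\,\lVert X w - y \rVert_2^2
$$

$$
L_{\text{huber}}(w, \sigma) = \frac{1}{2n}\sum_{i=1}^n \left( \sigma + H_\epsilon\!\left(\frac{x_i^\top w - y_i}{\sigma}\right)\sigma \right),
\qquad
H_\epsilon(z) = \begin{cases} z^2, & \lvert z \rvert < \epsilon \\ 2\epsilon\lvert z \rvert - \epsilon^2, & \text{otherwise,} \end{cases}
$$

with the scale $\sigma$ estimated jointly with the coefficients $w$, which is the point jkbradley asks to have made explicit.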
[GitHub] spark pull request #19020: [SPARK-3181] [ML] Implement huber loss for Linear...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/19020#discussion_r139772375
--- Diff: mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala ---
@@ -471,6 +574,15 @@ object LinearRegression extends DefaultParamsReadable[LinearRegression] {
   /** Set of solvers that LinearRegression supports. */
   private[regression] val supportedSolvers = Array(Auto, Normal, LBFGS)
+
+  /** String name for "leastSquares". */
+  private[regression] val LeastSquares = "leastSquares"
--- End diff --

How about calling this "squaredError" since the loss is "squared error," not "least squares."

---
- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19020: [SPARK-3181] [ML] Implement huber loss for Linear...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/19020#discussion_r139765068
--- Diff: mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala ---
@@ -220,12 +283,12 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
     }
     val instr = Instrumentation.create(this, dataset)
-    instr.logParams(labelCol, featuresCol, weightCol, predictionCol, solver, tol,
-      elasticNetParam, fitIntercept, maxIter, regParam, standardization, aggregationDepth)
+    instr.logParams(labelCol, featuresCol, weightCol, predictionCol, solver, tol, elasticNetParam,
--- End diff --

Log epsilon (M) as well

---
- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19020: [SPARK-3181] [ML] Implement huber loss for Linear...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/19020#discussion_r139764932
--- Diff: mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberAggregator.scala ---
@@ -0,0 +1,142 @@
+[Apache License 2.0 header]
+package org.apache.spark.ml.optim.aggregator
+
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.linalg.Vector
+
+/**
+ * HuberAggregator computes the gradient and loss for a huber loss function,
+ * as used in robust regression for samples in sparse or dense vectors, in an online fashion.
+ *
+ * The huber loss function is based on:
+ * Art B. Owen (2006), A robust hybrid of lasso and ridge regression.
+ * (http://statweb.stanford.edu/~owen/reports/hhu.pdf)
+ *
+ * Two HuberAggregators can be merged together to have a summary of loss and gradient of
+ * the corresponding joint dataset.
+ *
+ * The huber loss function is given by
+ *
+ * $$
+ * \begin{align}
+ * \min_{w, \sigma} \frac{1}{2n} \sum_{i=1}^n \left( \sigma +
+ *   H_m\left(\frac{X_i w - y_i}{\sigma}\right) \sigma \right) + \frac{1}{2}\alpha \|w\|_2^2
+ * \end{align}
+ * $$
+ *
+ * where
+ *
+ * $$
+ * \begin{align}
+ * H_m(z) = \begin{cases}
+ *   z^2, & \text{if } |z| < \epsilon, \\
+ *   2\epsilon|z| - \epsilon^2, & \text{otherwise}
+ * \end{cases}
+ * \end{align}
+ * $$
+ *
+ * It is advised to set the parameter $\epsilon$ to 1.35 to achieve 95% statistical efficiency.
+ *
+ * @param fitIntercept Whether to fit an intercept term.
+ * @param epsilon The shape parameter to control the amount of robustness.
+ * @param bcFeaturesStd The broadcast standard deviation values of the features.
+ * @param bcParameters including three parts: the regression coefficients corresponding
+ *                     to the features, the intercept (if fitIntercept is true)
+ *                     and the scale parameter (sigma).
+ */
+private[ml] class HuberAggregator(
+    fitIntercept: Boolean,
+    epsilon: Double,
+    bcFeaturesStd: Broadcast[Array[Double]])(bcParameters: Broadcast[Vector])
+  extends DifferentiableLossAggregator[Instance, HuberAggregator] {
+
+  protected override val dim: Int = bcParameters.value.size
+  private val numFeatures: Int = if (fitIntercept) dim - 2 else dim - 1
+
+  @transient private lazy val coefficients: Array[Double] =
+    bcParameters.value.toArray.slice(0, numFeatures)
+  private val sigma: Double = bcParameters.value(dim - 1)
+
+  @transient private lazy val featuresStd = bcFeaturesStd.value
+
+  /**
+   * Add a new training instance to this HuberAggregator, and update the loss and gradient
+   * of the objective function.
+   *
+   * @param instance The instance of data point to be added.
+   * @return This HuberAggregator object.
+   */
+  def add(instance: Instance): HuberAggregator = {
+    instance match { case Instance(label, weight, features) =>
+      require(numFeatures == features.size, s"Dimensions mismatch when adding new sample." +
+        s" Expecting $numFeatures but got ${features.size}.")
+      require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")
+
+      if (weight == 0.0) return this
+
+      val margin = {
+        var sum = 0.0
+        features.foreachActive { (index, value) =>
+          if (featuresStd(index) != 0.0 && value != 0.0) {
+            sum += coefficients(index) * (value / featuresStd(index))
+          }
+        }
+        if (fitIntercept) sum += bcParameters.value(dim - 2)
+        sum
+      }
+      val linearLoss = label - margin
+
+      if (math.abs(linearLoss) <= sigma * epsilon) {
+        lossSum += 0.5 * weight * (sigma + mat
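For reference, the pointwise loss H_m in the quoted formula is simple to state on its own; a minimal standalone sketch follows (the function name and the 1.35 default are illustrative):

// Pointwise huber loss H_m from the formula quoted above; epsilon is the
// shape parameter, with 1.35 as the advised default.
def huber(z: Double, epsilon: Double = 1.35): Double =
  if (math.abs(z) < epsilon) z * z
  else 2.0 * epsilon * math.abs(z) - epsilon * epsilon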
[GitHub] spark pull request #19020: [SPARK-3181] [ML] Implement huber loss for Linear...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/19020#discussion_r139765034
--- Diff: mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala ---
[diff context identical to the r139765053 excerpt above, ending at:]
+  final val epsilon = new DoubleParam(this, "epsilon", "The shape parameter to control the " +
--- End diff --

Mark as expertParam (same for set/get)

---
- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19020: [SPARK-3181] [ML] Implement huber loss for Linear...
Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/19020#discussion_r139764922
--- Diff: mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberAggregator.scala ---
[diff context identical to the r139764932 excerpt above, ending at:]
+ * It is advised to set the parameter $\epsilon$ to 1.35 to achieve 95% statistical efficiency.
--- End diff --

This description is misleadingly general since this claim only applies to normally distributed data. How about referencing the part of the paper which talks about this so that people can look up what is meant here?

---
- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19283: Update quickstart python dataset example
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19283 Can one of the admins verify this patch? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19283: Update quickstart python dataset example
GitHub user fakegermano opened a pull request: https://github.com/apache/spark/pull/19283

Update quickstart python dataset example

The Python dataset example in the quickstart's MapReduce section was returning an error when using the .select().as() function. This replaces it with the .select().name() function, which does the same thing and works.

## What changes were proposed in this pull request?

No code changes; documentation only.

## How was this patch tested?

No significant changes were made to the Spark code, just documentation changes. The change I implemented was tested manually on Spark 2.2.0.

Please review http://spark.apache.org/contributing.html before opening a pull request.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/fakegermano/spark quickstart-python-as-to-name

Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/19283.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19283

commit 29ab03c582bf115cfa802043337688005224ab46
Author: Daniel G Travieso
Date: 2017-09-19T20:19:59Z

    Update quickstart python dataset example

    The Python dataset example in the quickstart's MapReduce section was returning an error when using the .select().as() function. Replaces it with the .select().name() function, which does the same thing and works.

---
- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19282: [SPARK-22066][BUILD] Update checkstyle to 8.2, enable it...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19282 **[Test build #81948 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81948/testReport)** for PR 19282 at commit [`93ff675`](https://github.com/apache/spark/commit/93ff67576566e93eaca3220507049166133ad4b1). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19196

Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81947/
[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19196

Merged build finished. Test FAILed.
[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19196

**[Test build #81947 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81947/testReport)** for PR 19196 at commit [`505af43`](https://github.com/apache/spark/commit/505af43b746e8d3a13de25093a281d43c1ba53ba).

* This patch **fails to generate documentation**.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request #19282: [SPARK-22066][BUILD] Update checkstyle to 8.2, en...
GitHub user srowen opened a pull request: https://github.com/apache/spark/pull/19282

[SPARK-22066][BUILD] Update checkstyle to 8.2, enable it, fix violations

## What changes were proposed in this pull request?

Update plugins, including scala-maven-plugin, to latest versions. Update checkstyle to 8.2. Remove bogus checkstyle config and enable it. Fix existing and new Java checkstyle errors.

## How was this patch tested?

Existing tests

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/srowen/spark SPARK-22066

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19282.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #19282

commit c9d76d93ae8b82fe7e3d8257d700cc625f5b9ff3
Author: Sean Owen
Date: 2017-09-19T20:08:26Z

Update plugins, including scala-maven-plugin, to latest versions. Update checkstyle to 8.2. Remove bogus checkstyle config and enable it. Fix existing and new Java checkstyle errors.
[GitHub] spark issue #19211: [SPARK-18838][core] Add separate listener queues to Live...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19211

Merged build finished. Test PASSed.
[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19196

**[Test build #81947 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81947/testReport)** for PR 19196 at commit [`505af43`](https://github.com/apache/spark/commit/505af43b746e8d3a13de25093a281d43c1ba53ba).
[GitHub] spark issue #19211: [SPARK-18838][core] Add separate listener queues to Live...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19211

Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81940/
[GitHub] spark pull request #19160: [SPARK-21934][CORE] Expose Shuffle Netty memory u...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19160#discussion_r139800662

--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -115,6 +115,7 @@ private[spark] class Executor(
   if (!isLocal) {
     env.metricsSystem.registerSource(executorSource)
     env.blockManager.initialize(conf.getAppId)
+    env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource)

--- End diff --

super nit: can you put the `registerSource()` calls next to each other?
[GitHub] spark pull request #19160: [SPARK-21934][CORE] Expose Shuffle Netty memory u...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19160#discussion_r139796177

--- Diff: core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala ---
@@ -18,11 +18,14 @@ package org.apache.spark.network.netty

 import java.nio.ByteBuffer
+import java.util

--- End diff --

If you are trying to avoid confusion with Scala's hash maps, I think our convention is to rename with a "J" prefix:

```
import java.util.{HashMap => JHashMap}
```
[GitHub] spark pull request #19160: [SPARK-21934][CORE] Expose Shuffle Netty memory u...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/19160#discussion_r139795417

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -248,6 +251,16 @@ private[spark] class BlockManager(
     logInfo(s"Initialized BlockManager: $blockManagerId")
   }

+  def shuffleMetricsSource: Source = {
+    import BlockManager._
+
+    if (externalShuffleServiceEnabled) {
+      new ShuffleMetricsSource("ExternalShuffle", shuffleClient.shuffleMetrics())
+    } else {
+      new ShuffleMetricsSource("NettyBlockTransfer", shuffleClient.shuffleMetrics())

--- End diff --

Do you think we really need to distinguish these two cases? Whether or not you have the external shuffle service, this memory is still owned by the executor JVM (it's really only "external" on the remote end).