[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139833470
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, AttributeReference, BoundReference, Cast, CheckOverflow, Expression, ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, TimeSub, UnaryMinus}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
+
+
+/**
+ * Helper object for [[StreamingSymmetricHashJoinExec]].
+ */
+object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with Logging {
+
+  sealed trait JoinSide
+  case object LeftSide extends JoinSide { override def toString(): String = "left" }
+  case object RightSide extends JoinSide { override def toString(): String = "right" }
+
+  sealed trait JoinStateWatermarkPredicate
+  case class JoinStateKeyWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate
+  case class JoinStateValueWatermarkPredicate(expr: Expression) extends JoinStateWatermarkPredicate
+
+  case class JoinStateWatermarkPredicates(
+      left: Option[JoinStateWatermarkPredicate] = None,
+      right: Option[JoinStateWatermarkPredicate] = None)
+
+  def getStateWatermarkPredicates(
+      leftAttributes: Seq[Attribute],
+      rightAttributes: Seq[Attribute],
+      leftKeys: Seq[Expression],
+      rightKeys: Seq[Expression],
+      condition: Option[Expression],
+      eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = {
+    val joinKeyOrdinalForWatermark: Option[Int] = {
+      leftKeys.zipWithIndex.collectFirst {
+        case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index
+      } orElse {
+        rightKeys.zipWithIndex.collectFirst {
+          case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index
+        }
+      }
+    }
+
+    def getOneSideStateWatermarkPredicate(
+        oneSideInputAttributes: Seq[Attribute],
+        oneSideJoinKeys: Seq[Expression],
+        otherSideInputAttributes: Seq[Attribute]): Option[JoinStateWatermarkPredicate] = {
+      val isWatermarkDefinedOnInput = oneSideInputAttributes.exists(_.metadata.contains(delayKey))
+      val isWatermarkDefinedOnJoinKey = joinKeyOrdinalForWatermark.isDefined
+
+      if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the class docs
--- End diff --

done.


---
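The quoted helper decides, per join side, whether buffered state can later be pruned by a predicate on the state key or on the state value. Below is a simplified, illustrative Scala sketch of that decision; the names and signatures are stand-ins, not the PR's actual `JoinStateKeyWatermarkPredicate` / `JoinStateValueWatermarkPredicate` code.

```scala
// Illustrative stand-ins only; not the PR's types or method signatures.
sealed trait StatePredicateSketch
case class KeyWatermarkPredicateSketch(keyOrdinal: Int) extends StatePredicateSketch
case class ValueWatermarkPredicateSketch(column: String) extends StatePredicateSketch

def chooseStatePredicate(
    watermarkedJoinKeyOrdinal: Option[Int],  // Some(i) if a watermarked column is the i-th join key
    watermarkedInputColumn: Option[String]   // Some(col) if a watermarked column is in this side's input
  ): Option[StatePredicateSketch] = {
  if (watermarkedJoinKeyOrdinal.isDefined) {
    // Cases 1 and 3 in the operator docs: the watermark is on the join key (e.g. a time window),
    // so entire key groups older than the event-time watermark can be dropped.
    watermarkedJoinKeyOrdinal.map(i => KeyWatermarkPredicateSketch(i))
  } else if (watermarkedInputColumn.isDefined) {
    // Case 2: the watermark is only on an input column; buffered rows can be dropped once a
    // state value watermark can be derived from the query's time-range condition.
    watermarkedInputColumn.map(c => ValueWatermarkPredicateSketch(c))
  } else {
    None // no usable watermark, so buffered state cannot be pruned
  }
}
```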




[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139833312
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala ---
+    val joinKeyOrdinalForWatermark: Option[Int] = {
--- End diff --

removed the other one. not needed. copied the docs to this location.


---




[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r139832973
  
--- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A data source writer that is returned by
+ * {@link WriteSupport#createWriter(StructType, SaveMode, DataSourceV2Options)}.
+ * It can mix in various writing optimization interfaces to speed up the data saving. The actual
+ * writing logic is delegated to the {@link WriteTask} that is returned by {@link #createWriteTask()}.
+ *
+ * The writing procedure is:
+ *   1. Create a write task by {@link #createWriteTask()}, serialize and send it to all the
+ *      partitions of the input data (RDD).
+ *   2. For each partition, create a data writer with the write task, and write the data of the
+ *      partition with this writer. If all the data are written successfully, call
+ *      {@link DataWriter#commit()}. If an exception happens during the writing, call
+ *      {@link DataWriter#abort()}. This step may repeat several times as Spark will retry failed
+ *      tasks.
+ *   3. Wait until all the writers/partitions are finished, i.e., either committed or aborted. If
+ *      all partitions are written successfully, call {@link #commit(WriterCommitMessage[])}. If
+ *      some partitions failed and aborted, call {@link #abort()}.
--- End diff --

The main reason why I wanted a separate SPIP for the write path was this 
point in the doc:

> Ideally partitioning/bucketing concept should not be exposed in the Data 
Source API V2, because they are just techniques for data skipping and 
pre-partitioning. However, these 2 concepts are already widely used in Spark, 
e.g. DataFrameWriter.partitionBy and DDL syntax like ADD PARTITION. To be 
consistent, we need to add partitioning/bucketing to Data Source V2, so that 
the implementations can be able to specify partitioning/bucketing for 
read/write.

There's a lot in there that's worth thinking about and possibly changing:

1. Ideally, the DataSourceV2 API wouldn't support bucketing/partitioning
2. The current DataFrameWriter API is what we should continue to support
3. Implementations should supply bucketing and partitioning for writes 
because of 2

**Bucketing/partitioning**: It comes down to the level at which this API is 
going to be used. It looks like this API currently ignores bucketing and 
partitioning (unless my read through was too quick). I think I agree that in 
the long term that's a good thing, but we need ways for a data source to tell 
Spark about its requirements for incoming data.

In the current version, it looks like Spark would know how to prepare data 
for writers outside of this API (rather than including support as suggested by 
point 3). When writing a partitioned table, Spark would get the partitioning 
from the table definition in the metastore and automatically sort by partition 
columns. Is that right?

I'd like to move the data store's requirements behind this API. For 
example, writing to HBase files directly requires sorting by key first. We 
don't want to do the sort in the writer because it may duplicate work (and 
isn't captured in the physical plan), and we also don't want to require Spark 
to know about the requirements of the HBase data store, or any other specific 
implementation.

**DataFrameWriter API**: I'd like to talk about separating the API for 
table definitions and writes, but not necessarily as part of this work. The 
SPIP should clearly state whether that's part of th
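For reference, here is a minimal, self-contained Scala sketch of the commit/abort protocol the quoted javadoc describes: create a task, write each partition and commit or abort it, then globally commit only if every partition committed. The trait names and signatures are simplified stand-ins, not the actual DataSourceV2Writer/WriteTask interfaces from the PR, and Spark's task retries are omitted.

```scala
// Simplified stand-ins for the interfaces described in the quoted javadoc (illustrative only).
trait CommitMessage
trait PartitionWriter[T] {
  def write(record: T): Unit
  def commit(): CommitMessage   // step 2: per-partition commit
  def abort(): Unit             // step 2: per-partition abort on failure
}
trait Writer[T] {
  def createPartitionWriter(partitionId: Int): PartitionWriter[T]  // step 1 analogue
  def commit(messages: Seq[CommitMessage]): Unit                   // step 3: global commit
  def abort(): Unit                                                // step 3: global abort
}

// Driver-side view of steps 1-3; in Spark, step 2 runs inside tasks and the commit
// messages are collected back to the driver before the global decision is made.
def writeAll[T](writer: Writer[T], partitions: Seq[Seq[T]]): Unit = {
  val results: Seq[Either[Throwable, CommitMessage]] =
    partitions.zipWithIndex.map { case (rows, pid) =>
      val pw = writer.createPartitionWriter(pid)
      try {
        rows.foreach(pw.write)
        Right(pw.commit())
      } catch {
        case e: Throwable =>
          pw.abort()
          Left(e)
      }
    }
  if (results.forall(_.isRight)) writer.commit(results.collect { case Right(m) => m })
  else writer.abort()
}
```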

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139832926
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.TimeUnit.NANOSECONDS
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
+import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.util.{CompletionIterator, SerializableConfiguration}
+
+
+/**
+ * Performs a stream-stream join using the symmetric hash join algorithm. It works as follows.
+ *
+ *                           /-----------------------\
+ *   left side input ------->|    left side state    |------\
+ *                           \-----------------------/      |
+ *                                                          |------> joined output
+ *                           /-----------------------\      |
+ *   right side input ------>|   right side state    |------/
+ *                           \-----------------------/
+ *
+ * Each join side buffers past input rows as streaming state so that the past input can be
+ * joined with future input on the other side. This buffer state is effectively a multi-map:
+ *    equi-join key -> list of past input rows received with the join key
+ *
+ * For each input row on each side, the following operations take place.
+ * - Calculate the join key from the row.
+ * - Use the join key to append the row to the buffer state of the side that the row came from.
+ * - Find past buffered values for the key from the other side. For each such value, emit the
+ *   "joined row" (left-row, right-row).
+ * - Apply the optional condition to filter the joined rows as the final output.
+ *
+ * If a timestamp column with an event-time watermark is present in the join keys or in the
+ * input data, then the operator uses the watermark to figure out which rows in the buffer will
+ * not join with any new data, and can therefore be discarded. Depending on the provided query
+ * conditions, we can define thresholds on both the state key (i.e. joining keys) and the state
+ * value (i.e. input rows). There are three kinds of queries possible regarding this, as
+ * explained below. Assume that a watermark has been defined on both the `leftTime` and
+ * `rightTime` columns used below.
+ *
+ * 1. When timestamp/time-window + watermark is in the join keys. Example (pseudo-SQL):
+ *
+ *      SELECT * FROM leftTable, rightTable
+ *      ON
+ *        leftKey = rightKey AND
+ *        window(leftTime, "1 hour") = window(rightTime, "1 hour")    // 1hr tumbling windows
+ *
+ *    In this case, this operator will join rows newer than the watermark which fall in the
+ *    same 1-hour window. Say the event-time watermark is "12:34" (on both left and right
+ *    input). Then input rows can only have time > 12:34. Hence, they can only join with
+ *    buffered rows where window >= 12:00 - 1:00, and all buffered rows with join window
+ *    < 12:00 can be discarded. In other words, the operator will discard all state where
+ *    window in state key (i.e. join key) < event-time watermark. This threshold is called the
+ *    State Key Watermark.
+ *
+ * 2. When timestamp range conditions a
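To make the operation list in the quoted docs concrete, here is a minimal, self-contained Scala sketch of the per-row bookkeeping, using plain in-memory buffers in place of Spark's keyed state store and ignoring watermark-based state cleanup:

```scala
import scala.collection.mutable

// Toy symmetric hash join over two unbounded inputs, mirroring the per-row steps in the docs.
// The buffers are plain in-memory multi-maps here; the real operator keys them by the
// equi-join key and stores them in a fault-tolerant state store.
case class Row(key: Int, payload: String)

object SymmetricHashJoinSketch {
  private val leftState = mutable.Map.empty[Int, mutable.Buffer[Row]]
  private val rightState = mutable.Map.empty[Int, mutable.Buffer[Row]]

  /** Process one input row from one side and return the joined rows it produces. */
  def process(row: Row, fromLeft: Boolean,
      condition: (Row, Row) => Boolean = (_, _) => true): Seq[(Row, Row)] = {
    val (ownState, otherState) = if (fromLeft) (leftState, rightState) else (rightState, leftState)
    // 1. Calculate the join key from the row (here it is simply `row.key`).
    // 2. Append the row to the buffer state of the side it came from.
    ownState.getOrElseUpdate(row.key, mutable.Buffer.empty) += row
    // 3. Find past buffered rows for the same key on the other side and emit joined rows,
    // 4. applying the optional condition as the final filter.
    otherState.getOrElse(row.key, mutable.Buffer.empty[Row]).toSeq
      .map(other => if (fromLeft) (row, other) else (other, row))
      .filter { case (l, r) => condition(l, r) }
  }
}

// Example: a right row joins with a previously buffered left row of the same key.
// SymmetricHashJoinSketch.process(Row(1, "l1"), fromLeft = true)   // -> Seq()
// SymmetricHashJoinSketch.process(Row(1, "r1"), fromLeft = false)  // -> Seq((Row(1,"l1"), Row(1,"r1")))
```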
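And a hedged usage-level sketch of case 1 in the DataFrame API (the rate source, the key derivation, and the column names are placeholders): both streams declare an event-time watermark, and the 1-hour tumbling window is part of the equi-join key, which is what lets the operator apply the state key watermark described above.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, expr, window}

object WindowedStreamJoinExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("windowed-stream-join").getOrCreate()

    // Two illustrative streams with event-time columns and watermarks.
    val left = spark.readStream.format("rate").load()
      .select(col("timestamp").as("leftTime"), (col("value") % 10).as("leftKey"))
      .withWatermark("leftTime", "10 minutes")
      .withColumn("leftWindow", window(col("leftTime"), "1 hour"))

    val right = spark.readStream.format("rate").load()
      .select(col("timestamp").as("rightTime"), (col("value") % 10).as("rightKey"))
      .withWatermark("rightTime", "10 minutes")
      .withColumn("rightWindow", window(col("rightTime"), "1 hour"))

    // Equivalent to the pseudo-SQL above: equi-join on the key and on the 1-hour window.
    val joined = left.join(right, expr("leftKey = rightKey AND leftWindow = rightWindow"))

    joined.writeStream.format("console").option("truncate", "false").start().awaitTermination()
  }
}
```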

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139832660
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139832320
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139832253
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139832098
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139832036
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139831879
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139831615
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139831226
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139831209
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala ---

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139830591
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.TimeUnit.NANOSECONDS
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, 
JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
+import 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.util.{CompletionIterator, 
SerializableConfiguration}
+
+
+/**
+ * Performs stream-stream join using symmetric hash join algorithm. It 
works as follows.
+ *
+ *                          /-----------------------\
+ *   left side input ------->|    left side state    |------\
+ *                          \-----------------------/      |
+ *                                                         |------> joined output
+ *                          /-----------------------\      |
+ *   right side input ------>|    right side state   |------/
+ *                          \-----------------------/
+ *
+ * Each join side buffers past input rows as streaming state so that the 
past input can be joined
+ * with future input on the other side. This buffer state is effectively a 
multi-map:
+ *equi-join key -> list of past input rows received with the join key
+ *
+ * For each input row in each side, the following operations take place.
+ * - Calculate join key from the row.
+ * - Use the join key to append the row to the buffer state of the side 
that the row came from.
+ * - Find past buffered values for the key from the other side. For each 
such value, emit the
+ *   "joined row" (left-row, right-row)
+ * - Apply the optional condition to filter the joined rows as the final 
output.
+ *
+ * If a timestamp column with event time watermark is present in the join 
keys or in the input
+ * data, then the it uses the watermark figure out which rows in the 
buffer will not join with
+ * and new data, and therefore can be discarded. Depending on the provided 
query conditions, we
--- End diff --

done.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
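For reference, the first kind of query described in the quoted doc comment (time windows in the join keys) can be written with the Dataset API roughly as follows; the rate source, column names, and watermark delays are illustrative only:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{expr, window}

    val spark = SparkSession.builder.appName("windowed-join-example").getOrCreate()

    // Two illustrative streams, each with an event-time column and a watermark.
    val left = spark.readStream.format("rate").load()
      .selectExpr("value AS leftKey", "timestamp AS leftTime")
      .withWatermark("leftTime", "30 minutes")
    val right = spark.readStream.format("rate").load()
      .selectExpr("value AS rightKey", "timestamp AS rightTime")
      .withWatermark("rightTime", "30 minutes")

    // Equi-join on the key and on 1-hour tumbling windows, as in the pseudo-SQL
    // above; state for windows older than the watermark can then be discarded.
    val joined = left.join(
      right,
      expr("leftKey = rightKey") &&
        (window(left("leftTime"), "1 hour") === window(right("rightTime"), "1 hour")))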



[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139830482
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
 ---
@@ -114,6 +115,16 @@ class IncrementalExecution(
   stateInfo = Some(nextStatefulOperationStateInfo),
   batchTimestampMs = Some(offsetSeqMetadata.batchTimestampMs),
   eventTimeWatermark = Some(offsetSeqMetadata.batchWatermarkMs))
+
+  case j @ StreamingSymmetricHashJoinExec(lKeys, rKeys, _, cond, _, _, 
_, left, right) =>
+j.copy(
+  stateInfo = Some(nextStatefulOperationStateInfo),
--- End diff --

Whatever optimization takes place, the same optimizations will occur in 
EVERY batch. So if aggregation is pushed below join, then all the batches will 
have that. 

What we have to guard against is cost-based optimization that can reorder things differently in different batches. That is why I have disabled cost-based join optimization. And when adding such optimizations in the future, we have to be cautious about the streaming case and disable them there.

Also, this is a general concern with other stateful ops as well, not 
something that this PR would address.



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19170: [SPARK-21961][Core] Filter out BlockStatuses Accumulator...

2017-09-19 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19170
  
**[Test build #81951 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81951/testReport)**
 for PR 19170 at commit 
[`04c1e2a`](https://github.com/apache/spark/commit/04c1e2aa24c61f13f1df5148416bb00f0649fcaf).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19170: [SPARK-21961][Core] Filter out BlockStatuses Accumulator...

2017-09-19 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/19170
  
cc @vanzin 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19170: [SPARK-21961][Core] Filter out BlockStatuses Accumulator...

2017-09-19 Thread zsxwing
Github user zsxwing commented on the issue:

https://github.com/apache/spark/pull/19170
  
ok to test


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDFs

2017-09-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18659
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81945/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDFs

2017-09-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18659
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDFs

2017-09-19 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18659
  
**[Test build #81945 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81945/testReport)**
 for PR 18659 at commit 
[`69112a5`](https://github.com/apache/spark/commit/69112a5a771bc3c98a7cd0b21ffda883d86c41a4).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `// enable memo iff we serialize the row with schema (schema and 
class should be memorized)`
  * `abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: 
Seq[Attribute], child: SparkPlan)`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #13477: [SPARK-15739][GraphX] Expose aggregateMessagesWithActive...

2017-09-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/13477
  
Can one of the admins verify this patch?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-19 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r139819360
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
@@ -1255,6 +1255,97 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 assert(taskOption4.get.addedJars === addedJarsMidTaskSet)
   }
 
+  test("limit max concurrent running tasks in a job group when configured 
") {
+val conf = new SparkConf().
+  set(config.BLACKLIST_ENABLED, true).
+  set("spark.job.testJobGroup.maxConcurrentTasks", "2") // Limit max 
concurrent tasks to 2
+
+sc = new SparkContext("local", "test", conf)
+sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", 
"host2"))
+val props = new Properties();
+props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "testJobGroup") // 
set the job group
+
+val tasks = Array.tabulate[Task[_]](10) { i =>
+  new FakeTask(0, i, Nil)
+}
+val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, 
props), 2)
+
+// make some offers to our taskset
+var taskDescs = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  // offer each executor twice (simulating 2 cores per executor)
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs.size === 2) // Out of the 4 offers, tsm can accept up 
to maxConcurrentTasks.
+
+// make 4 more offers
+val taskDescs2 = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs2.size === 0) // tsm doesn't accept any as it is 
already running at max tasks
+
+// inform tsm that one task has completed
+val directTaskResult = createTaskResult(0)
+tsm.handleSuccessfulTask(taskDescs(0).taskId, directTaskResult)
+
+// make 4 more offers after previous task completed
+taskDescs = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs.size === 1) // tsm accepts one as it can run one more 
task
+  }
+
+  test("do not limit max concurrent running tasks in a job group by 
default") {
+val conf = new SparkConf().
+  set(config.BLACKLIST_ENABLED, true)
+
+sc = new SparkContext("local", "test", conf)
+sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", 
"host2"))
+
+val tasks = Array.tabulate[Task[_]](10) { i =>
+  new FakeTask(0, i, Nil)
+}
+val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, null), 
2)
+
+// make 5 offers to our taskset
+var taskDescs = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host2"
+).flatMap { case (exec, host) =>
+  // offer each executor twice (simulating 3 cores per executor)
--- End diff --

update comments
5 offers -> 6 offers
twice -> three times


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
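The quoted test drives the limit through `spark.job.<jobGroup>.maxConcurrentTasks`. From an application's point of view the proposed knob would be used along these lines; this is a sketch based only on the config pattern exercised in the test, with a made-up group name:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setMaster("local[4]")
      .setAppName("concurrency-limit-example")
      .set("spark.job.etl.maxConcurrentTasks", "2") // at most 2 concurrent tasks for group "etl"
    val sc = new SparkContext(conf)

    sc.setJobGroup("etl", "jobs that should not hog the cluster")
    sc.parallelize(1 to 1000, numSlices = 100).map(_ * 2).count()
    sc.clearJobGroup() // subsequent jobs run without the limit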



[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-19 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r139819793
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
@@ -1255,6 +1255,97 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
 assert(taskOption4.get.addedJars === addedJarsMidTaskSet)
   }
 
+  test("limit max concurrent running tasks in a job group when configured 
") {
+val conf = new SparkConf().
+  set(config.BLACKLIST_ENABLED, true).
+  set("spark.job.testJobGroup.maxConcurrentTasks", "2") // Limit max 
concurrent tasks to 2
+
+sc = new SparkContext("local", "test", conf)
+sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", 
"host2"))
+val props = new Properties();
+props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "testJobGroup") // 
set the job group
+
+val tasks = Array.tabulate[Task[_]](10) { i =>
+  new FakeTask(0, i, Nil)
+}
+val tsm = new TaskSetManager(sched, new TaskSet(tasks, 0, 0, 0, 
props), 2)
+
+// make some offers to our taskset
+var taskDescs = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  // offer each executor twice (simulating 2 cores per executor)
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs.size === 2) // Out of the 4 offers, tsm can accept up 
to maxConcurrentTasks.
+
+// make 4 more offers
+val taskDescs2 = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs2.size === 0) // tsm doesn't accept any as it is 
already running at max tasks
+
+// inform tsm that one task has completed
+val directTaskResult = createTaskResult(0)
+tsm.handleSuccessfulTask(taskDescs(0).taskId, directTaskResult)
+
+// make 4 more offers after previous task completed
+taskDescs = Seq(
+  "exec1" -> "host1",
+  "exec2" -> "host1"
+).flatMap { case (exec, host) =>
+  (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, 
TaskLocality.ANY)}
+}
+assert(taskDescs.size === 1) // tsm accepts one as it can run one more 
task
+  }
+
+  test("do not limit max concurrent running tasks in a job group by 
default") {
--- End diff --

I don't think this test really adds anything beyond other tests in this 
suite.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-19 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r139821899
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -619,6 +625,47 @@ private[spark] class ExecutorAllocationManager(
 // place the executors.
 private val stageIdToExecutorPlacementHints = new mutable.HashMap[Int, 
(Int, Map[String, Int])]
 
+override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+  jobStart.stageInfos.foreach(stageInfo => 
stageIdToJobId(stageInfo.stageId) = jobStart.jobId)
+
+  var jobGroupId = if (jobStart.properties != null) {
+jobStart.properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID)
+  } else {
+null
+  }
+
+  val maxConTasks = if (jobGroupId != null &&
+conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+conf.get(s"spark.job.$jobGroupId.maxConcurrentTasks").toInt
+  } else {
+Int.MaxValue
+  }
+
+  if (maxConTasks <= 0) {
+throw new IllegalArgumentException(
+  "Maximum Concurrent Tasks should be set greater than 0 for the 
job to progress.")
+  }
+
+  if (jobGroupId == null || 
!conf.contains(s"spark.job.$jobGroupId.maxConcurrentTasks")) {
+jobGroupId = DEFAULT_JOB_GROUP
+  }
+
+  jobIdToJobGroup(jobStart.jobId) = jobGroupId
+  if (!jobGroupToMaxConTasks.contains(jobGroupId)) {
--- End diff --

this is probably a weird / unusual situation, but is this really the 
behavior you want if there are multiple jobs submitted for the same job group?  
Wouldn't you just take the conf for the job group at the time each job was 
submitted?

Worst case with this approach: say you are *always* submitting multiple 
jobs for each job group; when one finishes, you immediately start another one, 
so that the new one partially overlaps the old one.  Then even if you change 
the conf, all jobs will keep using the old value forever.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
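A rough sketch of the alternative suggested above: re-resolve the limit on every job start so that a conf change is picked up by newly submitted jobs. Field names follow the quoted patch (plus the `stageIdToJobGroupId` map suggested elsewhere in this review); this is not the actual implementation:

    // Inside the allocation manager's listener, in the shape of the quoted patch:
    override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
      val requestedGroup = Option(jobStart.properties)
        .flatMap(p => Option(p.getProperty(SparkContext.SPARK_JOB_GROUP_ID)))
      // Fall back to the default group when no group is set or no limit is configured.
      val jobGroupId = requestedGroup
        .filter(id => conf.contains(s"spark.job.$id.maxConcurrentTasks"))
        .getOrElse(DEFAULT_JOB_GROUP)
      val maxConTasks = conf
        .getOption(s"spark.job.$jobGroupId.maxConcurrentTasks")
        .map(_.toInt)
        .getOrElse(Int.MaxValue)
      require(maxConTasks > 0,
        "Maximum Concurrent Tasks should be set greater than 0 for the job to progress.")

      jobStart.stageInfos.foreach(stageInfo => stageIdToJobGroupId(stageInfo.stageId) = jobGroupId)
      // Overwrite unconditionally: each newly submitted job re-reads the conf, so a
      // changed limit applies to it even while older jobs in the group are still running.
      jobGroupToMaxConTasks(jobGroupId) = maxConTasks
    }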



[GitHub] spark pull request #19194: [SPARK-20589] Allow limiting task concurrency per...

2017-09-19 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19194#discussion_r139818031
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -758,11 +825,52 @@ private[spark] class ExecutorAllocationManager(
   allocationManager.synchronized {
 stageIdToNumSpeculativeTasks(stageId) =
   stageIdToNumSpeculativeTasks.getOrElse(stageId, 0) + 1
+maxConcurrentTasks = getMaxConTasks
+logDebug(s"Setting max concurrent tasks to $maxConcurrentTasks on 
spec. task submitted.")
 allocationManager.onSchedulerBacklogged()
   }
 }
 
 /**
+ * Calculate the maximum no. of concurrent tasks that can run 
currently.
+ */
+def getMaxConTasks(): Int = {
+  // We can limit the no. of concurrent tasks by a job group. A job 
group can have multiple jobs
+  // with multiple stages. We need to get all the active stages 
belonging to a job group to
+  // calculate the total no. of pending + running tasks to decide the 
maximum no. of executors
+  // we need at that time to serve the outstanding tasks. This is 
capped by the minimum no. of
+  // outstanding tasks and the max concurrent limit specified for the 
job group if any.
+
+  def getIncompleteTasksForStage(stageId: Int, numTasks: Int): Int = {
+totalPendingTasks(stageId) + totalRunningTasks(stageId)
+  }
+
+  def sumIncompleteTasksForStages: (Int, (Int, Int)) => Int = 
(totalTasks, stageToNumTasks) => {
+val activeTasks = getIncompleteTasksForStage(stageToNumTasks._1, 
stageToNumTasks._2)
+sumOrMax(totalTasks, activeTasks)
+  }
+  // Get the total running & pending tasks for all stages in a job 
group.
+  def getIncompleteTasksForJobGroup(stagesItr: mutable.HashMap[Int, 
Int]): Int = {
+stagesItr.foldLeft(0)(sumIncompleteTasksForStages)
+  }
+
+  def sumIncompleteTasksForJobGroup: (Int, (String, 
mutable.HashMap[Int, Int])) => Int = {
+(maxConTasks, x) => {
+  val totalIncompleteTasksForJobGroup = 
getIncompleteTasksForJobGroup(x._2)
+  val maxTasks = Math.min(jobGroupToMaxConTasks(x._1), 
totalIncompleteTasksForJobGroup)
+  sumOrMax(maxConTasks, maxTasks)
+}
+  }
+
+  def sumOrMax(a: Int, b: Int): Int = if (doesSumOverflow(a, b)) 
Int.MaxValue else (a + b)
+
+  def doesSumOverflow(a: Int, b: Int): Boolean = b > (Int.MaxValue - a)
+
+  val stagesByJobGroup = stageIdToNumTasks.groupBy(x => 
jobIdToJobGroup(stageIdToJobId(x._1)))
--- End diff --

you could just store `stageIdToJobGroupId`.  Simplifies this a bit, and then you don't need to store `jobIdToJobGroup` at all, I think


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
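For illustration, the simplification being suggested would reduce the grouping step to something like the following fragment (reusing the fields from the quoted patch, with `stageIdToJobGroupId` being the suggested new map):

    // If onJobStart recorded stageIdToJobGroupId directly, the roll-up in getMaxConTasks
    // would no longer need to go through stageIdToJobId and jobIdToJobGroup:
    val stagesByJobGroup = stageIdToNumTasks.groupBy { case (stageId, _) => stageIdToJobGroupId(stageId) }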



[GitHub] spark pull request #18853: [SPARK-21646][SQL] CommonType for binary comparis...

2017-09-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18853#discussion_r139821887
  
--- Diff: docs/sql-programming-guide.md ---
@@ -968,6 +968,13 @@ Configuration of Parquet can be done using the 
`setConf` method on `SparkSession
 
   
 
+
+  spark.sql.autoTypeCastingCompatibility
+  false
--- End diff --

`hive, default`



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...

2017-09-19 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19196
  
**[Test build #81950 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81950/testReport)**
 for PR 19196 at commit 
[`8a6eafe`](https://github.com/apache/spark/commit/8a6eafef056b2a64ee0be07ce886ad69dc295537).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19252: [SPARK-21969][SQL] CommandUtils.updateTableStats ...

2017-09-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19252


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #18853: [SPARK-21646][SQL] CommonType for binary comparis...

2017-09-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/18853#discussion_r139821539
  
--- Diff: docs/sql-programming-guide.md ---
@@ -968,6 +968,13 @@ Configuration of Parquet can be done using the 
`setConf` method on `SparkSession
 
   
 
+
+  spark.sql.autoTypeCastingCompatibility
--- End diff --

`spark.sql.typeCoercion.mode`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
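If the rename is adopted, the corresponding SQLConf entry might be declared along these lines. This is a hypothetical sketch combining the two suggestions in this review (the name here and the "hive"/"default" values suggested above), not code from the patch:

    // Inside the SQLConf object; hypothetical entry for the suggested config name.
    val TYPE_COERCION_MODE = buildConf("spark.sql.typeCoercion.mode")
      .doc("Which implicit type-coercion rules to apply in binary comparisons: " +
        "Spark's default rules or Hive-compatible rules.")
      .stringConf
      .checkValues(Set("default", "hive"))
      .createWithDefault("default")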



[GitHub] spark issue #19252: [SPARK-21969][SQL] CommandUtils.updateTableStats should ...

2017-09-19 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19252
  
Thanks! Merged to master.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calc...

2017-09-19 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/19281#discussion_r139820801
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -396,6 +396,26 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] 
with Logging with Serializ
 object SparkPlan {
   private[execution] val subqueryExecutionContext = 
ExecutionContext.fromExecutorService(
 ThreadUtils.newDaemonCachedThreadPool("subquery", 16))
+
+  /**
+   * Returns if the actual ordering satisfies the required ordering.
+   *
+   * Ordering A satisfies ordering B if and only if B is an equivalent of 
A or of A's prefix.
+   */
+  def orderingSatisfies(actual: Seq[SortOrder], required: Seq[SortOrder]): 
Boolean = {
--- End diff --

how about moving this to `SortOrder` object : 
https://github.com/apache/spark/blob/e9c91badce64731ffd3e53cbcd9f044a7593e6b8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala#L92


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
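For reference, the prefix check described in the quoted doc comment could look roughly like this if it moved to the `SortOrder` companion object as suggested; a sketch, not the actual implementation:

    import org.apache.spark.sql.catalyst.expressions.SortOrder

    def orderingSatisfies(actual: Seq[SortOrder], required: Seq[SortOrder]): Boolean = {
      if (required.isEmpty) {
        true                                   // everything satisfies an empty requirement
      } else if (required.length > actual.length) {
        false                                  // required must be a prefix of actual
      } else {
        // Compare pairwise: same direction, same null ordering, semantically equal sort keys.
        required.zip(actual).forall { case (req, act) =>
          act.direction == req.direction &&
            act.nullOrdering == req.nullOrdering &&
            act.child.semanticEquals(req.child)
        }
      }
    }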



[GitHub] spark issue #19252: [SPARK-21969][SQL] CommandUtils.updateTableStats should ...

2017-09-19 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19252
  
LGTM


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19278: [SPARK-22060][ML] Fix CrossValidator/TrainValidat...

2017-09-19 Thread smurching
Github user smurching commented on a diff in the pull request:

https://github.com/apache/spark/pull/19278#discussion_r139816308
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala ---
@@ -399,14 +399,17 @@ private[ml] object DefaultParamsReader {
* This works if all Params implement 
[[org.apache.spark.ml.param.Param.jsonDecode()]].
--- End diff --

Update the docstring to state that params included in `skipParams` aren't 
set.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19278: [SPARK-22060][ML] Fix CrossValidator/TrainValidat...

2017-09-19 Thread smurching
Github user smurching commented on a diff in the pull request:

https://github.com/apache/spark/pull/19278#discussion_r139809836
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala ---
@@ -399,14 +399,17 @@ private[ml] object DefaultParamsReader {
* This works if all Params implement 
[[org.apache.spark.ml.param.Param.jsonDecode()]].
* TODO: Move to [[Metadata]] method
*/
-  def getAndSetParams(instance: Params, metadata: Metadata): Unit = {
+  def getAndSetParams(instance: Params, metadata: Metadata,
+  skipParams: List[String] = null): Unit = {
--- End diff --

Use an `Option[List[String]]` that defaults to `None` instead of a 
`List[String]` that defaults to null?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
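Concretely, the suggested signature change would look something like the sketch below. The body condenses the existing param-setting loop rather than reproducing it exactly, and `Metadata` refers to `DefaultParamsReader.Metadata`:

    import org.json4s._
    import org.json4s.jackson.JsonMethods._

    import org.apache.spark.ml.param.Params

    def getAndSetParams(
        instance: Params,
        metadata: Metadata,
        skipParams: Option[List[String]] = None): Unit = {
      metadata.params match {
        case JObject(pairs) =>
          pairs.foreach { case (paramName, jsonValue) =>
            // Leave unset any param the caller explicitly asked to skip.
            if (!skipParams.exists(_.contains(paramName))) {
              val param = instance.getParam(paramName)
              instance.set(param, param.jsonDecode(compact(render(jsonValue))))
            }
          }
        case _ =>
          throw new IllegalArgumentException(s"Cannot recognize JSON metadata: ${metadata.metadataJson}.")
      }
    }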



[GitHub] spark pull request #19278: [SPARK-22060][ML] Fix CrossValidator/TrainValidat...

2017-09-19 Thread smurching
Github user smurching commented on a diff in the pull request:

https://github.com/apache/spark/pull/19278#discussion_r139817121
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala ---
@@ -17,6 +17,7 @@
 
 package org.apache.spark.ml.tuning
 
+import java.io.IOException
--- End diff --

This exception is unused & can be removed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps

2017-09-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19250
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps

2017-09-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19250
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81943/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19250: [SPARK-12297] Table timezone correction for Timestamps

2017-09-19 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19250
  
**[Test build #81943 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81943/testReport)**
 for PR 19250 at commit 
[`515b38b`](https://github.com/apache/spark/commit/515b38b7b0a1efc0de4a92479c1e5a479b872146).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139804436
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.TimeUnit.NANOSECONDS
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, 
JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
+import 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.util.{CompletionIterator, 
SerializableConfiguration}
+
+
+/**
+ * Performs stream-stream join using symmetric hash join algorithm. It 
works as follows.
+ *
+ * /---\
+ *   left side input ->|left side state|--\
+ * \---/  |
+ *|> 
joined output
+ * /---\  |
+ *   right side input >|right side state   |--/
+ * \---/
+ *
+ * Each join side buffers past input rows as streaming state so that the 
past input can be joined
+ * with future input on the other side. This buffer state is effectively a 
multi-map:
+ *equi-join key -> list of past input rows received with the join key
+ *
+ * For each input row in each side, the following operations take place.
+ * - Calculate join key from the row.
+ * - Use the join key to append the row to the buffer state of the side 
that the row came from.
+ * - Find past buffered values for the key from the other side. For each 
such value, emit the
+ *   "joined row" (left-row, right-row)
+ * - Apply the optional condition to filter the joined rows as the final 
output.
+ *
+ * If a timestamp column with event time watermark is present in the join 
keys or in the input
+ * data, then the it uses the watermark figure out which rows in the 
buffer will not join with
+ * and new data, and therefore can be discarded. Depending on the provided 
query conditions, we
+ * can define thresholds on both state key (i.e. joining keys) and state 
value (i.e. input rows).
+ * There are three kinds of queries possible regarding this as explained 
below.
+ * Assume that watermark has been defined on both `leftTime` and 
`rightTime` columns used below.
+ *
+ * 1. When timestamp/time-window + watermark is in the join keys. Example 
(pseudo-SQL):
+ *
+ *  SELECT * FROM leftTable, rightTable
+ *  ON
+ *leftKey = rightKey AND
+ *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 
1hr tumbling windows
+ *
+ *In this case, this operator will join rows newer than watermark 
which fall in the same
+ *1 hour window. Say the event-time watermark is "12:34" (both left 
and right input).
+ *Then input rows can only have time > 12:34. Hence, they can only 
join with buffered rows
+ *where window >= 12:00 - 1:00 and all buffered rows with join window 
< 12:00 can be
+ *discarded. In other words, the operator will discard all state where
+ *window in state key (i.e. join key) < event time watermark. This 
threshold is called
+ *State Key Watermark.
+ *
+ * 2. When timestamp range conditions

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139811883
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.TimeUnit.NANOSECONDS
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, 
JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
+import 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.util.{CompletionIterator, 
SerializableConfiguration}
+
+
+/**
+ * Performs stream-stream join using symmetric hash join algorithm. It 
works as follows.
+ *
+ * /---\
+ *   left side input ->|left side state|--\
+ * \---/  |
+ *|> 
joined output
+ * /---\  |
+ *   right side input >|right side state   |--/
+ * \---/
+ *
+ * Each join side buffers past input rows as streaming state so that the 
past input can be joined
+ * with future input on the other side. This buffer state is effectively a 
multi-map:
+ *equi-join key -> list of past input rows received with the join key
+ *
+ * For each input row in each side, the following operations take place.
+ * - Calculate join key from the row.
+ * - Use the join key to append the row to the buffer state of the side 
that the row came from.
+ * - Find past buffered values for the key from the other side. For each 
such value, emit the
+ *   "joined row" (left-row, right-row)
+ * - Apply the optional condition to filter the joined rows as the final 
output.
+ *
+ * If a timestamp column with event time watermark is present in the join 
keys or in the input
+ * data, then the it uses the watermark figure out which rows in the 
buffer will not join with
+ * and new data, and therefore can be discarded. Depending on the provided 
query conditions, we
+ * can define thresholds on both state key (i.e. joining keys) and state 
value (i.e. input rows).
+ * There are three kinds of queries possible regarding this as explained 
below.
+ * Assume that watermark has been defined on both `leftTime` and 
`rightTime` columns used below.
+ *
+ * 1. When timestamp/time-window + watermark is in the join keys. Example 
(pseudo-SQL):
+ *
+ *  SELECT * FROM leftTable, rightTable
+ *  ON
+ *leftKey = rightKey AND
+ *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 
1hr tumbling windows
+ *
+ *In this case, this operator will join rows newer than watermark 
which fall in the same
+ *1 hour window. Say the event-time watermark is "12:34" (both left 
and right input).
+ *Then input rows can only have time > 12:34. Hence, they can only 
join with buffered rows
+ *where window >= 12:00 - 1:00 and all buffered rows with join window 
< 12:00 can be
+ *discarded. In other words, the operator will discard all state where
+ *window in state key (i.e. join key) < event time watermark. This 
threshold is called
+ *State Key Watermark.
+ *
+ * 2. When timestamp range conditions

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139808731
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.TimeUnit.NANOSECONDS
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, 
JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
+import 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.util.{CompletionIterator, 
SerializableConfiguration}
+
+
+/**
+ * Performs stream-stream join using symmetric hash join algorithm. It 
works as follows.
+ *
+ * /---\
+ *   left side input ->|left side state|--\
+ * \---/  |
+ *|> 
joined output
+ * /---\  |
+ *   right side input >|right side state   |--/
+ * \---/
+ *
+ * Each join side buffers past input rows as streaming state so that the 
past input can be joined
+ * with future input on the other side. This buffer state is effectively a 
multi-map:
+ *equi-join key -> list of past input rows received with the join key
+ *
+ * For each input row in each side, the following operations take place.
+ * - Calculate join key from the row.
+ * - Use the join key to append the row to the buffer state of the side 
that the row came from.
+ * - Find past buffered values for the key from the other side. For each 
such value, emit the
+ *   "joined row" (left-row, right-row)
+ * - Apply the optional condition to filter the joined rows as the final 
output.
+ *
+ * If a timestamp column with event time watermark is present in the join 
keys or in the input
+ * data, then the it uses the watermark figure out which rows in the 
buffer will not join with
+ * and new data, and therefore can be discarded. Depending on the provided 
query conditions, we
+ * can define thresholds on both state key (i.e. joining keys) and state 
value (i.e. input rows).
+ * There are three kinds of queries possible regarding this as explained 
below.
+ * Assume that watermark has been defined on both `leftTime` and 
`rightTime` columns used below.
+ *
+ * 1. When timestamp/time-window + watermark is in the join keys. Example 
(pseudo-SQL):
+ *
+ *  SELECT * FROM leftTable, rightTable
+ *  ON
+ *leftKey = rightKey AND
+ *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 
1hr tumbling windows
+ *
+ *In this case, this operator will join rows newer than watermark 
which fall in the same
+ *1 hour window. Say the event-time watermark is "12:34" (both left 
and right input).
+ *Then input rows can only have time > 12:34. Hence, they can only 
join with buffered rows
+ *where window >= 12:00 - 1:00 and all buffered rows with join window 
< 12:00 can be
+ *discarded. In other words, the operator will discard all state where
+ *window in state key (i.e. join key) < event time watermark. This 
threshold is called
+ *State Key Watermark.
+ *
+ * 2. When timestamp range conditions
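
To make the per-row processing described in the class doc above concrete, here is
a minimal, hand-written Scala sketch of one symmetric hash join step. It is not the
operator's actual code: `InputRow`, `JoinSideBuffer`, `joinKeyOf` and `condition`
are simplified stand-ins for the keyed state store, the generated key projection
and the generated join predicate.

    import scala.collection.mutable

    case class InputRow(values: Map[String, Any])

    // Stand-in for one side's keyed state store: join key -> buffered rows.
    class JoinSideBuffer {
      private val buffer = mutable.Map.empty[Any, mutable.Buffer[InputRow]]
      def append(key: Any, row: InputRow): Unit =
        buffer.getOrElseUpdate(key, mutable.Buffer.empty) += row
      def get(key: Any): Seq[InputRow] =
        buffer.get(key).map(_.toSeq).getOrElse(Seq.empty)
    }

    def processInputRow(
        row: InputRow,
        thisSideBuffer: JoinSideBuffer,
        otherSideBuffer: JoinSideBuffer,
        joinKeyOf: InputRow => Any,
        condition: (InputRow, InputRow) => Boolean): Seq[(InputRow, InputRow)] = {
      val key = joinKeyOf(row)
      // 1. Buffer the row on its own side so it can join with future input.
      thisSideBuffer.append(key, row)
      // 2. Join it against everything buffered for the same key on the other
      //    side, and 3. apply the optional condition to filter the output.
      otherSideBuffer.get(key)
        .map(other => (row, other))
        .filter { case (l, r) => condition(l, r) }
    }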

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139803500
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
 ---
@@ -114,6 +115,16 @@ class IncrementalExecution(
   stateInfo = Some(nextStatefulOperationStateInfo),
   batchTimestampMs = Some(offsetSeqMetadata.batchTimestampMs),
   eventTimeWatermark = Some(offsetSeqMetadata.batchWatermarkMs))
+
+  case j @ StreamingSymmetricHashJoinExec(lKeys, rKeys, _, cond, _, _, 
_, left, right) =>
+j.copy(
+  stateInfo = Some(nextStatefulOperationStateInfo),
--- End diff --

So, I think this may not be robust. At each trigger we create a new 
IncrementalExecution, so we are relying on the `statefulOperatorId` being 
incremented in a deterministic manner.
I can imagine adding rules to the Optimizer in the future which may move an 
`EquiJoin` before an aggregation.
In that case, the state store ids of the aggregation and the join may switch. 
I'm not sure if we're protected against that somehow, so I just wanted to 
bring it up.
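
For context on why this ordering matters: the state store location is derived
from the operator id, so if two stateful operators swapped positions in the
planned tree between runs, each would silently pick up the other's state. A
small hypothetical sketch of the positional-assignment pattern being discussed
(not Spark's actual code):

    // Hypothetical illustration of positional operator-id assignment.
    // If a future optimizer rule reorders the stateful operators between
    // restarts, the same operator ends up with a different id, and hence a
    // different state store directory -- the fragility raised above.
    case class StatefulOp(name: String, var operatorId: Long = -1L)

    def assignIds(plannedOperators: Seq[StatefulOp]): Seq[StatefulOp] = {
      var nextId = 0L
      plannedOperators.map { op =>
        op.operatorId = nextId   // the id depends purely on position in the plan
        nextId += 1
        op
      }
    }

    // Run 1: aggregation planned before the join  -> aggregate = 0, join = 1
    assignIds(Seq(StatefulOp("aggregate"), StatefulOp("join")))
    // Run 2, after a hypothetical reordering rule -> join = 0, aggregate = 1
    assignIds(Seq(StatefulOp("join"), StatefulOp("aggregate")))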


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139816392
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, 
AttributeReference, BoundReference, Cast, CheckOverflow, Expression, 
ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, 
Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, 
TimeSub, UnaryMinus}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import 
org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
+
+
+/**
+ * Helper object for [[StreamingSymmetricHashJoinExec]].
+ */
+object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with 
Logging {
+
+  sealed trait JoinSide
+  case object LeftSide extends JoinSide { override def toString(): String 
= "left" }
+  case object RightSide extends JoinSide { override def toString(): String 
= "right" }
+
+  sealed trait JoinStateWatermarkPredicate
+  case class JoinStateKeyWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+  case class JoinStateValueWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+
+  case class JoinStateWatermarkPredicates(
+left: Option[JoinStateWatermarkPredicate] = None,
+right: Option[JoinStateWatermarkPredicate] = None)
+
+  def getStateWatermarkPredicates(
+  leftAttributes: Seq[Attribute],
+  rightAttributes: Seq[Attribute],
+  leftKeys: Seq[Expression],
+  rightKeys: Seq[Expression],
+  condition: Option[Expression],
+  eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = {
+val joinKeyOrdinalForWatermark: Option[Int] = {
+  leftKeys.zipWithIndex.collectFirst {
+case (ne: NamedExpression, index) if 
ne.metadata.contains(delayKey) => index
+  } orElse {
+rightKeys.zipWithIndex.collectFirst {
+  case (ne: NamedExpression, index) if 
ne.metadata.contains(delayKey) => index
+}
+  }
+}
+
+def getOneSideStateWatermarkPredicate(
+oneSideInputAttributes: Seq[Attribute],
+oneSideJoinKeys: Seq[Expression],
+otherSideInputAttributes: Seq[Attribute]): 
Option[JoinStateWatermarkPredicate] = {
+  val isWatermarkDefinedOnInput = 
oneSideInputAttributes.exists(_.metadata.contains(delayKey))
+  val isWatermarkDefinedOnJoinKey = 
joinKeyOrdinalForWatermark.isDefined
+
+  if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the 
class docs
+val keyExprWithWatermark = BoundReference(
+  joinKeyOrdinalForWatermark.get,
+  oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType,
+  oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable)
+val expr = watermarkExpression(Some(keyExprWithWatermark), 
eventTimeWatermark)
+expr.map(JoinStateKeyWatermarkPredicate)
+
+  } else if (isWatermarkDefinedOnInput) { // case 2 explained in the 
class docs
+val stateValueWatermark = getStateValueWatermark(
+  attributesToFindStateWatemarkFor = oneSideInputAttributes,
+  attributesWithEventWatermark = otherSideInputAttributes,
+  condition,
+  eventTimeWatermark)
+val inputAttributeWithWatermark = 
oneSideInputAttributes.find(_.metadata.contains(delayKey))
+val expr = watermarkExpression(inputAttributeWithWatermark, 
stateValueWatermark)
+expr.map(JoinSt
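
The quoted helper (truncated above) is essentially choosing between two kinds of
eviction predicates for one side of the join. A much-simplified sketch of that
decision is below; it only records which kind of predicate would apply, whereas
the real code builds Catalyst expressions, and `watermarkOnJoinKey` /
`watermarkOnInput` stand in for the `delayKey` metadata checks.

    sealed trait StatePredicate
    // Evict by the watermarked join key (e.g. a time window) -- "key" watermark.
    case class OnJoinKey(watermarkMs: Long) extends StatePredicate
    // Evict by the buffered row's own event-time column -- "value" watermark.
    case class OnStateValue(watermarkMs: Long) extends StatePredicate

    def chooseStatePredicate(
        watermarkOnJoinKey: Boolean,
        watermarkOnInput: Boolean,
        eventTimeWatermarkMs: Option[Long],
        stateValueWatermarkMs: Option[Long]): Option[StatePredicate] = {
      if (watermarkOnJoinKey) {
        eventTimeWatermarkMs.map(OnJoinKey(_))      // watermark column is a join key
      } else if (watermarkOnInput) {
        stateValueWatermarkMs.map(OnStateValue(_))  // watermark column only in the input
      } else {
        None                                        // no safe eviction possible
      }
    }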

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139804087
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.TimeUnit.NANOSECONDS
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, 
JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
+import 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.util.{CompletionIterator, 
SerializableConfiguration}
+
+
+/**
+ * Performs stream-stream join using symmetric hash join algorithm. It 
works as follows.
+ *
+ * /---\
+ *   left side input ->|left side state|--\
+ * \---/  |
+ *|> 
joined output
+ * /---\  |
+ *   right side input >|right side state   |--/
+ * \---/
+ *
+ * Each join side buffers past input rows as streaming state so that the 
past input can be joined
+ * with future input on the other side. This buffer state is effectively a 
multi-map:
+ *equi-join key -> list of past input rows received with the join key
+ *
+ * For each input row in each side, the following operations take place.
+ * - Calculate join key from the row.
+ * - Use the join key to append the row to the buffer state of the side 
that the row came from.
+ * - Find past buffered values for the key from the other side. For each 
such value, emit the
+ *   "joined row" (left-row, right-row)
+ * - Apply the optional condition to filter the joined rows as the final 
output.
+ *
+ * If a timestamp column with event time watermark is present in the join 
keys or in the input
+ * data, then the it uses the watermark figure out which rows in the 
buffer will not join with
+ * and new data, and therefore can be discarded. Depending on the provided 
query conditions, we
--- End diff --

nit: `with the new data`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139810069
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.TimeUnit.NANOSECONDS
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, 
JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
+import 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.util.{CompletionIterator, 
SerializableConfiguration}
+
+
+/**
+ * Performs stream-stream join using symmetric hash join algorithm. It 
works as follows.
+ *
+ * /---\
+ *   left side input ->|left side state|--\
+ * \---/  |
+ *|> 
joined output
+ * /---\  |
+ *   right side input >|right side state   |--/
+ * \---/
+ *
+ * Each join side buffers past input rows as streaming state so that the 
past input can be joined
+ * with future input on the other side. This buffer state is effectively a 
multi-map:
+ *equi-join key -> list of past input rows received with the join key
+ *
+ * For each input row in each side, the following operations take place.
+ * - Calculate join key from the row.
+ * - Use the join key to append the row to the buffer state of the side 
that the row came from.
+ * - Find past buffered values for the key from the other side. For each 
such value, emit the
+ *   "joined row" (left-row, right-row)
+ * - Apply the optional condition to filter the joined rows as the final 
output.
+ *
+ * If a timestamp column with event time watermark is present in the join 
keys or in the input
+ * data, then the it uses the watermark figure out which rows in the 
buffer will not join with
+ * and new data, and therefore can be discarded. Depending on the provided 
query conditions, we
+ * can define thresholds on both state key (i.e. joining keys) and state 
value (i.e. input rows).
+ * There are three kinds of queries possible regarding this as explained 
below.
+ * Assume that watermark has been defined on both `leftTime` and 
`rightTime` columns used below.
+ *
+ * 1. When timestamp/time-window + watermark is in the join keys. Example 
(pseudo-SQL):
+ *
+ *  SELECT * FROM leftTable, rightTable
+ *  ON
+ *leftKey = rightKey AND
+ *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 
1hr tumbling windows
+ *
+ *In this case, this operator will join rows newer than watermark 
which fall in the same
+ *1 hour window. Say the event-time watermark is "12:34" (both left 
and right input).
+ *Then input rows can only have time > 12:34. Hence, they can only 
join with buffered rows
+ *where window >= 12:00 - 1:00 and all buffered rows with join window 
< 12:00 can be
+ *discarded. In other words, the operator will discard all state where
+ *window in state key (i.e. join key) < event time watermark. This 
threshold is called
+ *State Key Watermark.
+ *
+ * 2. When timestamp range conditions
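
As a rough DataFrame equivalent of the case-1 pseudo-SQL above (stream names,
column names and the rate source are only illustrative; `withWatermark`,
`window` and `join` are the public APIs involved):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.window

    val spark = SparkSession.builder().appName("window-join-sketch").getOrCreate()
    import spark.implicits._

    val left = spark.readStream.format("rate").load()
      .selectExpr("value AS leftKey", "timestamp AS leftTime")
      .withWatermark("leftTime", "30 minutes")

    val right = spark.readStream.format("rate").load()
      .selectExpr("value AS rightKey", "timestamp AS rightTime")
      .withWatermark("rightTime", "30 minutes")

    // Putting the 1-hour tumbling window into the equality condition makes the
    // watermarked window part of the join key (case 1: state key watermark).
    val joined = left.join(
      right,
      $"leftKey" === $"rightKey" &&
        window($"leftTime", "1 hour") === window($"rightTime", "1 hour"))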

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139806969
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.TimeUnit.NANOSECONDS
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, 
JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
+import 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.util.{CompletionIterator, 
SerializableConfiguration}
+
+
+/**
+ * Performs stream-stream join using symmetric hash join algorithm. It 
works as follows.
+ *
+ * /---\
+ *   left side input ->|left side state|--\
+ * \---/  |
+ *|> 
joined output
+ * /---\  |
+ *   right side input >|right side state   |--/
+ * \---/
+ *
+ * Each join side buffers past input rows as streaming state so that the 
past input can be joined
+ * with future input on the other side. This buffer state is effectively a 
multi-map:
+ *equi-join key -> list of past input rows received with the join key
+ *
+ * For each input row in each side, the following operations take place.
+ * - Calculate join key from the row.
+ * - Use the join key to append the row to the buffer state of the side 
that the row came from.
+ * - Find past buffered values for the key from the other side. For each 
such value, emit the
+ *   "joined row" (left-row, right-row)
+ * - Apply the optional condition to filter the joined rows as the final 
output.
+ *
+ * If a timestamp column with event time watermark is present in the join 
keys or in the input
+ * data, then the it uses the watermark figure out which rows in the 
buffer will not join with
+ * and new data, and therefore can be discarded. Depending on the provided 
query conditions, we
+ * can define thresholds on both state key (i.e. joining keys) and state 
value (i.e. input rows).
+ * There are three kinds of queries possible regarding this as explained 
below.
+ * Assume that watermark has been defined on both `leftTime` and 
`rightTime` columns used below.
+ *
+ * 1. When timestamp/time-window + watermark is in the join keys. Example 
(pseudo-SQL):
+ *
+ *  SELECT * FROM leftTable, rightTable
+ *  ON
+ *leftKey = rightKey AND
+ *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 
1hr tumbling windows
+ *
+ *In this case, this operator will join rows newer than watermark 
which fall in the same
+ *1 hour window. Say the event-time watermark is "12:34" (both left 
and right input).
+ *Then input rows can only have time > 12:34. Hence, they can only 
join with buffered rows
+ *where window >= 12:00 - 1:00 and all buffered rows with join window 
< 12:00 can be
+ *discarded. In other words, the operator will discard all state where
+ *window in state key (i.e. join key) < event time watermark. This 
threshold is called
+ *State Key Watermark.
+ *
+ * 2. When timestamp range conditions
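
To make the case-1 eviction rule concrete, here is a tiny illustrative check
(not the operator's generated predicate). With a watermark of 12:34 it keeps the
12:00-13:00 window and drops the 11:00-12:00 window, matching the example above.

    // A buffered entry whose tumbling window has already ended at or before the
    // event-time watermark can never match new input, so it is safe to evict.
    case class TimeWindow(startMs: Long, endMs: Long)

    def windowIsEvictable(stateKeyWindow: TimeWindow, eventTimeWatermarkMs: Long): Boolean =
      stateKeyWindow.endMs <= eventTimeWatermarkMs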

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139807021
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.TimeUnit.NANOSECONDS
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, 
JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
+import 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.util.{CompletionIterator, 
SerializableConfiguration}
+
+
+/**
+ * Performs stream-stream join using symmetric hash join algorithm. It 
works as follows.
+ *
+ * /---\
+ *   left side input ->|left side state|--\
+ * \---/  |
+ *|> 
joined output
+ * /---\  |
+ *   right side input >|right side state   |--/
+ * \---/
+ *
+ * Each join side buffers past input rows as streaming state so that the 
past input can be joined
+ * with future input on the other side. This buffer state is effectively a 
multi-map:
+ *equi-join key -> list of past input rows received with the join key
+ *
+ * For each input row in each side, the following operations take place.
+ * - Calculate join key from the row.
+ * - Use the join key to append the row to the buffer state of the side 
that the row came from.
+ * - Find past buffered values for the key from the other side. For each 
such value, emit the
+ *   "joined row" (left-row, right-row)
+ * - Apply the optional condition to filter the joined rows as the final 
output.
+ *
+ * If a timestamp column with event time watermark is present in the join 
keys or in the input
+ * data, then the it uses the watermark figure out which rows in the 
buffer will not join with
+ * and new data, and therefore can be discarded. Depending on the provided 
query conditions, we
+ * can define thresholds on both state key (i.e. joining keys) and state 
value (i.e. input rows).
+ * There are three kinds of queries possible regarding this as explained 
below.
+ * Assume that watermark has been defined on both `leftTime` and 
`rightTime` columns used below.
+ *
+ * 1. When timestamp/time-window + watermark is in the join keys. Example 
(pseudo-SQL):
+ *
+ *  SELECT * FROM leftTable, rightTable
+ *  ON
+ *leftKey = rightKey AND
+ *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 
1hr tumbling windows
+ *
+ *In this case, this operator will join rows newer than watermark 
which fall in the same
+ *1 hour window. Say the event-time watermark is "12:34" (both left 
and right input).
+ *Then input rows can only have time > 12:34. Hence, they can only 
join with buffered rows
+ *where window >= 12:00 - 1:00 and all buffered rows with join window 
< 12:00 can be
+ *discarded. In other words, the operator will discard all state where
+ *window in state key (i.e. join key) < event time watermark. This 
threshold is called
+ *State Key Watermark.
+ *
+ * 2. When timestamp range conditions

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139816206
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, 
AttributeReference, BoundReference, Cast, CheckOverflow, Expression, 
ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, 
Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, 
TimeSub, UnaryMinus}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import 
org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
+
+
+/**
+ * Helper object for [[StreamingSymmetricHashJoinExec]].
+ */
+object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with 
Logging {
+
+  sealed trait JoinSide
+  case object LeftSide extends JoinSide { override def toString(): String 
= "left" }
+  case object RightSide extends JoinSide { override def toString(): String 
= "right" }
+
+  sealed trait JoinStateWatermarkPredicate
+  case class JoinStateKeyWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+  case class JoinStateValueWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+
+  case class JoinStateWatermarkPredicates(
+left: Option[JoinStateWatermarkPredicate] = None,
+right: Option[JoinStateWatermarkPredicate] = None)
+
+  def getStateWatermarkPredicates(
+  leftAttributes: Seq[Attribute],
+  rightAttributes: Seq[Attribute],
+  leftKeys: Seq[Expression],
+  rightKeys: Seq[Expression],
+  condition: Option[Expression],
+  eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = {
+val joinKeyOrdinalForWatermark: Option[Int] = {
+  leftKeys.zipWithIndex.collectFirst {
+case (ne: NamedExpression, index) if 
ne.metadata.contains(delayKey) => index
+  } orElse {
+rightKeys.zipWithIndex.collectFirst {
+  case (ne: NamedExpression, index) if 
ne.metadata.contains(delayKey) => index
+}
+  }
+}
+
+def getOneSideStateWatermarkPredicate(
+oneSideInputAttributes: Seq[Attribute],
+oneSideJoinKeys: Seq[Expression],
+otherSideInputAttributes: Seq[Attribute]): 
Option[JoinStateWatermarkPredicate] = {
+  val isWatermarkDefinedOnInput = 
oneSideInputAttributes.exists(_.metadata.contains(delayKey))
+  val isWatermarkDefinedOnJoinKey = 
joinKeyOrdinalForWatermark.isDefined
+
+  if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the 
class docs
+val keyExprWithWatermark = BoundReference(
+  joinKeyOrdinalForWatermark.get,
+  oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType,
+  oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable)
+val expr = watermarkExpression(Some(keyExprWithWatermark), 
eventTimeWatermark)
+expr.map(JoinStateKeyWatermarkPredicate)
+
+  } else if (isWatermarkDefinedOnInput) { // case 2 explained in the 
class docs
+val stateValueWatermark = getStateValueWatermark(
+  attributesToFindStateWatemarkFor = oneSideInputAttributes,
+  attributesWithEventWatermark = otherSideInputAttributes,
+  condition,
+  eventTimeWatermark)
+val inputAttributeWithWatermark = 
oneSideInputAttributes.find(_.metadata.contains(delayKey))
+val expr = watermarkExpression(inputAttributeWithWatermark, 
stateValueWatermark)
+expr.map(JoinSt
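
Although the case-2 part of the quoted docs is cut off here, the state value
watermark it refers to boils down to simple interval arithmetic. An illustrative
sketch, assuming a range condition of the form leftTime > rightTime - <delay>
and an event-time watermark on `rightTime` (numbers made up for the example):

    // Buffered left rows older than (watermark - delay) can no longer satisfy
    // leftTime > rightTime - delay for any future right-side input, so they
    // are safe to evict from the left side's state.
    def leftStateValueWatermarkMs(eventTimeWatermarkMs: Long, delayMs: Long): Long =
      eventTimeWatermarkMs - delayMs

    val watermarkMs = (12 * 60 + 34) * 60 * 1000L  // event time "12:34" as millis-of-day
    val delayMs     = 10 * 60 * 1000L              // a 10-minute range condition
    val evictBelow  = leftStateValueWatermarkMs(watermarkMs, delayMs)  // i.e. "12:24"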

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139812580
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, 
AttributeReference, BoundReference, Cast, CheckOverflow, Expression, 
ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, 
Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, 
TimeSub, UnaryMinus}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import 
org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
+
+
+/**
+ * Helper object for [[StreamingSymmetricHashJoinExec]].
+ */
+object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with 
Logging {
+
+  sealed trait JoinSide
+  case object LeftSide extends JoinSide { override def toString(): String 
= "left" }
+  case object RightSide extends JoinSide { override def toString(): String 
= "right" }
+
+  sealed trait JoinStateWatermarkPredicate
+  case class JoinStateKeyWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+  case class JoinStateValueWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+
+  case class JoinStateWatermarkPredicates(
+left: Option[JoinStateWatermarkPredicate] = None,
+right: Option[JoinStateWatermarkPredicate] = None)
+
+  def getStateWatermarkPredicates(
+  leftAttributes: Seq[Attribute],
+  rightAttributes: Seq[Attribute],
+  leftKeys: Seq[Expression],
+  rightKeys: Seq[Expression],
+  condition: Option[Expression],
+  eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = {
+val joinKeyOrdinalForWatermark: Option[Int] = {
+  leftKeys.zipWithIndex.collectFirst {
+case (ne: NamedExpression, index) if 
ne.metadata.contains(delayKey) => index
+  } orElse {
+rightKeys.zipWithIndex.collectFirst {
+  case (ne: NamedExpression, index) if 
ne.metadata.contains(delayKey) => index
+}
+  }
+}
+
+def getOneSideStateWatermarkPredicate(
+oneSideInputAttributes: Seq[Attribute],
+oneSideJoinKeys: Seq[Expression],
+otherSideInputAttributes: Seq[Attribute]): 
Option[JoinStateWatermarkPredicate] = {
+  val isWatermarkDefinedOnInput = 
oneSideInputAttributes.exists(_.metadata.contains(delayKey))
+  val isWatermarkDefinedOnJoinKey = 
joinKeyOrdinalForWatermark.isDefined
+
+  if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the 
class docs
--- End diff --

specify which class docs


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139812749
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, 
AttributeReference, BoundReference, Cast, CheckOverflow, Expression, 
ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, 
Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, 
TimeSub, UnaryMinus}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import 
org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
+
+
+/**
+ * Helper object for [[StreamingSymmetricHashJoinExec]].
+ */
+object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with 
Logging {
+
+  sealed trait JoinSide
+  case object LeftSide extends JoinSide { override def toString(): String 
= "left" }
+  case object RightSide extends JoinSide { override def toString(): String 
= "right" }
+
+  sealed trait JoinStateWatermarkPredicate
+  case class JoinStateKeyWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+  case class JoinStateValueWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+
+  case class JoinStateWatermarkPredicates(
+left: Option[JoinStateWatermarkPredicate] = None,
+right: Option[JoinStateWatermarkPredicate] = None)
+
+  def getStateWatermarkPredicates(
+  leftAttributes: Seq[Attribute],
+  rightAttributes: Seq[Attribute],
+  leftKeys: Seq[Expression],
+  rightKeys: Seq[Expression],
+  condition: Option[Expression],
+  eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = {
+val joinKeyOrdinalForWatermark: Option[Int] = {
+  leftKeys.zipWithIndex.collectFirst {
+case (ne: NamedExpression, index) if 
ne.metadata.contains(delayKey) => index
+  } orElse {
+rightKeys.zipWithIndex.collectFirst {
+  case (ne: NamedExpression, index) if 
ne.metadata.contains(delayKey) => index
+}
+  }
+}
+
+def getOneSideStateWatermarkPredicate(
+oneSideInputAttributes: Seq[Attribute],
+oneSideJoinKeys: Seq[Expression],
+otherSideInputAttributes: Seq[Attribute]): 
Option[JoinStateWatermarkPredicate] = {
+  val isWatermarkDefinedOnInput = 
oneSideInputAttributes.exists(_.metadata.contains(delayKey))
+  val isWatermarkDefinedOnJoinKey = 
joinKeyOrdinalForWatermark.isDefined
+
+  if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the 
class docs
+val keyExprWithWatermark = BoundReference(
+  joinKeyOrdinalForWatermark.get,
+  oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType,
+  oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable)
+val expr = watermarkExpression(Some(keyExprWithWatermark), 
eventTimeWatermark)
+expr.map(JoinStateKeyWatermarkPredicate)
+
+  } else if (isWatermarkDefinedOnInput) { // case 2 explained in the 
class docs
+val stateValueWatermark = getStateValueWatermark(
+  attributesToFindStateWatemarkFor = oneSideInputAttributes,
+  attributesWithEventWatermark = otherSideInputAttributes,
+  condition,
+  eventTimeWatermark)
+val inputAttributeWithWatermark = 
oneSideInputAttributes.find(_.metadata.contains(delayKey))
+val expr = watermarkExpression(inputAttributeWithWatermark, 
stateValueWatermark)
+expr.map(JoinSt

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139814640
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, 
AttributeReference, BoundReference, Cast, CheckOverflow, Expression, 
ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, 
Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, 
TimeSub, UnaryMinus}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import 
org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
+
+
+/**
+ * Helper object for [[StreamingSymmetricHashJoinExec]].
+ */
+object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with 
Logging {
+
+  sealed trait JoinSide
+  case object LeftSide extends JoinSide { override def toString(): String 
= "left" }
+  case object RightSide extends JoinSide { override def toString(): String 
= "right" }
+
+  sealed trait JoinStateWatermarkPredicate
+  case class JoinStateKeyWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+  case class JoinStateValueWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+
+  case class JoinStateWatermarkPredicates(
+left: Option[JoinStateWatermarkPredicate] = None,
+right: Option[JoinStateWatermarkPredicate] = None)
+
+  def getStateWatermarkPredicates(
+  leftAttributes: Seq[Attribute],
+  rightAttributes: Seq[Attribute],
+  leftKeys: Seq[Expression],
+  rightKeys: Seq[Expression],
+  condition: Option[Expression],
+  eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = {
+val joinKeyOrdinalForWatermark: Option[Int] = {
+  leftKeys.zipWithIndex.collectFirst {
+case (ne: NamedExpression, index) if 
ne.metadata.contains(delayKey) => index
+  } orElse {
+rightKeys.zipWithIndex.collectFirst {
+  case (ne: NamedExpression, index) if 
ne.metadata.contains(delayKey) => index
+}
+  }
+}
+
+def getOneSideStateWatermarkPredicate(
+oneSideInputAttributes: Seq[Attribute],
+oneSideJoinKeys: Seq[Expression],
+otherSideInputAttributes: Seq[Attribute]): 
Option[JoinStateWatermarkPredicate] = {
+  val isWatermarkDefinedOnInput = 
oneSideInputAttributes.exists(_.metadata.contains(delayKey))
+  val isWatermarkDefinedOnJoinKey = 
joinKeyOrdinalForWatermark.isDefined
+
+  if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the 
class docs
+val keyExprWithWatermark = BoundReference(
+  joinKeyOrdinalForWatermark.get,
+  oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType,
+  oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable)
+val expr = watermarkExpression(Some(keyExprWithWatermark), 
eventTimeWatermark)
+expr.map(JoinStateKeyWatermarkPredicate)
+
+  } else if (isWatermarkDefinedOnInput) { // case 2 explained in the 
class docs
+val stateValueWatermark = getStateValueWatermark(
+  attributesToFindStateWatemarkFor = oneSideInputAttributes,
+  attributesWithEventWatermark = otherSideInputAttributes,
+  condition,
+  eventTimeWatermark)
+val inputAttributeWithWatermark = 
oneSideInputAttributes.find(_.metadata.contains(delayKey))
+val expr = watermarkExpression(inputAttributeWithWatermark, 
stateValueWatermark)
+expr.map(JoinSt

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139813860
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, 
AttributeReference, BoundReference, Cast, CheckOverflow, Expression, 
ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, 
Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, 
TimeSub, UnaryMinus}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import 
org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
+
+
+/**
+ * Helper object for [[StreamingSymmetricHashJoinExec]].
+ */
+object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with 
Logging {
+
+  sealed trait JoinSide
+  case object LeftSide extends JoinSide { override def toString(): String 
= "left" }
+  case object RightSide extends JoinSide { override def toString(): String 
= "right" }
+
+  sealed trait JoinStateWatermarkPredicate
+  case class JoinStateKeyWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+  case class JoinStateValueWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+
+  case class JoinStateWatermarkPredicates(
+left: Option[JoinStateWatermarkPredicate] = None,
+right: Option[JoinStateWatermarkPredicate] = None)
+
+  def getStateWatermarkPredicates(
+  leftAttributes: Seq[Attribute],
+  rightAttributes: Seq[Attribute],
+  leftKeys: Seq[Expression],
+  rightKeys: Seq[Expression],
+  condition: Option[Expression],
+  eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = {
+val joinKeyOrdinalForWatermark: Option[Int] = {
+  leftKeys.zipWithIndex.collectFirst {
+case (ne: NamedExpression, index) if 
ne.metadata.contains(delayKey) => index
+  } orElse {
+rightKeys.zipWithIndex.collectFirst {
+  case (ne: NamedExpression, index) if 
ne.metadata.contains(delayKey) => index
+}
+  }
+}
+
+def getOneSideStateWatermarkPredicate(
+oneSideInputAttributes: Seq[Attribute],
+oneSideJoinKeys: Seq[Expression],
+otherSideInputAttributes: Seq[Attribute]): 
Option[JoinStateWatermarkPredicate] = {
+  val isWatermarkDefinedOnInput = 
oneSideInputAttributes.exists(_.metadata.contains(delayKey))
+  val isWatermarkDefinedOnJoinKey = 
joinKeyOrdinalForWatermark.isDefined
+
+  if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the 
class docs
+val keyExprWithWatermark = BoundReference(
+  joinKeyOrdinalForWatermark.get,
+  oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType,
+  oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable)
+val expr = watermarkExpression(Some(keyExprWithWatermark), 
eventTimeWatermark)
+expr.map(JoinStateKeyWatermarkPredicate)
+
+  } else if (isWatermarkDefinedOnInput) { // case 2 explained in the 
class docs
+val stateValueWatermark = getStateValueWatermark(
+  attributesToFindStateWatemarkFor = oneSideInputAttributes,
+  attributesWithEventWatermark = otherSideInputAttributes,
+  condition,
+  eventTimeWatermark)
+val inputAttributeWithWatermark = 
oneSideInputAttributes.find(_.metadata.contains(delayKey))
+val expr = watermarkExpression(inputAttributeWithWatermark, 
stateValueWatermark)
+expr.map(JoinSt

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139813588
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, 
AttributeReference, BoundReference, Cast, CheckOverflow, Expression, 
ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, 
Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, 
TimeSub, UnaryMinus}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import 
org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
+
+
+/**
+ * Helper object for [[StreamingSymmetricHashJoinExec]].
+ */
+object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with 
Logging {
+
+  sealed trait JoinSide
+  case object LeftSide extends JoinSide { override def toString(): String 
= "left" }
+  case object RightSide extends JoinSide { override def toString(): String 
= "right" }
+
+  sealed trait JoinStateWatermarkPredicate
+  case class JoinStateKeyWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+  case class JoinStateValueWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+
+  case class JoinStateWatermarkPredicates(
+left: Option[JoinStateWatermarkPredicate] = None,
+right: Option[JoinStateWatermarkPredicate] = None)
+
+  def getStateWatermarkPredicates(
+  leftAttributes: Seq[Attribute],
+  rightAttributes: Seq[Attribute],
+  leftKeys: Seq[Expression],
+  rightKeys: Seq[Expression],
+  condition: Option[Expression],
+  eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = {
+val joinKeyOrdinalForWatermark: Option[Int] = {
+  leftKeys.zipWithIndex.collectFirst {
+case (ne: NamedExpression, index) if 
ne.metadata.contains(delayKey) => index
+  } orElse {
+rightKeys.zipWithIndex.collectFirst {
+  case (ne: NamedExpression, index) if 
ne.metadata.contains(delayKey) => index
+}
+  }
+}
+
+def getOneSideStateWatermarkPredicate(
+oneSideInputAttributes: Seq[Attribute],
+oneSideJoinKeys: Seq[Expression],
+otherSideInputAttributes: Seq[Attribute]): 
Option[JoinStateWatermarkPredicate] = {
+  val isWatermarkDefinedOnInput = 
oneSideInputAttributes.exists(_.metadata.contains(delayKey))
+  val isWatermarkDefinedOnJoinKey = 
joinKeyOrdinalForWatermark.isDefined
+
+  if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the 
class docs
+val keyExprWithWatermark = BoundReference(
+  joinKeyOrdinalForWatermark.get,
+  oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType,
+  oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable)
+val expr = watermarkExpression(Some(keyExprWithWatermark), 
eventTimeWatermark)
+expr.map(JoinStateKeyWatermarkPredicate)
+
+  } else if (isWatermarkDefinedOnInput) { // case 2 explained in the 
class docs
+val stateValueWatermark = getStateValueWatermark(
+  attributesToFindStateWatemarkFor = oneSideInputAttributes,
+  attributesWithEventWatermark = otherSideInputAttributes,
+  condition,
+  eventTimeWatermark)
+val inputAttributeWithWatermark = 
oneSideInputAttributes.find(_.metadata.contains(delayKey))
+val expr = watermarkExpression(inputAttributeWithWatermark, 
stateValueWatermark)
+expr.map(JoinSt

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139814268
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, 
AttributeReference, BoundReference, Cast, CheckOverflow, Expression, 
ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, 
Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, 
TimeSub, UnaryMinus}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import 
org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
+
+
+/**
+ * Helper object for [[StreamingSymmetricHashJoinExec]].
+ */
+object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with 
Logging {
+
+  sealed trait JoinSide
+  case object LeftSide extends JoinSide { override def toString(): String 
= "left" }
+  case object RightSide extends JoinSide { override def toString(): String 
= "right" }
+
+  sealed trait JoinStateWatermarkPredicate
+  case class JoinStateKeyWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+  case class JoinStateValueWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+
+  case class JoinStateWatermarkPredicates(
+left: Option[JoinStateWatermarkPredicate] = None,
+right: Option[JoinStateWatermarkPredicate] = None)
+
+  def getStateWatermarkPredicates(
+  leftAttributes: Seq[Attribute],
+  rightAttributes: Seq[Attribute],
+  leftKeys: Seq[Expression],
+  rightKeys: Seq[Expression],
+  condition: Option[Expression],
+  eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = {
+val joinKeyOrdinalForWatermark: Option[Int] = {
+  leftKeys.zipWithIndex.collectFirst {
+case (ne: NamedExpression, index) if 
ne.metadata.contains(delayKey) => index
+  } orElse {
+rightKeys.zipWithIndex.collectFirst {
+  case (ne: NamedExpression, index) if 
ne.metadata.contains(delayKey) => index
+}
+  }
+}
+
+def getOneSideStateWatermarkPredicate(
+oneSideInputAttributes: Seq[Attribute],
+oneSideJoinKeys: Seq[Expression],
+otherSideInputAttributes: Seq[Attribute]): 
Option[JoinStateWatermarkPredicate] = {
+  val isWatermarkDefinedOnInput = 
oneSideInputAttributes.exists(_.metadata.contains(delayKey))
+  val isWatermarkDefinedOnJoinKey = 
joinKeyOrdinalForWatermark.isDefined
+
+  if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the 
class docs
+val keyExprWithWatermark = BoundReference(
+  joinKeyOrdinalForWatermark.get,
+  oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType,
+  oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable)
+val expr = watermarkExpression(Some(keyExprWithWatermark), 
eventTimeWatermark)
+expr.map(JoinStateKeyWatermarkPredicate)
+
+  } else if (isWatermarkDefinedOnInput) { // case 2 explained in the 
class docs
+val stateValueWatermark = getStateValueWatermark(
+  attributesToFindStateWatemarkFor = oneSideInputAttributes,
+  attributesWithEventWatermark = otherSideInputAttributes,
+  condition,
+  eventTimeWatermark)
+val inputAttributeWithWatermark = 
oneSideInputAttributes.find(_.metadata.contains(delayKey))
+val expr = watermarkExpression(inputAttributeWithWatermark, 
stateValueWatermark)
+expr.map(JoinSt

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139806701
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.TimeUnit.NANOSECONDS
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, 
JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
+import 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.util.{CompletionIterator, 
SerializableConfiguration}
+
+
+/**
+ * Performs stream-stream join using symmetric hash join algorithm. It 
works as follows.
+ *
+ * /---\
+ *   left side input ->|left side state|--\
+ * \---/  |
+ *|> 
joined output
+ * /---\  |
+ *   right side input >|right side state   |--/
+ * \---/
+ *
+ * Each join side buffers past input rows as streaming state so that the 
past input can be joined
+ * with future input on the other side. This buffer state is effectively a 
multi-map:
+ *equi-join key -> list of past input rows received with the join key
+ *
+ * For each input row in each side, the following operations take place.
+ * - Calculate join key from the row.
+ * - Use the join key to append the row to the buffer state of the side 
that the row came from.
+ * - Find past buffered values for the key from the other side. For each 
such value, emit the
+ *   "joined row" (left-row, right-row)
+ * - Apply the optional condition to filter the joined rows as the final 
output.
+ *
+ * If a timestamp column with an event time watermark is present in the join keys or in the
+ * input data, then it uses the watermark to figure out which rows in the buffer will not join
+ * with any new data, and therefore can be discarded. Depending on the provided query conditions,
+ * we can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows).
+ * There are three kinds of queries possible regarding this as explained 
below.
+ * Assume that watermark has been defined on both `leftTime` and 
`rightTime` columns used below.
+ *
+ * 1. When timestamp/time-window + watermark is in the join keys. Example 
(pseudo-SQL):
+ *
+ *  SELECT * FROM leftTable, rightTable
+ *  ON
+ *leftKey = rightKey AND
+ *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 
1hr tumbling windows
+ *
+ *In this case, this operator will join rows newer than watermark 
which fall in the same
+ *1 hour window. Say the event-time watermark is "12:34" (both left 
and right input).
+ *Then input rows can only have time > 12:34. Hence, they can only 
join with buffered rows
+ *where window >= 12:00 - 1:00 and all buffered rows with join window 
< 12:00 can be
+ *discarded. In other words, the operator will discard all state where
+ *window in state key (i.e. join key) < event time watermark. This 
threshold is called
+ *State Key Watermark.
+ *
+ * 2. When timestamp range conditions
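
As a rough, self-contained illustration of the buffering scheme the class doc above describes (each side keeps a multi-map of join key to past rows, and every incoming row is buffered on its own side and probed against the other side), here is a minimal Scala sketch. It is not the operator's implementation; the row shape, key type and example values are made up.

import scala.collection.mutable

object SymmetricHashJoinSketch {
  type Row = (Int, String)   // (join key, payload) stand-in for an input row

  // Per-side buffer: effectively a multi-map of join key -> past rows with that key.
  final class SideBuffer {
    private val buffered = mutable.Map.empty[Int, mutable.ArrayBuffer[Row]]
    def append(key: Int, row: Row): Unit =
      buffered.getOrElseUpdate(key, mutable.ArrayBuffer.empty[Row]) += row
    def get(key: Int): Seq[Row] = buffered.get(key).map(_.toList).getOrElse(Nil)
  }

  def main(args: Array[String]): Unit = {
    val leftState  = new SideBuffer
    val rightState = new SideBuffer

    // For a row arriving on one side: buffer it on its own side, then probe the
    // other side's buffer and emit one joined pair per match.
    def processLeft(row: Row): Seq[(Row, Row)] = {
      leftState.append(row._1, row)
      rightState.get(row._1).map(other => (row, other))
    }
    def processRight(row: Row): Seq[(Row, Row)] = {
      rightState.append(row._1, row)
      leftState.get(row._1).map(other => (other, row))
    }

    println(processLeft((1, "impression-1")))   // empty: nothing buffered on the right yet
    println(processRight((1, "click-1")))       // joins with the buffered (1, "impression-1")
    println(processLeft((1, "impression-2")))   // joins with the buffered (1, "click-1")
  }
}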

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139815407
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala
 ---

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139806782
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139809419
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139815966
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala
 ---

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139812115
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139811702
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139809689
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139815159
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala
 ---

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139807880
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139812315
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, 
AttributeReference, BoundReference, Cast, CheckOverflow, Expression, 
ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, 
Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, 
TimeSub, UnaryMinus}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import 
org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
+
+
+/**
+ * Helper object for [[StreamingSymmetricHashJoinExec]].
+ */
+object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with 
Logging {
+
+  sealed trait JoinSide
+  case object LeftSide extends JoinSide { override def toString(): String 
= "left" }
+  case object RightSide extends JoinSide { override def toString(): String 
= "right" }
+
+  sealed trait JoinStateWatermarkPredicate
+  case class JoinStateKeyWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+  case class JoinStateValueWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+
+  case class JoinStateWatermarkPredicates(
+left: Option[JoinStateWatermarkPredicate] = None,
+right: Option[JoinStateWatermarkPredicate] = None)
+
+  def getStateWatermarkPredicates(
+  leftAttributes: Seq[Attribute],
+  rightAttributes: Seq[Attribute],
+  leftKeys: Seq[Expression],
+  rightKeys: Seq[Expression],
+  condition: Option[Expression],
+  eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = {
+val joinKeyOrdinalForWatermark: Option[Int] = {
--- End diff --

I see this code is duplicated above. Maybe you can make it a function?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
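
One possible shape for the helper function suggested in the comment above, sketched against the quoted code. This is not the actual follow-up change; the object and method names are invented, and it assumes Spark's catalyst classes are on the classpath.

import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark.delayKey

object JoinKeyOrdinalHelperSketch {
  // Returns the ordinal of the first join key (left keys first, then right) whose
  // metadata carries the event-time watermark delay, if any.
  def findWatermarkKeyOrdinal(
      leftKeys: Seq[Expression],
      rightKeys: Seq[Expression]): Option[Int] = {
    def find(keys: Seq[Expression]): Option[Int] = keys.zipWithIndex.collectFirst {
      case (ne: NamedExpression, index) if ne.metadata.contains(delayKey) => index
    }
    find(leftKeys) orElse find(rightKeys)
  }
}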



[GitHub] spark pull request #19160: [SPARK-21934][CORE] Expose Shuffle Netty memory u...

2017-09-19 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19160#discussion_r139815684
  
--- Diff: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
 ---
@@ -52,4 +55,13 @@ public abstract void fetchBlocks(
   String[] blockIds,
   BlockFetchingListener listener,
   TempShuffleFileManager tempShuffleFileManager);
+
+  /**
+   * Get the shuffle MetricsSet from ShuffleClient, this will be used used 
in MetricsSystem to
--- End diff --

nit: `this will be used used` -> `this will be used`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
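
A minimal sketch of how a MetricSet returned by such a shuffleMetrics() method could be folded into a Dropwizard MetricRegistry, which is roughly what a MetricsSystem source would do with it. The gauge name and value below are invented for illustration.

import java.util.Collections

import com.codahale.metrics.{Gauge, Metric, MetricRegistry, MetricSet}

object ShuffleMetricsSketch {
  def main(args: Array[String]): Unit = {
    val registry = new MetricRegistry

    // Stand-in for a shuffle client's shuffleMetrics(): a single gauge with a made-up name/value.
    val shuffleMetrics = new MetricSet {
      override def getMetrics: java.util.Map[String, Metric] = {
        val usedMemory = new Gauge[Long] { override def getValue: Long = 42L }
        Collections.singletonMap[String, Metric]("usedDirectMemory", usedMemory)
      }
    }

    // Register every metric in the set under the registry, then list what was registered.
    registry.registerAll(shuffleMetrics)
    println(registry.getGauges.keySet())
  }
}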



[GitHub] spark pull request #19160: [SPARK-21934][CORE] Expose Shuffle Netty memory u...

2017-09-19 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19160#discussion_r139815983
  
--- Diff: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
 ---
@@ -117,6 +118,12 @@ public void fetchBlocks(
 }
   }
 
+  @Override
+  public MetricSet shuffleMetrics() {
+checkInit();
--- End diff --

not related to the change here -- but should we also checkInit in the 
`close()` function?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19145: [spark-21933][yarn] Spark Streaming request more executo...

2017-09-19 Thread squito
Github user squito commented on the issue:

https://github.com/apache/spark/pull/19145
  
I'm not sure I totally follow the sequence of events, but I get the feeling 
this should be handled in yarn, not spark.

Also, I agree with Jerry, it seems like your `completedContainerIdSet` may 
grow continuously.  You'll remove from it *if* you happen to get a duplicate 
message.  But I think in most cases you will not get a duplicate message, if I 
understand correctly.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19280: [SPARK-21928][CORE] Set classloader on SerializerManager...

2017-09-19 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19280
  
**[Test build #81949 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81949/testReport)**
 for PR 19280 at commit 
[`20e3585`](https://github.com/apache/spark/commit/20e3585eac16ef3bfe403ec23f57a2705ff47ecb).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19280: [SPARK-21928][CORE] Set classloader on SerializerManager...

2017-09-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19280
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81944/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19280: [SPARK-21928][CORE] Set classloader on SerializerManager...

2017-09-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19280
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19280: [SPARK-21928][CORE] Set classloader on SerializerManager...

2017-09-19 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19280
  
**[Test build #81944 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81944/testReport)**
 for PR 19280 at commit 
[`acbaf8b`](https://github.com/apache/spark/commit/acbaf8b65629344d760360b768e89f1712af8942).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19283: Update quickstart python dataset example

2017-09-19 Thread srowen
Github user srowen commented on the issue:

https://github.com/apache/spark/pull/19283
  
Have a look at http://spark.apache.org/contributing.html -- maybe prefix 
the title with `[MINOR][DOCS]` for completeness?

Are there other instances of this same issue in the Pyspark docs?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19252: [SPARK-21969][SQL] CommandUtils.updateTableStats should ...

2017-09-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19252
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19252: [SPARK-21969][SQL] CommandUtils.updateTableStats should ...

2017-09-19 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19252
  
**[Test build #81941 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81941/testReport)**
 for PR 19252 at commit 
[`63f9dc2`](https://github.com/apache/spark/commit/63f9dc23f781bdf6e8c891e1a502b91ccdbc0281).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19020: [SPARK-3181] [ML] Implement huber loss for Linear...

2017-09-19 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/19020#discussion_r139807816
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/regression/LinearRegressionSuite.scala 
---
@@ -998,6 +1047,172 @@ class LinearRegressionSuite
   }
 }
   }
+
+  test("linear regression (huber loss) with intercept without 
regularization") {
--- End diff --

Do you know if these integration tests are in the L1 penalty regime or in 
the L2 regime?  It'd be nice to make sure we're testing both.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19020: [SPARK-3181] [ML] Implement huber loss for Linear...

2017-09-19 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/19020#discussion_r139765053
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala ---
@@ -69,19 +69,57 @@ private[regression] trait LinearRegressionParams 
extends PredictorParams
 "The solver algorithm for optimization. Supported options: " +
   s"${supportedSolvers.mkString(", ")}. (Default auto)",
 ParamValidators.inArray[String](supportedSolvers))
+
+  /**
+   * The loss function to be optimized.
+   * Supported options: "leastSquares" and "huber".
+   * Default: "leastSquares"
+   *
+   * @group param
+   */
+  @Since("2.3.0")
+  final override val loss: Param[String] = new Param[String](this, "loss", 
"The loss function to" +
+s" be optimized. Supported options: ${supportedLosses.mkString(", ")}. 
(Default leastSquares)",
+ParamValidators.inArray[String](supportedLosses))
+
+  /**
+   * The shape parameter to control the amount of robustness. Must be > 
1.0.
+   * At larger values of epsilon, the huber criterion becomes more similar 
to least squares
+   * regression; for small values of epsilon, the criterion is more 
similar to L1 regression.
+   * Default is 1.35 to get as much robustness as possible while retaining
+   * 95% statistical efficiency for normally distributed data.
+   * Only valid when "loss" is "huber".
+   */
+  @Since("2.3.0")
+  final val epsilon = new DoubleParam(this, "epsilon", "The shape 
parameter to control the " +
+"amount of robustness. Must be > 1.0.", ParamValidators.gt(1.0))
+
+  /** @group getParam */
+  @Since("2.3.0")
+  def getEpsilon: Double = $(epsilon)
+
+  override protected def validateAndTransformSchema(
+  schema: StructType,
+  fitting: Boolean,
+  featuresDataType: DataType): StructType = {
+if ($(loss) == Huber) {
+  require($(solver)!= Normal, "LinearRegression with huber loss 
doesn't support " +
+"normal solver, please change solver to auto or l-bfgs.")
+  require($(elasticNetParam) == 0.0, "LinearRegression with huber loss 
only supports " +
+s"L2 regularization, but got elasticNetParam = 
$getElasticNetParam.")
+
+}
+super.validateAndTransformSchema(schema, fitting, featuresDataType)
+  }
 }
 
 /**
  * Linear regression.
  *
- * The learning objective is to minimize the squared error, with 
regularization.
- * The specific squared error loss function used is:
- *
- * 
- *$$
- *L = 1/2n ||A coefficients - y||^2^
- *$$
- * 
+ * The learning objective is to minimize the specified loss function, with 
regularization.
+ * This supports two loss functions:
+ *  - leastSquares (a.k.a squared loss)
--- End diff --

Let's keep exact specifications of the losses being used.  This is one of 
my big annoyances with many ML libraries: It's hard to tell exactly what loss 
is being used, which makes it hard to compare/validate results across different 
ML libraries.

It'd also be nice to make it clear what we mean by "huber," in particular 
that we estimate the scale parameter from data.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
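
For context on how the loss and epsilon params discussed above would be used, here is a construct-only sketch. It assumes the setters this patch proposes (setLoss, setEpsilon) end up on LinearRegression; the parameter values are arbitrary.

import org.apache.spark.ml.regression.LinearRegression

object HuberLinearRegressionSketch {
  def main(args: Array[String]): Unit = {
    // Construct-only example; fitting would additionally need a SparkSession and data.
    val lr = new LinearRegression()
      .setLoss("huber")          // default discussed above is "leastSquares"
      .setEpsilon(1.35)          // shape parameter; must be > 1.0
      .setRegParam(0.1)          // L2 regularization strength
      .setElasticNetParam(0.0)   // huber loss rejects any L1 mixing per the require() above
    println(lr.explainParams())
  }
}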



[GitHub] spark pull request #19020: [SPARK-3181] [ML] Implement huber loss for Linear...

2017-09-19 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/19020#discussion_r139772375
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala ---
@@ -471,6 +574,15 @@ object LinearRegression extends 
DefaultParamsReadable[LinearRegression] {
 
   /** Set of solvers that LinearRegression supports. */
   private[regression] val supportedSolvers = Array(Auto, Normal, LBFGS)
+
+  /** String name for "leastSquares". */
+  private[regression] val LeastSquares = "leastSquares"
--- End diff --

How about calling this "squaredError" since the loss is "squared error," 
not "least squares."


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19020: [SPARK-3181] [ML] Implement huber loss for Linear...

2017-09-19 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/19020#discussion_r139765068
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala ---
@@ -220,12 +283,12 @@ class LinearRegression @Since("1.3.0") 
(@Since("1.3.0") override val uid: String
 }
 
 val instr = Instrumentation.create(this, dataset)
-instr.logParams(labelCol, featuresCol, weightCol, predictionCol, 
solver, tol,
-  elasticNetParam, fitIntercept, maxIter, regParam, standardization, 
aggregationDepth)
+instr.logParams(labelCol, featuresCol, weightCol, predictionCol, 
solver, tol, elasticNetParam,
--- End diff --

Log epsilon (M) as well


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19020: [SPARK-3181] [ML] Implement huber loss for Linear...

2017-09-19 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/19020#discussion_r139764932
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberAggregator.scala 
---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.ml.optim.aggregator
+
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.linalg.Vector
+
+/**
+ * HuberAggregator computes the gradient and loss for a huber loss function,
+ * as used in robust regression for samples in sparse or dense vector in an online fashion.
+ *
+ * The huber loss function is based on:
+ * Art B. Owen (2006), A robust hybrid of lasso and ridge regression.
+ * (http://statweb.stanford.edu/~owen/reports/hhu.pdf)
+ *
+ * Two HuberAggregators can be merged together to have a summary of loss and gradient of
+ * the corresponding joint dataset.
+ *
+ * The huber loss function is given by
+ *
+ * 
+ *   $$
+ *   \begin{align}
+ *   \min_{w, \sigma}\frac{1}{2n}{\sum_{i=1}^n\left(\sigma +
+ *   H_m\left(\frac{X_{i}w - y_{i}}{\sigma}\right)\sigma\right) + \frac{1}{2}\alpha {||w||_2}^2}
+ *   \end{align}
+ *   $$
+ * 
+ *
+ * where
+ *
+ * 
+ *   $$
+ *   \begin{align}
+ *   H_m(z) = \begin{cases}
+ *z^2, & \text {if } |z| < \epsilon, \\
+ *2\epsilon|z| - \epsilon^2, & \text{otherwise}
+ *\end{cases}
+ *   \end{align}
+ *   $$
+ * 
+ *
+ * It is advised to set the parameter $\epsilon$ to 1.35 to achieve 95% statistical efficiency.
+ *
+ * @param fitIntercept Whether to fit an intercept term.
+ * @param epsilon The shape parameter to control the amount of robustness.
+ * @param bcFeaturesStd The broadcast standard deviation values of the features.
+ * @param bcParameters including three parts: the regression coefficients corresponding
+ *                     to the features, the intercept (if fitIntercept is true)
+ *                     and the scale parameter (sigma).
+ */
+private[ml] class HuberAggregator(
+    fitIntercept: Boolean,
+    epsilon: Double,
+    bcFeaturesStd: Broadcast[Array[Double]])(bcParameters: Broadcast[Vector])
+  extends DifferentiableLossAggregator[Instance, HuberAggregator] {
+
+  protected override val dim: Int = bcParameters.value.size
+  private val numFeatures: Int = if (fitIntercept) dim - 2 else dim - 1
+
+  @transient private lazy val coefficients: Array[Double] =
+bcParameters.value.toArray.slice(0, numFeatures)
+  private val sigma: Double = bcParameters.value(dim - 1)
+
+  @transient private lazy val featuresStd = bcFeaturesStd.value
+
+  /**
+   * Add a new training instance to this HuberAggregator, and update the loss and gradient
+   * of the objective function.
+   *
+   * @param instance The instance of data point to be added.
+   * @return This HuberAggregator object.
+   */
+  def add(instance: Instance): HuberAggregator = {
+instance match { case Instance(label, weight, features) =>
+  require(numFeatures == features.size, s"Dimensions mismatch when adding new sample." +
+s" Expecting $numFeatures but got ${features.size}.")
+  require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")
+
+  if (weight == 0.0) return this
+
+  val margin = {
+var sum = 0.0
+features.foreachActive { (index, value) =>
+  if (featuresStd(index) != 0.0 && value != 0.0) {
+sum += coefficients(index) * (value / featuresStd(index))
+  }
+}
+if (fitIntercept) sum += bcParameters.value(dim - 2)
+sum
+  }
+  val linearLoss = label - margin
+
+  if (math.abs(linearLoss) <= sigma * epsilon) {
+lossSum += 0.5 * weight * (sigma +  mat

[GitHub] spark pull request #19020: [SPARK-3181] [ML] Implement huber loss for Linear...

2017-09-19 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/19020#discussion_r139765034
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala ---
@@ -69,19 +69,57 @@ private[regression] trait LinearRegressionParams 
extends PredictorParams
 "The solver algorithm for optimization. Supported options: " +
   s"${supportedSolvers.mkString(", ")}. (Default auto)",
 ParamValidators.inArray[String](supportedSolvers))
+
+  /**
+   * The loss function to be optimized.
+   * Supported options: "leastSquares" and "huber".
+   * Default: "leastSquares"
+   *
+   * @group param
+   */
+  @Since("2.3.0")
+  final override val loss: Param[String] = new Param[String](this, "loss", "The loss function to" +
+    s" be optimized. Supported options: ${supportedLosses.mkString(", ")}. (Default leastSquares)",
+    ParamValidators.inArray[String](supportedLosses))
+
+  /**
+   * The shape parameter to control the amount of robustness. Must be > 1.0.
+   * At larger values of epsilon, the huber criterion becomes more similar to least squares
+   * regression; for small values of epsilon, the criterion is more similar to L1 regression.
+   * Default is 1.35 to get as much robustness as possible while retaining
+   * 95% statistical efficiency for normally distributed data.
+   * Only valid when "loss" is "huber".
+   */
+  @Since("2.3.0")
+  final val epsilon = new DoubleParam(this, "epsilon", "The shape parameter to control the " +
--- End diff --

Mark as expertParam (same for set/get)
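
In Spark ML, expert-only params are flagged through the scaladoc group tags; a minimal sketch of what that would look like for `epsilon` (the getter shown alongside it is assumed, and the setter in LinearRegression itself would get `@group expertSetParam` the same way):

```scala
  /**
   * The shape parameter to control the amount of robustness. Must be > 1.0.
   * Only valid when "loss" is "huber". Default is 1.35.
   *
   * @group expertParam
   */
  @Since("2.3.0")
  final val epsilon = new DoubleParam(this, "epsilon", "The shape parameter to control the " +
    "amount of robustness. Must be > 1.0.", ParamValidators.gt(1.0))

  /** @group expertGetParam */
  @Since("2.3.0")
  def getEpsilon: Double = $(epsilon)
```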


---




[GitHub] spark pull request #19020: [SPARK-3181] [ML] Implement huber loss for Linear...

2017-09-19 Thread jkbradley
Github user jkbradley commented on a diff in the pull request:

https://github.com/apache/spark/pull/19020#discussion_r139764922
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HuberAggregator.scala 
---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.ml.optim.aggregator
+
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.linalg.Vector
+
+/**
+ * HuberAggregator computes the gradient and loss for a huber loss function,
+ * as used in robust regression for samples in sparse or dense vector in an online fashion.
+ *
+ * The huber loss function is based on:
+ * Art B. Owen (2006), A robust hybrid of lasso and ridge regression.
+ * (http://statweb.stanford.edu/~owen/reports/hhu.pdf)
+ *
+ * Two HuberAggregators can be merged together to have a summary of loss and gradient of
+ * the corresponding joint dataset.
+ *
+ * The huber loss function is given by
+ *
+ * 
+ *   $$
+ *   \begin{align}
+ *   \min_{w, \sigma}\frac{1}{2n}{\sum_{i=1}^n\left(\sigma +
+ *   H_m\left(\frac{X_{i}w - y_{i}}{\sigma}\right)\sigma\right) + \frac{1}{2}\alpha {||w||_2}^2}
+ *   \end{align}
+ *   $$
+ * 
+ *
+ * where
+ *
+ * 
+ *   $$
+ *   \begin{align}
+ *   H_m(z) = \begin{cases}
+ *z^2, & \text {if } |z| < \epsilon, \\
+ *2\epsilon|z| - \epsilon^2, & \text{otherwise}
+ *\end{cases}
+ *   \end{align}
+ *   $$
+ * 
+ *
+ * It is advised to set the parameter $\epsilon$ to 1.35 to achieve 95% statistical efficiency.
--- End diff --

This description is misleadingly general since this claim only applies to 
normally distributed data.  How about referencing the part of the paper which 
talks about this so that people can look up what is meant here?
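
One possible rewording along those lines (a sketch only; the exact pointer into Owen (2006) is left to the author):

```scala
 * If the errors follow a normal distribution, epsilon = 1.35 gives an estimator that is
 * roughly 95% as statistically efficient as least squares while remaining robust to
 * outliers; see the efficiency discussion in Owen (2006), linked above.
```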


---




[GitHub] spark issue #19283: Update quickstart python dataset example

2017-09-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19283
  
Can one of the admins verify this patch?


---




[GitHub] spark pull request #19283: Update quickstart python dataset example

2017-09-19 Thread fakegermano
GitHub user fakegermano opened a pull request:

https://github.com/apache/spark/pull/19283

Update quickstart python dataset example

The Python Spark Dataset example for MapReduce was returning an error when using the 
.select().as() function.
This replaces the erroneous call with the .select().name() function, which does the same 
thing and works.

## What changes were proposed in this pull request?

no changes

## How was this patch tested?

No significant changes were made to the Spark code, just documentation changes. The change 
I implemented was tested manually on Spark 2.2.0.

Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fakegermano/spark quickstart-python-as-to-name

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19283.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19283


commit 29ab03c582bf115cfa802043337688005224ab46
Author: Daniel G Travieso 
Date:   2017-09-19T20:19:59Z

Update quickstart python dataset example

The Python Spark Dataset example for MapReduce was returning an error when using the 
.select().as() function.
This replaces the erroneous call with the .select().name() function, which does the same 
thing and works.




---




[GitHub] spark issue #19282: [SPARK-22066][BUILD] Update checkstyle to 8.2, enable it...

2017-09-19 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19282
  
**[Test build #81948 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81948/testReport)**
 for PR 19282 at commit 
[`93ff675`](https://github.com/apache/spark/commit/93ff67576566e93eaca3220507049166133ad4b1).


---




[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...

2017-09-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19196
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81947/
Test FAILed.


---




[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...

2017-09-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19196
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...

2017-09-19 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19196
  
**[Test build #81947 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81947/testReport)**
 for PR 19196 at commit 
[`505af43`](https://github.com/apache/spark/commit/505af43b746e8d3a13de25093a281d43c1ba53ba).
 * This patch **fails to generate documentation**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #19282: [SPARK-22066][BUILD] Update checkstyle to 8.2, en...

2017-09-19 Thread srowen
GitHub user srowen opened a pull request:

https://github.com/apache/spark/pull/19282

[SPARK-22066][BUILD] Update checkstyle to 8.2, enable it, fix violations

## What changes were proposed in this pull request?

Update plugins, including scala-maven-plugin, to latest versions. Update 
checkstyle to 8.2. Remove bogus checkstyle config and enable it. Fix existing 
and new Java checkstyle errors.

## How was this patch tested?

Existing tests

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/srowen/spark SPARK-22066

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19282.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19282


commit c9d76d93ae8b82fe7e3d8257d700cc625f5b9ff3
Author: Sean Owen 
Date:   2017-09-19T20:08:26Z

Update plugins, including scala-maven-plugin, to latest versions. Update 
checkstyle to 8.2. Remove bogus checkstyle config and enable it. Fix existing 
and new Java checkstyle errors.




---




[GitHub] spark issue #19211: [SPARK-18838][core] Add separate listener queues to Live...

2017-09-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19211
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...

2017-09-19 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19196
  
**[Test build #81947 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81947/testReport)**
 for PR 19196 at commit 
[`505af43`](https://github.com/apache/spark/commit/505af43b746e8d3a13de25093a281d43c1ba53ba).


---




[GitHub] spark issue #19211: [SPARK-18838][core] Add separate listener queues to Live...

2017-09-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19211
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81940/
Test PASSed.


---




[GitHub] spark pull request #19160: [SPARK-21934][CORE] Expose Shuffle Netty memory u...

2017-09-19 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19160#discussion_r139800662
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -115,6 +115,7 @@ private[spark] class Executor(
   if (!isLocal) {
 env.metricsSystem.registerSource(executorSource)
 env.blockManager.initialize(conf.getAppId)
+env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource)
--- End diff --

super nit: can you put the `registerSource()` calls next to each other?
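
A sketch of the reordering being asked for, assuming `blockManager.initialize` still needs to run before the shuffle metrics source is available:

```scala
    if (!isLocal) {
      env.blockManager.initialize(conf.getAppId)
      // Keep the two registerSource() calls adjacent.
      env.metricsSystem.registerSource(executorSource)
      env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource)
    }
```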


---




[GitHub] spark pull request #19160: [SPARK-21934][CORE] Expose Shuffle Netty memory u...

2017-09-19 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19160#discussion_r139796177
  
--- Diff: 
core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
 ---
@@ -18,11 +18,14 @@
 package org.apache.spark.network.netty
 
 import java.nio.ByteBuffer
+import java.util
--- End diff --

if you are trying to avoid confusion w/ scala's hashmaps, I think our 
convention is to rename w/ "J" prefix

```
import java.util.{HashMap => JHashMap}
```


---




[GitHub] spark pull request #19160: [SPARK-21934][CORE] Expose Shuffle Netty memory u...

2017-09-19 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/19160#discussion_r139795417
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -248,6 +251,16 @@ private[spark] class BlockManager(
 logInfo(s"Initialized BlockManager: $blockManagerId")
   }
 
+  def shuffleMetricsSource: Source = {
+import BlockManager._
+
+if (externalShuffleServiceEnabled) {
+  new ShuffleMetricsSource("ExternalShuffle", 
shuffleClient.shuffleMetrics())
+} else {
+  new ShuffleMetricsSource("NettyBlockTransfer", 
shuffleClient.shuffleMetrics())
--- End diff --

Do you think we really need to distinguish these two cases? Whether or not you have 
the external shuffle service, this memory is still owned by the executor JVM (it's 
really only external on the remote end).
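
If the two cases were collapsed as suggested, the method could reduce to a single branch; a sketch, with the source name chosen arbitrarily here:

```scala
  def shuffleMetricsSource: Source = {
    import BlockManager._
    // The metric set is the same either way: this memory lives in the executor JVM
    // whether or not the external shuffle service is enabled on the remote end.
    new ShuffleMetricsSource("NettyBlockTransfer", shuffleClient.shuffleMetrics())
  }
```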


---



