[GitHub] spark pull request #19278: [SPARK-22060][ML] Fix CrossValidator/TrainValidat...

2017-09-19 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19278#discussion_r139854872
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala ---
@@ -122,7 +123,7 @@ class TrainValidationSplit @Since("1.5.0") 
(@Since("1.5.0") override val uid: St
 
 // Fit models in a Future for training in parallel
 logDebug(s"Train split with multiple sets of parameters.")
-val modelFutures = epm.map { paramMap =>
+val modelFutures = epm.map { case paramMap =>
--- End diff --

No. I will remove it, sorry.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19278: [SPARK-22060][ML] Fix CrossValidator/TrainValidat...

2017-09-19 Thread WeichenXu123
Github user WeichenXu123 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19278#discussion_r139854853
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala ---
@@ -283,6 +282,8 @@ object CrossValidatorModel extends 
MLReadable[CrossValidatorModel] {
 
 ValidatorParams.validateParams(instance)
 
+protected var shouldPersistSubModels: Boolean = false
+
--- End diff --

Yes. I will remove it, sorry.


---




[GitHub] spark issue #19270: [SPARK-21809] : Change Stage Page to use datatables to s...

2017-09-19 Thread pgandhi999
Github user pgandhi999 commented on the issue:

https://github.com/apache/spark/pull/19270
  
No problem. Thank you for your valuable comments.


---




[GitHub] spark issue #19284: [SPARK-22067][SQL] ArrowWriter should use position when ...

2017-09-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19284
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19284: [SPARK-22067][SQL] ArrowWriter should use position when ...

2017-09-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19284
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81952/
Test PASSed.


---




[GitHub] spark issue #19284: [SPARK-22067][SQL] ArrowWriter should use position when ...

2017-09-19 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19284
  
**[Test build #81952 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81952/testReport)** for PR 19284 at commit [`5ac572d`](https://github.com/apache/spark/commit/5ac572d3cb2422f57e101fa2cbc761f4b748daa6).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #18704: [SPARK-20783][SQL] Create ColumnVector to abstract exist...

2017-09-19 Thread rxin
Github user rxin commented on the issue:

https://github.com/apache/spark/pull/18704
  
cc @michal-databricks any thoughts on this?


---




[GitHub] spark issue #19285: [SPARK-22068][CORE]Reduce the duplicate code between put...

2017-09-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19285
  
Can one of the admins verify this patch?


---




[GitHub] spark issue #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calculate i...

2017-09-19 Thread maropu
Github user maropu commented on the issue:

https://github.com/apache/spark/pull/19281
  
@gatorsmile @cloud-fan could you trigger tests if it is worth fixing? 
Thanks.


---




[GitHub] spark issue #19285: [SPARK-22068][CORE]Reduce the duplicate code between put...

2017-09-19 Thread ConeyLiu
Github user ConeyLiu commented on the issue:

https://github.com/apache/spark/pull/19285
  
Hi @cloud-fan @jiangxb1987, would you mind taking a look? Thanks a lot.


---




[GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...

2017-09-19 Thread ConeyLiu
GitHub user ConeyLiu opened a pull request:

https://github.com/apache/spark/pull/19285

[SPARK-22068][CORE]Reduce the duplicate code between putIteratorAsValues and putIteratorAsBytes

## What changes were proposed in this pull request?

The logic of `MemoryStore.putIteratorAsValues` and 
`MemoryStore.putIteratorAsBytes` is almost the same, so we should reduce the 
duplicated code between them.

## How was this patch tested?

Existing UT.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ConeyLiu/spark rmemorystore

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19285.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19285


commit 2c20dcbcf499aee5d6fbbb80f4803b3cad37c17c
Author: Xianyang Liu 
Date:   2017-09-17T09:53:49Z

refactor memorystore

commit 120564303641a92d32ec434dba5076771f6d6e80
Author: Xianyang Liu 
Date:   2017-09-19T08:47:24Z

fix conflicts

commit 92e1d51b18a810307a0b6d0cb761925a0429ead2
Author: Xianyang Liu 
Date:   2017-09-19T23:45:17Z

fix bug and add some comments

commit 6e2e29be7ad9d4bf3aae2d55fb4bf93c3286009b
Author: Xianyang Liu 
Date:   2017-09-20T00:28:35Z

better variable name




---




[GitHub] spark issue #19271: [SPARK-22053][SS] Stream-stream inner join in Append Mod...

2017-09-19 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19271
  
**[Test build #81957 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81957/testReport)** for PR 19271 at commit [`8a551d7`](https://github.com/apache/spark/commit/8a551d7fc045fd633cc15c14b27133a13eacb727).


---




[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139852354
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, 
AttributeReference, BoundReference, Cast, CheckOverflow, Expression, 
ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, 
Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, 
TimeSub, UnaryMinus}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import 
org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
+
+
+/**
+ * Helper object for [[StreamingSymmetricHashJoinExec]].
+ */
+object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with 
Logging {
+
+  sealed trait JoinSide
+  case object LeftSide extends JoinSide { override def toString(): String 
= "left" }
+  case object RightSide extends JoinSide { override def toString(): String 
= "right" }
+
+  sealed trait JoinStateWatermarkPredicate
+  case class JoinStateKeyWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+  case class JoinStateValueWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+
+  case class JoinStateWatermarkPredicates(
+left: Option[JoinStateWatermarkPredicate] = None,
+right: Option[JoinStateWatermarkPredicate] = None)
+
+  def getStateWatermarkPredicates(
+  leftAttributes: Seq[Attribute],
+  rightAttributes: Seq[Attribute],
+  leftKeys: Seq[Expression],
+  rightKeys: Seq[Expression],
+  condition: Option[Expression],
+  eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = {
+val joinKeyOrdinalForWatermark: Option[Int] = {
+  leftKeys.zipWithIndex.collectFirst {
+case (ne: NamedExpression, index) if 
ne.metadata.contains(delayKey) => index
+  } orElse {
+rightKeys.zipWithIndex.collectFirst {
+  case (ne: NamedExpression, index) if 
ne.metadata.contains(delayKey) => index
+}
+  }
+}
+
+def getOneSideStateWatermarkPredicate(
+oneSideInputAttributes: Seq[Attribute],
+oneSideJoinKeys: Seq[Expression],
+otherSideInputAttributes: Seq[Attribute]): 
Option[JoinStateWatermarkPredicate] = {
+  val isWatermarkDefinedOnInput = 
oneSideInputAttributes.exists(_.metadata.contains(delayKey))
+  val isWatermarkDefinedOnJoinKey = 
joinKeyOrdinalForWatermark.isDefined
+
+  if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the 
class docs
+val keyExprWithWatermark = BoundReference(
+  joinKeyOrdinalForWatermark.get,
+  oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType,
+  oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable)
+val expr = watermarkExpression(Some(keyExprWithWatermark), 
eventTimeWatermark)
+expr.map(JoinStateKeyWatermarkPredicate)
+
+  } else if (isWatermarkDefinedOnInput) { // case 2 explained in the 
class docs
+val stateValueWatermark = getStateValueWatermark(
+  attributesToFindStateWatemarkFor = oneSideInputAttributes,
+  attributesWithEventWatermark = otherSideInputAttributes,
+  condition,
+  eventTimeWatermark)
+val inputAttributeWithWatermark = 
oneSideInputAttributes.find(_.metadata.contains(delayKey))
+val expr = watermarkExpression(inputAttributeWithWatermark, 
stateValueWatermark)
+

[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-19 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139852189
  
--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,46 @@ def __repr__(self):
 return "ArrowSerializer"
 
 
+class ArrowPandasSerializer(ArrowSerializer):
+"""
+Serializes Pandas.Series as Arrow data.
+"""
+
+def __init__(self):
+super(ArrowPandasSerializer, self).__init__()
+
+def dumps(self, series):
+"""
+Make an ArrowRecordBatch from a Pandas Series and serialize. Input 
is a single series or
+a list of series accompanied by an optional pyarrow type to coerce 
the data to.
+"""
+import pyarrow as pa
+# Make input conform to [(series1, type1), (series2, type2), ...]
+if not isinstance(series, (list, tuple)) or \
+(len(series) == 2 and isinstance(series[1], pa.DataType)):
+series = [series]
+series = [(s, None) if not isinstance(s, (list, tuple)) else s for 
s in series]
--- End diff --

Yea, it actually affects the performance because we can avoid an extra loop:

```python
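# Python 2, where map() is eager and returns a list.
def im_map(x):
    print("I am map %s" % x)
    return x

def im_gen(x):
    print("I am gen %s" % x)
    return x

def im_list(x):
    print("I am list %s" % x)
    return x

items = list(range(3))
# List comprehension: every im_list call runs before the first im_map call.
map(im_map, [im_list(item) for item in items])
# Generator expression: im_gen and im_map calls interleave item by item.
map(im_map, (im_gen(item) for item in items))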
```

And, to my knowledge, this actually affects performance:

```python
import time

items = list(xrange(int(1e8)))

for _ in xrange(10):
    s = time.time()
    _ = map(lambda x: x, [item for item in items])
    print "I am list comprehension with a list: %s" % (time.time() - s)
    s = time.time()
    _ = map(lambda x: x, (item for item in items))
    print "I am generator expression with a list: %s" % (time.time() - s)
```

This gives me ~13% improvement :).
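
For readers on Python 3, where `map` is lazy and `xrange` no longer exists, the evaluation-order point above could be sketched roughly as follows. This is only an illustration under assumptions: the helper name `trace_calls` is made up here, and the sketch records call order rather than timing anything, so it says nothing about the ~13% figure.

```python
def trace_calls(items, use_generator):
    """Record the order of calls when map() consumes a list or a generator.

    Hypothetical helper for illustration only; it is not part of PySpark.
    """
    calls = []

    def inner(x):
        calls.append(("inner", x))
        return x

    def outer(x):
        calls.append(("outer", x))
        return x

    if use_generator:
        # Nothing runs yet: inner() is called only when map() pulls an item.
        source = (inner(item) for item in items)
    else:
        # The whole intermediate list is built here, before map() starts.
        source = [inner(item) for item in items]

    # list() forces the lazy Python 3 map object to run to completion.
    list(map(outer, source))
    return calls
```

With the list comprehension, all `inner` calls are recorded before the first `outer` call; with the generator expression, the two alternate item by item, which is the extra pass over the data that the comment refers to.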


---




[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139852081
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, 
AttributeReference, BoundReference, Cast, CheckOverflow, Expression, 
ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, 
Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, 
TimeSub, UnaryMinus}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import 
org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
+
+
+/**
+ * Helper object for [[StreamingSymmetricHashJoinExec]].
+ */
+object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with 
Logging {
+
+  sealed trait JoinSide
+  case object LeftSide extends JoinSide { override def toString(): String 
= "left" }
+  case object RightSide extends JoinSide { override def toString(): String 
= "right" }
+
+  sealed trait JoinStateWatermarkPredicate
+  case class JoinStateKeyWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+  case class JoinStateValueWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+
+  case class JoinStateWatermarkPredicates(
+left: Option[JoinStateWatermarkPredicate] = None,
+right: Option[JoinStateWatermarkPredicate] = None)
+
+  def getStateWatermarkPredicates(
+  leftAttributes: Seq[Attribute],
+  rightAttributes: Seq[Attribute],
+  leftKeys: Seq[Expression],
+  rightKeys: Seq[Expression],
+  condition: Option[Expression],
+  eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = {
+val joinKeyOrdinalForWatermark: Option[Int] = {
+  leftKeys.zipWithIndex.collectFirst {
+case (ne: NamedExpression, index) if 
ne.metadata.contains(delayKey) => index
+  } orElse {
+rightKeys.zipWithIndex.collectFirst {
+  case (ne: NamedExpression, index) if 
ne.metadata.contains(delayKey) => index
+}
+  }
+}
+
+def getOneSideStateWatermarkPredicate(
+oneSideInputAttributes: Seq[Attribute],
+oneSideJoinKeys: Seq[Expression],
+otherSideInputAttributes: Seq[Attribute]): 
Option[JoinStateWatermarkPredicate] = {
+  val isWatermarkDefinedOnInput = 
oneSideInputAttributes.exists(_.metadata.contains(delayKey))
+  val isWatermarkDefinedOnJoinKey = 
joinKeyOrdinalForWatermark.isDefined
+
+  if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the 
class docs
+val keyExprWithWatermark = BoundReference(
+  joinKeyOrdinalForWatermark.get,
+  oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType,
+  oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable)
+val expr = watermarkExpression(Some(keyExprWithWatermark), 
eventTimeWatermark)
+expr.map(JoinStateKeyWatermarkPredicate)
+
+  } else if (isWatermarkDefinedOnInput) { // case 2 explained in the 
class docs
+val stateValueWatermark = getStateValueWatermark(
+  attributesToFindStateWatemarkFor = oneSideInputAttributes,
+  attributesWithEventWatermark = otherSideInputAttributes,
+  condition,
+  eventTimeWatermark)
+val inputAttributeWithWatermark = 
oneSideInputAttributes.find(_.metadata.contains(delayKey))
+val expr = watermarkExpression(inputAttributeWithWatermark, 
stateValueWatermark)
+

[GitHub] spark pull request #19278: [SPARK-22060][ML] Fix CrossValidator/TrainValidat...

2017-09-19 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/19278#discussion_r139850475
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala ---
@@ -283,6 +282,8 @@ object CrossValidatorModel extends 
MLReadable[CrossValidatorModel] {
 
 ValidatorParams.validateParams(instance)
 
+protected var shouldPersistSubModels: Boolean = false
+
--- End diff --

Is this included by accident?


---




[GitHub] spark pull request #19278: [SPARK-22060][ML] Fix CrossValidator/TrainValidat...

2017-09-19 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/19278#discussion_r139850997
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/tuning/TrainValidationSplit.scala ---
@@ -122,7 +123,7 @@ class TrainValidationSplit @Since("1.5.0") 
(@Since("1.5.0") override val uid: St
 
 // Fit models in a Future for training in parallel
 logDebug(s"Train split with multiple sets of parameters.")
-val modelFutures = epm.map { paramMap =>
+val modelFutures = epm.map { case paramMap =>
--- End diff --

Was this intentional?


---




[GitHub] spark pull request #19278: [SPARK-22060][ML] Fix CrossValidator/TrainValidat...

2017-09-19 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/19278#discussion_r139851719
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala ---
@@ -212,14 +213,12 @@ object CrossValidator extends 
MLReadable[CrossValidator] {
 
   val (metadata, estimator, evaluator, estimatorParamMaps) =
 ValidatorParams.loadImpl(path, sc, className)
-  val numFolds = (metadata.params \ "numFolds").extract[Int]
-  val seed = (metadata.params \ "seed").extract[Long]
-  new CrossValidator(metadata.uid)
+  val cv = new CrossValidator(metadata.uid)
 .setEstimator(estimator)
 .setEvaluator(evaluator)
 .setEstimatorParamMaps(estimatorParamMaps)
-.setNumFolds(numFolds)
-.setSeed(seed)
+  DefaultParamsReader.getAndSetParams(cv, metadata, skipParams = 
List("estimatorParamMaps"))
--- End diff --

Do you also need to skip `estimator` and `evaluator`?


---




[GitHub] spark pull request #19278: [SPARK-22060][ML] Fix CrossValidator/TrainValidat...

2017-09-19 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/19278#discussion_r139851109
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala ---
@@ -303,16 +304,16 @@ object CrossValidatorModel extends 
MLReadable[CrossValidatorModel] {
   val (metadata, estimator, evaluator, estimatorParamMaps) =
 ValidatorParams.loadImpl(path, sc, className)
   val numFolds = (metadata.params \ "numFolds").extract[Int]
-  val seed = (metadata.params \ "seed").extract[Long]
   val bestModelPath = new Path(path, "bestModel").toString
   val bestModel = 
DefaultParamsReader.loadParamsInstance[Model[_]](bestModelPath, sc)
   val avgMetrics = (metadata.metadata \ 
"avgMetrics").extract[Seq[Double]].toArray
+
   val model = new CrossValidatorModel(metadata.uid, bestModel, 
avgMetrics)
   model.set(model.estimator, estimator)
 .set(model.evaluator, evaluator)
 .set(model.estimatorParamMaps, estimatorParamMaps)
-.set(model.numFolds, numFolds)
-.set(model.seed, seed)
+  DefaultParamsReader.getAndSetParams(model, metadata, skipParams = 
List("estimatorParamMaps"))
--- End diff --

Should this also skip `estimator` and `evaluator`?


---




[GitHub] spark pull request #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calc...

2017-09-19 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/19281#discussion_r139850950
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -101,14 +101,15 @@ case class SortMergeJoinExec(
 s"${getClass.getSimpleName} should not take $x as the JoinType")
   }
 
-  /**
-   * For SMJ, child's output must have been sorted on key or expressions 
with the same order as
-   * key, so we can get ordering for key from child's output ordering.
-   */
   private def getKeyOrdering(keys: Seq[Expression], childOutputOrdering: 
Seq[SortOrder])
 : Seq[SortOrder] = {
-keys.zip(childOutputOrdering).map { case (key, childOrder) =>
-  SortOrder(key, Ascending, childOrder.sameOrderExpressions + 
childOrder.child - key)
+val requiredOrdering = requiredOrders(keys)
+if (SparkPlan.orderingSatisfies(childOutputOrdering, 
requiredOrdering)) {
--- End diff --

This looks good to me. cc @wzhfy who last touched this code


---




[GitHub] spark pull request #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calc...

2017-09-19 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/19281#discussion_r139850236
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala ---
@@ -64,6 +67,42 @@ class JoinSuite extends QueryTest with SharedSQLContext {
 }
   }
 
+  def assertJoinOrdering(sqlString: String, numOfJoin: Int, numOfSort: 
Int): Any = {
--- End diff --

BTW: since this is only used by one test case, we could put it inside the 
test case method instead of making it class-level.


---




[GitHub] spark pull request #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calc...

2017-09-19 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/19281#discussion_r139850127
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala ---
@@ -64,6 +67,42 @@ class JoinSuite extends QueryTest with SharedSQLContext {
 }
   }
 
+  def assertJoinOrdering(sqlString: String, numOfJoin: Int, numOfSort: 
Int): Any = {
--- End diff --

nit: comparing the counts does not ensure that the sorts are in the right 
place. I wish there were an easier way to check that here, but I can't think of one.


---




[GitHub] spark pull request #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calc...

2017-09-19 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/19281#discussion_r139849801
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -396,6 +396,26 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] 
with Logging with Serializ
 object SparkPlan {
   private[execution] val subqueryExecutionContext = 
ExecutionContext.fromExecutorService(
 ThreadUtils.newDaemonCachedThreadPool("subquery", 16))
+
+  /**
+   * Returns if the actual ordering satisfies the required ordering.
+   *
+   * Ordering A satisfies ordering B if and only if B is an equivalent of 
A or of A's prefix.
+   */
+  def orderingSatisfies(actual: Seq[SortOrder], required: Seq[SortOrder]): 
Boolean = {
--- End diff --

`SparkPlan` is the node for a physical operator in SQL, so it doesn't feel like 
a good place to have this. Since one would want to have all methods related to 
`SortOrder` in a single place, the object feels like the better option. We can 
revisit this if more such methods are added to that object, and refactor it 
into a class.
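
The prefix rule stated in the doc comment quoted above can be illustrated with a toy model. This is a sketch under assumptions, not Spark's implementation: the `(column, direction)` tuples and the name `ordering_satisfies` are hypothetical stand-ins, and plain equality is used where Spark would compare `SortOrder` instances for equivalence.

```python
from typing import List, Tuple

# Toy stand-in for a sort order: (column name, direction).
# Hypothetical model for illustration; it is not Spark's SortOrder class.
Order = Tuple[str, str]


def ordering_satisfies(actual: List[Order], required: List[Order]) -> bool:
    """Return True if `required` matches `actual` or a prefix of `actual`."""
    if len(required) > len(actual):
        # A longer requirement can never be a prefix of a shorter ordering.
        return False
    # zip() stops at the shorter sequence, so only the prefix is compared.
    return all(a == r for a, r in zip(actual, required))
```

Under this model, data sorted by `a` then `b` satisfies a requirement of `a` alone, but data sorted only by `a` does not satisfy a requirement of `a` then `b`.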


---




[GitHub] spark issue #19170: [SPARK-21961][Core] Filter out BlockStatuses Accumulator...

2017-09-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19170
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #19170: [SPARK-21961][Core] Filter out BlockStatuses Accumulator...

2017-09-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19170
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81951/
Test FAILed.


---




[GitHub] spark issue #19170: [SPARK-21961][Core] Filter out BlockStatuses Accumulator...

2017-09-19 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19170
  
**[Test build #81951 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81951/testReport)** for PR 19170 at commit [`04c1e2a`](https://github.com/apache/spark/commit/04c1e2aa24c61f13f1df5148416bb00f0649fcaf).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDFs

2017-09-19 Thread BryanCutler
Github user BryanCutler commented on the issue:

https://github.com/apache/spark/pull/18659
  
> what if users installed an older version of pyarrow? Shall we throw 
exception and ask them to upgrade, or work around type casting issue?

@cloud-fan, regarding how to handle problems that might come up when using 
different versions of Arrow, I think we should first decide on a minimum 
supported version; then maybe we could make that version of pyarrow a 
requirement for PySpark. If we decide to use 0.4.1, which we currently use, 
then we should probably work around the type casting issue and make sure this 
PR works with that version.
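
As a rough illustration of the "minimum supported version" idea, a check along the following lines could run before any Arrow code path is used. Everything here is an assumption for the sake of the sketch: the names `MINIMUM_PYARROW_VERSION`, `parse_version`, and `check_minimum_version` are hypothetical and not PySpark APIs, and the parser only handles purely numeric dotted versions.

```python
# Hypothetical constant; 0.4.1 is the version the comment says is in use.
MINIMUM_PYARROW_VERSION = "0.4.1"


def parse_version(version):
    """Turn a dotted numeric version such as '0.4.1' into a tuple of ints.

    Assumption: no pre-release or local suffixes (e.g. '0.5.0.dev1').
    """
    return tuple(int(part) for part in version.split("."))


def check_minimum_version(installed, minimum=MINIMUM_PYARROW_VERSION):
    """Raise ImportError if `installed` is older than `minimum`."""
    # Comparing int tuples avoids the string pitfall where "0.10.0" < "0.4.1".
    if parse_version(installed) < parse_version(minimum):
        raise ImportError(
            "pyarrow >= %s is required; found %s" % (minimum, installed))
```

A caller would pass the installed version string, for example `check_minimum_version(pyarrow.__version__)`, and let the error message prompt the user to upgrade.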


---




[GitHub] spark issue #19280: [SPARK-21928][CORE] Set classloader on SerializerManager...

2017-09-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19280
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81949/
Test PASSed.


---




[GitHub] spark issue #19280: [SPARK-21928][CORE] Set classloader on SerializerManager...

2017-09-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19280
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19280: [SPARK-21928][CORE] Set classloader on SerializerManager...

2017-09-19 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19280
  
**[Test build #81949 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81949/testReport)** for PR 19280 at commit [`20e3585`](https://github.com/apache/spark/commit/20e3585eac16ef3bfe403ec23f57a2705ff47ecb).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-09-19 Thread ArtRand
Github user ArtRand commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r139847070
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala
 ---
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.security.PrivilegedExceptionAction
+import java.util.concurrent.{Executors, TimeUnit}
+
+import scala.collection.JavaConverters._
+import scala.util.Try
+
+import org.apache.hadoop.security.UserGroupInformation
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.RpcEndpointRef
+import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
+import org.apache.spark.util.ThreadUtils
+
+
+class MesosCredentialRenewer(
+conf: SparkConf,
+tokenManager: HadoopDelegationTokenManager,
+nextRenewal: Long,
+de: RpcEndpointRef) extends Logging {
+  private val credentialRenewerThread =
+Executors.newSingleThreadScheduledExecutor(
+  ThreadUtils.namedThreadFactory("Credential Refresh Thread"))
+
+  @volatile private var timeOfNextRenewal = nextRenewal
+
+  private val principal = conf.get("spark.yarn.principal")
+
+  private val (secretFile, mode) = getSecretFile(conf)
+
+  private def getSecretFile(conf: SparkConf): (String, String) = {
+val keytab64 = conf.get("spark.yarn.keytab", null)
+val tgt64 = System.getenv("KRB5CCNAME")
+require(keytab64 != null || tgt64 != null, "keytab or tgt required")
+require(keytab64 == null || tgt64 == null, "keytab and tgt cannot be 
used at the same time")
+val mode = if (keytab64 != null) "keytab" else "tgt"
+val secretFile = if (keytab64 != null) keytab64 else tgt64
+logInfo(s"Logging in as $principal with mode $mode to retrieve HDFS 
delegation tokens")
+logDebug(s"secretFile is $secretFile")
+(secretFile, mode)
+  }
+
+  def scheduleTokenRenewal(): Unit = {
+def scheduleRenewal(runnable: Runnable): Unit = {
+  val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
+  if (remainingTime <= 0) {
+logInfo("Credentials have expired, creating new ones now.")
+runnable.run()
+  } else {
+logInfo(s"Scheduling login from keytab in $remainingTime millis.")
+credentialRenewerThread.schedule(runnable, remainingTime, 
TimeUnit.MILLISECONDS)
+  }
+}
+
+val credentialRenewerRunnable =
+  new Runnable {
+override def run(): Unit = {
+  try {
+val creds = getRenewedDelegationTokens(conf)
+broadcastDelegationTokens(creds)
+  } catch {
+case e: Exception =>
+  // Log the error and retry in 20 seconds
+  logWarning("Couldn't broadcast tokens, trying again in 20 seconds", e)
+  credentialRenewerThread.schedule(this, 20, TimeUnit.SECONDS)
+  return
+  }
+  scheduleRenewal(this)
+}
+  }
+scheduleRenewal(credentialRenewerRunnable)
+  }
+
+  private def getRenewedDelegationTokens(conf: SparkConf): Array[Byte] = {
+logInfo(s"Attempting to login with ${conf.get("spark.yarn.principal", 
null)}")
+// Get new delegation tokens by logging in with a new UGI
+// inspired by AMCredentialRenewer.scala:L174
+val ugi = if (mode == "keytab") {
+  UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, 
secretFile)
+} else {
+  UserGroupInformation.getUGIFromTicketCache(secretFile, principal)
+}
+val tempCreds = 

[GitHub] spark issue #19259: [BACKPORT-2.1][SPARK-19318][SPARK-22041][SQL] Docker tes...

2017-09-19 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19259
  
Thanks! Merged to 2.1. 

Could you close this PR?


---




[GitHub] spark issue #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer that can...

2017-09-19 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/17819
  
**[Test build #81956 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81956/testReport)**
 for PR 17819 at commit 
[`92ef9bd`](https://github.com/apache/spark/commit/92ef9bde1e048eef7e3b530286723cad5773debc).


---




[GitHub] spark issue #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer that can...

2017-09-19 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/17819
  
@MLnick I have no strong opinion, but @WeichenXu123 seems to prefer 
merging the new API into the current `Bucketizer`. 


---




[GitHub] spark issue #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer that can...

2017-09-19 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/17819
  
retest this please.


---




[GitHub] spark issue #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDFs

2017-09-19 Thread BryanCutler
Github user BryanCutler commented on the issue:

https://github.com/apache/spark/pull/18659
  
Regarding the upgrade of Arrow, the concerns of #18974 are still valid - 
namely it has some risk and upgrading the Python side is a good amount of work 
that only a couple of people have the access to do.  Would it be better to 
discuss the upgrade strategy in another JIRA?



---




[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...

2017-09-19 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/19196
  
LGTM.


---




[GitHub] spark pull request #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calc...

2017-09-19 Thread maryannxue
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19281#discussion_r139844429
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -396,6 +396,26 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] 
with Logging with Serializ
 object SparkPlan {
   private[execution] val subqueryExecutionContext = 
ExecutionContext.fromExecutorService(
 ThreadUtils.newDaemonCachedThreadPool("subquery", 16))
+
+  /**
+   * Returns if the actual ordering satisfies the required ordering.
+   *
+   * Ordering A satisfies ordering B if and only if B is an equivalent of 
A or of A's prefix.
+   */
+  def orderingSatisfies(actual: Seq[SortOrder], required: Seq[SortOrder]): 
Boolean = {
--- End diff --

Actually, I struggled with where to put this: in SortOrder or in SparkPlan. 
Nothing else seems to use Seq[SortOrder] so far, so I chose to leave this out 
of SortOrder. I think, though, that if we see potential usage of 
Seq[SortOrder] elsewhere, it might be worth wrapping it in a class. Agree? 
Either way, I could put it into SortOrder for now.
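
To make the rule in the doc comment concrete, here is a minimal Python sketch. It is not the Scala implementation in the PR. It assumes each sort order can be modelled as a plain `(column, direction)` tuple and uses plain equality where Spark would check semantic equivalence; `ordering_satisfies` and the tuple encoding are hypothetical names chosen for illustration.

```python
def ordering_satisfies(actual, required):
    """Return True if `required` equals `actual` or a prefix of `actual`.

    Both arguments are sequences of hypothetical (column, direction) tuples.
    Plain equality stands in for Spark's semantic equivalence of SortOrder.
    """
    if len(required) > len(actual):
        return False
    # zip stops at the shorter sequence, so this compares the prefix only.
    return all(a == r for a, r in zip(actual, required))


actual = [("a", "asc"), ("b", "desc")]
print(ordering_satisfies(actual, [("a", "asc")]))   # required is a prefix of actual
print(ordering_satisfies(actual, [("b", "desc")]))  # required is not a prefix
```

Under this reading an empty `required` is always satisfied, and a `required` longer than `actual` never is.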


---




[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139844415
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
 ---
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.{RDD, ZippedPartitionsRDD2}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, BindReferences, Expression, LessThanOrEqual, Literal, 
SpecificInternalRow, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.Predicate
+import 
org.apache.spark.sql.execution.streaming.{StatefulOperatorStateInfo, 
StreamingSymmetricHashJoinExec}
+import 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper._
+import org.apache.spark.sql.types.{LongType, StructField, StructType}
+import org.apache.spark.util.NextIterator
+
+/**
+ * Helper class to manage state required by a single side of 
[[StreamingSymmetricHashJoinExec]].
+ * The interface of this class is basically that of a multi-map:
+ * - Get: Returns an iterator of multiple values for given key
+ * - Append: Append a new value to the given key
+ * - Remove Data by predicate: Drop any state using a predicate condition 
on keys or values
+ *
+ * @param joinSide  Defines the join side
--- End diff --

nit: why the weird indentation
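
As an aside on the class comment quoted above: the multi-map interface it lists (get, append, remove by predicate) can be sketched in a few lines of Python. This is only an in-memory toy, assuming a dictionary of lists; the class and method names are hypothetical, and it has nothing of the versioned state store that the real manager relies on.

```python
from collections import defaultdict


class MultiMapState:
    """Toy in-memory multi-map: key -> list of values (illustrative only)."""

    def __init__(self):
        self._store = defaultdict(list)

    def get(self, key):
        """Return an iterator over all values stored for `key`."""
        return iter(self._store.get(key, []))

    def append(self, key, value):
        """Append `value` to the list kept for `key`."""
        self._store[key].append(value)

    def remove_by_key_condition(self, predicate):
        """Drop every key (and its values) for which predicate(key) is true."""
        for key in [k for k in self._store if predicate(k)]:
            del self._store[key]

    def remove_by_value_condition(self, predicate):
        """Drop individual values for which predicate(value) is true."""
        for key in list(self._store):
            kept = [v for v in self._store[key] if not predicate(v)]
            if kept:
                self._store[key] = kept
            else:
                del self._store[key]


state = MultiMapState()
state.append("k1", 10)
state.append("k1", 20)
state.remove_by_value_condition(lambda v: v < 15)
print(list(state.get("k1")))
```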


---




[GitHub] spark issue #19284: [SPARK-22067][SQL] ArrowWriter should use position when ...

2017-09-19 Thread BryanCutler
Github user BryanCutler commented on the issue:

https://github.com/apache/spark/pull/19284
  
Oops, I referenced the wrong JIRA. It was ARROW-1443 (PR: 
https://github.com/apache/arrow/pull/1022): ArrowBuf.setBytes was not using the 
destination buffer properly.


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-19 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139843267
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2142,18 +2159,26 @@ def udf(f=None, returnType=StringType()):
 | 8|  JOHN DOE|  22|
 +--+--++
 """
-def _udf(f, returnType=StringType()):
-udf_obj = UserDefinedFunction(f, returnType)
-return udf_obj._wrapped()
+return _create_udf(f, returnType=returnType, vectorized=False)
 
-# decorator @udf, @udf() or @udf(dataType())
-if f is None or isinstance(f, (str, DataType)):
-# If DataType has been passed as a positional argument
-# for decorator use it as a returnType
-return_type = f or returnType
-return functools.partial(_udf, returnType=return_type)
+
+@since(2.3)
+def pandas_udf(f=None, returnType=StringType()):
+"""
+Creates a :class:`Column` expression representing a user defined 
function (UDF) that accepts
+`Pandas.Series` as input arguments and outputs a `Pandas.Series` of 
the same length.
+
+:param f: python function if used as a standalone function
+:param returnType: a :class:`pyspark.sql.types.DataType` object
+
+# TODO: doctest
+"""
+import inspect
+# If function "f" does not define the optional kwargs, then wrap with 
a kwargs placeholder
+if inspect.getargspec(f).keywords is None:
+return _create_udf(lambda *a, **kwargs: f(*a), 
returnType=returnType, vectorized=True)
--- End diff --

I agree it is still a bit weird. Did you mean disallowing 0-parameter 
pandas_udfs, or requiring 0-parameter pandas_udfs to accept `kwargs`?
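
For reference, the wrapping under discussion can be sketched in isolation as below. This is a minimal illustration, not the code in the PR: `accept_kwargs` is a hypothetical helper name, and it uses `inspect.getfullargspec` (field `varkw`) because `inspect.getargspec` is deprecated and was removed in Python 3.11.

```python
import inspect


def accept_kwargs(f):
    """Return a callable that always tolerates extra keyword arguments."""
    if inspect.getfullargspec(f).varkw is not None:
        return f  # f already declares **kwargs, use it as-is
    # Otherwise drop the keyword arguments before calling f.
    return lambda *a, **kwargs: f(*a)


def no_params():
    return 42


def with_kwargs(x, **kwargs):
    return x * kwargs["length"]


print(accept_kwargs(no_params)(length=3))       # kwargs are silently dropped
print(accept_kwargs(with_kwargs)(2, length=3))  # kwargs are passed through
```

The 0-parameter case is the awkward one: without the wrapper, a function such as `no_params` has no way to learn how many rows it is expected to return.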


---




[GitHub] spark issue #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDFs

2017-09-19 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18659
  
**[Test build #81955 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81955/testReport)**
 for PR 18659 at commit 
[`f451d65`](https://github.com/apache/spark/commit/f451d652a2656113cce1f0763e17c73ed2d03c44).


---




[GitHub] spark issue #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDFs

2017-09-19 Thread BryanCutler
Github user BryanCutler commented on the issue:

https://github.com/apache/spark/pull/18659
  
Thanks for the reviews @ueshin @viirya and @HyukjinKwon !  I updated with 
your comments


---




[GitHub] spark issue #19284: [SPARK-22067][SQL] ArrowWriter should use position when ...

2017-09-19 Thread icexelloss
Github user icexelloss commented on the issue:

https://github.com/apache/spark/pull/19284
  
LGTM. What's the Arrow bug you mentioned? 


---




[GitHub] spark issue #19282: [SPARK-22066][BUILD] Update checkstyle to 8.2, enable it...

2017-09-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19282
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81948/
Test PASSed.


---




[GitHub] spark issue #19282: [SPARK-22066][BUILD] Update checkstyle to 8.2, enable it...

2017-09-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19282
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19282: [SPARK-22066][BUILD] Update checkstyle to 8.2, enable it...

2017-09-19 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19282
  
**[Test build #81948 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81948/testReport)**
 for PR 19282 at commit 
[`93ff675`](https://github.com/apache/spark/commit/93ff67576566e93eaca3220507049166133ad4b1).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-19 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139841884
  
--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,46 @@ def __repr__(self):
 return "ArrowSerializer"
 
 
+class ArrowPandasSerializer(ArrowSerializer):
+"""
+Serializes Pandas.Series as Arrow data.
+"""
+
+def __init__(self):
+super(ArrowPandasSerializer, self).__init__()
+
+def dumps(self, series):
+"""
+Make an ArrowRecordBatch from a Pandas Series and serialize. Input 
is a single series or
+a list of series accompanied by an optional pyarrow type to coerce 
the data to.
+"""
+import pyarrow as pa
+# Make input conform to [(series1, type1), (series2, type2), ...]
+if not isinstance(series, (list, tuple)) or \
+(len(series) == 2 and isinstance(series[1], pa.DataType)):
+series = [series]
+series = [(s, None) if not isinstance(s, (list, tuple)) else s for 
s in series]
+arrs = [pa.Array.from_pandas(s[0], type=s[1], mask=s[0].isnull()) 
for s in series]
+batch = pa.RecordBatch.from_arrays(arrs, ["_%d" % i for i in 
range(len(arrs))])
+return super(ArrowPandasSerializer, self).dumps(batch)
+
+def loads(self, obj):
+"""
+Deserialize an ArrowRecordBatch to an Arrow table and return as a 
list of pandas.Series
+followed by a dictionary containing length of the loaded batches.
+"""
+import pyarrow as pa
+reader = pa.RecordBatchFileReader(pa.BufferReader(obj))
+batches = [reader.get_batch(i) for i in 
range(reader.num_record_batches)]
+# NOTE: a 0-parameter pandas_udf will produce an empty batch that 
can have num_rows set
+num_rows = sum([batch.num_rows for batch in batches])
--- End diff --

I guess this makes sense because it's a summation; there's no sense in making 
a list and then adding it all up.
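
A small sketch of the point, assuming a hypothetical `Batch` stand-in for the Arrow record batches so that it runs without pyarrow:

```python
from collections import namedtuple

# Hypothetical stand-in for a pyarrow RecordBatch; only num_rows matters here.
Batch = namedtuple("Batch", ["num_rows"])

batches = [Batch(3), Batch(0), Batch(5)]

# A generator expression feeds sum() one value at a time,
# so no temporary list is built just to be added up.
num_rows = sum(batch.num_rows for batch in batches)
print(num_rows)  # 8
```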


---




[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-19 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139841646
  
--- Diff: python/pyspark/serializers.py ---
@@ -199,6 +211,46 @@ def __repr__(self):
 return "ArrowSerializer"
 
 
+class ArrowPandasSerializer(ArrowSerializer):
+"""
+Serializes Pandas.Series as Arrow data.
+"""
+
+def __init__(self):
+super(ArrowPandasSerializer, self).__init__()
+
+def dumps(self, series):
+"""
+Make an ArrowRecordBatch from a Pandas Series and serialize. Input 
is a single series or
+a list of series accompanied by an optional pyarrow type to coerce 
the data to.
+"""
+import pyarrow as pa
+# Make input conform to [(series1, type1), (series2, type2), ...]
+if not isinstance(series, (list, tuple)) or \
+(len(series) == 2 and isinstance(series[1], pa.DataType)):
+series = [series]
+series = [(s, None) if not isinstance(s, (list, tuple)) else s for 
s in series]
--- End diff --

That would work, but does it help much since `series` will already be a 
list or tuple?
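
To show what shape `series` has after the first normalization step, here is a sketch of the two lines from the diff, lifted into a standalone function. The `DataType` class is a hypothetical stand-in for `pyarrow.DataType`, plain strings stand in for pandas Series, and `normalize` is a name made up for this example.

```python
class DataType:
    """Hypothetical stand-in for pyarrow.DataType so the sketch runs without pyarrow."""


def normalize(series):
    """Coerce input to [(series1, type1), (series2, type2), ...]."""
    # A bare series, or a single (series, type) pair, becomes a one-element list.
    if not isinstance(series, (list, tuple)) or \
            (len(series) == 2 and isinstance(series[1], DataType)):
        series = [series]
    # Entries without an explicit type get None as their type.
    return [(s, None) if not isinstance(s, (list, tuple)) else s for s in series]


t = DataType()
print(normalize("s1"))                # a single bare series
print(normalize(("s1", t)))           # a single (series, type) pair
print(normalize(["s1", ("s2", t)]))   # a mixed list
```

After the first `if`, `series` is always a list or tuple, but its elements may still be either bare series or `(series, type)` pairs, which is what the second line handles.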


---




[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139841490
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.TimeUnit.NANOSECONDS
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, 
JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
+import 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.util.{CompletionIterator, 
SerializableConfiguration}
+
+
+/**
+ * Performs stream-stream join using symmetric hash join algorithm. It 
works as follows.
+ *
+ *                      /-----------------------\
+ *   left side input -->|    left side state    |--\
+ *                      \-----------------------/  |
+ *                                                 |--> joined output
+ *                      /-----------------------\  |
+ *   right side input ->|    right side state   |--/
+ *                      \-----------------------/
+ *
+ * Each join side buffers past input rows as streaming state so that the 
past input can be joined
+ * with future input on the other side. This buffer state is effectively a 
multi-map:
+ *equi-join key -> list of past input rows received with the join key
+ *
+ * For each input row in each side, the following operations take place.
+ * - Calculate join key from the row.
+ * - Use the join key to append the row to the buffer state of the side 
that the row came from.
+ * - Find past buffered values for the key from the other side. For each 
such value, emit the
+ *   "joined row" (left-row, right-row)
+ * - Apply the optional condition to filter the joined rows as the final 
output.
+ *
+ * If a timestamp column with event time watermark is present in the join 
keys or in the input
+ * data, then it uses the watermark to figure out which rows in the buffer will not join with
+ * any new data, and therefore can be discarded. Depending on the provided query conditions, we
+ * can define thresholds on both state key (i.e. joining keys) and state 
value (i.e. input rows).
+ * There are three kinds of queries possible regarding this as explained 
below.
+ * Assume that watermark has been defined on both `leftTime` and 
`rightTime` columns used below.
+ *
+ * 1. When timestamp/time-window + watermark is in the join keys. Example 
(pseudo-SQL):
+ *
+ *  SELECT * FROM leftTable, rightTable
+ *  ON
+ *leftKey = rightKey AND
+ *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 
1hr tumbling windows
+ *
+ *In this case, this operator will join rows newer than watermark 
which fall in the same
+ *1 hour window. Say the event-time watermark is "12:34" (both left 
and right input).
+ *Then input rows can only have time > 12:34. Hence, they can only 
join with buffered rows
+ *where window >= 12:00 - 1:00 and all buffered rows with join window 
< 12:00 can be
+ *discarded. In other words, the operator will discard all state where
+ *window in state key (i.e. join key) < event time watermark. This 
threshold is called
+ *State Key Watermark.
+ *
+ * 2. When timestamp range 

[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r139841435
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Command.scala
 ---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.{SparkException, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, 
RowEncoder}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
+
+case class WriteToDataSourceV2Command(writer: DataSourceV2Writer, query: 
LogicalPlan)
+  extends RunnableCommand {
--- End diff --

I know similar tasks do the same, but this should not implement 
`RunnableCommand`. I'm not sure what the original intent for it was, but I think 
`RunnableCommand` should be used for small tasks that are carried out on the 
driver, like DDL. 

Using `RunnableCommand` in cases like this where a job needs to run ends up 
effectively linking a logical plan into a physical plan, which has caused a few 
messy issues. For example, the problem where the Spark SQL tab doesn't show the 
entire operation and only shows the outer command without metrics.


---




[GitHub] spark pull request #19272: [Spark-21842][Mesos] Support Kerberos ticket rene...

2017-09-19 Thread skonto
Github user skonto commented on a diff in the pull request:

https://github.com/apache/spark/pull/19272#discussion_r139841458
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -159,6 +159,13 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 scheduler.getExecutorsAliveOnHost(host).foreach { exec =>
   killExecutors(exec.toSeq, replace = true, force = true)
 }
+
+  case UpdateDelegationTokens(tokens) =>
+logDebug("Asking each executor to update HDFS delegation tokens")
+for ((x, executorData) <- executorDataMap) {
--- End diff --

Alternatively executorDataMap.values.foreach(...)


---




[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139841304
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
 ---
@@ -114,6 +115,16 @@ class IncrementalExecution(
   stateInfo = Some(nextStatefulOperationStateInfo),
   batchTimestampMs = Some(offsetSeqMetadata.batchTimestampMs),
   eventTimeWatermark = Some(offsetSeqMetadata.batchWatermarkMs))
+
+  case j @ StreamingSymmetricHashJoinExec(lKeys, rKeys, _, cond, _, _, 
_, left, right) =>
+j.copy(
+  stateInfo = Some(nextStatefulOperationStateInfo),
--- End diff --

That is the case **if** you don't change Spark versions. A more recent 
Spark version may include new optimizer rules that may change the ordering.
Just something to think about. Would be nice to add a test with aggregation 
+ join and join + aggregation.


---




[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...

2017-09-19 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19196
  
**[Test build #81954 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81954/testReport)**
 for PR 19196 at commit 
[`4eb7f4f`](https://github.com/apache/spark/commit/4eb7f4f6df3f2d5ae831bf15715651598e52c3e6).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19196: [SPARK-21977] SinglePartition optimizations break...

2017-09-19 Thread brkyvz
Github user brkyvz commented on a diff in the pull request:

https://github.com/apache/spark/pull/19196#discussion_r139840990
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/streaming/EnsureStatefulOpPartitioningSuite.scala
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.util.UUID
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{SparkPlan, SparkPlanTest, 
UnaryExecNode}
+import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchange}
+import org.apache.spark.sql.execution.streaming.{IncrementalExecution, 
OffsetSeqMetadata, StatefulOperator, StatefulOperatorStateInfo}
+import org.apache.spark.sql.test.SharedSQLContext
+
+class EnsureStatefulOpPartitioningSuite extends SparkPlanTest with 
SharedSQLContext {
+
+  import testImplicits._
+  super.beforeAll()
+
+  private val baseDf = Seq((1, "A"), (2, "b")).toDF("num", "char")
+
+  testEnsureStatefulOpPartitioning(
+"ClusteredDistribution generates Exchange with HashPartitioning",
+baseDf.queryExecution.sparkPlan,
+requiredDistribution = keys => ClusteredDistribution(keys),
+expectedPartitioning =
+  keys => HashPartitioning(keys, 
spark.sessionState.conf.numShufflePartitions),
+expectShuffle = true)
+
+  testEnsureStatefulOpPartitioning(
+"ClusteredDistribution with coalesce(1) generates Exchange with 
HashPartitioning",
+baseDf.coalesce(1).queryExecution.sparkPlan,
+requiredDistribution = keys => ClusteredDistribution(keys),
+expectedPartitioning =
+  keys => HashPartitioning(keys, 
spark.sessionState.conf.numShufflePartitions),
+expectShuffle = true)
+
+  testEnsureStatefulOpPartitioning(
+"AllTuples generates Exchange with SinglePartition",
+baseDf.queryExecution.sparkPlan,
+requiredDistribution = _ => AllTuples,
+expectedPartitioning = _ => SinglePartition,
+expectShuffle = true)
+
+  testEnsureStatefulOpPartitioning(
+"AllTuples with coalesce(1) doesn't need Exchange",
+baseDf.coalesce(1).queryExecution.sparkPlan,
+requiredDistribution = _ => AllTuples,
+expectedPartitioning = _ => SinglePartition,
+expectShuffle = false)
+
+  /**
+   * For `StatefulOperator` with the given `requiredChildDistribution`, 
and child SparkPlan
+   * `inputPlan`, ensures that the incremental planner adds exchanges, if 
required, in order to
+   * ensure the expected partitioning.
+   */
+  private def testEnsureStatefulOpPartitioning(
+  testName: String,
+  inputPlan: SparkPlan,
+  requiredDistribution: Seq[Attribute] => Distribution,
+  expectedPartitioning: Seq[Attribute] => Partitioning,
+  expectShuffle: Boolean): Unit = {
+test(testName) {
+  val operator = TestStatefulOperator(inputPlan, 
requiredDistribution(inputPlan.output.take(1)))
+  val executed = executePlan(operator, OutputMode.Complete())
+  if (expectShuffle) {
+val exchange = executed.children.find(_.isInstanceOf[Exchange])
+if (exchange.isEmpty) {
+  fail(s"Was expecting an exchange but didn't get one 
in:\n$executed")
+}
+assert(exchange.get ===
+  ShuffleExchange(expectedPartitioning(inputPlan.output.take(1)), 
inputPlan),
+  s"Exchange didn't have expected properties:\n${exchange.get}")
+  } else {
+assert(!executed.children.exists(_.isInstanceOf[Exchange]),
+  s"Unexpected exchange found in:\n$executed")
+  }
+}
+  }
+
+  /** Executes a SparkPlan using the IncrementalPlanner used for 
Structured 

[GitHub] spark pull request #18659: [SPARK-21190][PYSPARK][WIP] Python Vectorized UDF...

2017-09-19 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18659#discussion_r139840306
  
--- Diff: python/pyspark/worker.py ---
@@ -71,7 +73,19 @@ def wrap_udf(f, return_type):
 return lambda *a: f(*a)
 
 
-def read_single_udf(pickleSer, infile):
+def wrap_pandas_udf(f, return_type):
+def verify_result_length(*a):
+kwargs = a[-1]
+result = f(*a[:-1], **kwargs)
+if len(result) != kwargs["length"]:
+raise RuntimeError("Result vector from pandas_udf was not the 
required length: "
+   "expected %d, got %d\nUse input vector 
length or kwarg['length']"
+   % (kwargs["length"], len(result)))
+return result, toArrowType(return_type)
--- End diff --

sure, that sounds good thanks!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19270: [SPARK-21809] : Change Stage Page to use datatables to s...

2017-09-19 Thread ajbozarth
Github user ajbozarth commented on the issue:

https://github.com/apache/spark/pull/19270
  
On a second look I think I figured out my misunderstanding, and I've 
realized a thorough review will take quite a bit of time. I'll do my best to 
finish by the end of the week, but no promises. As for the MiMa failure, any 
change to a public API (even an addition) must be added to the MiMa excludes.


---




[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...

2017-09-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19196
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...

2017-09-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19196
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81950/


---




[GitHub] spark issue #19196: [SPARK-21977] SinglePartition optimizations break certai...

2017-09-19 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19196
  
**[Test build #81950 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81950/testReport)**
 for PR 19196 at commit 
[`8a6eafe`](https://github.com/apache/spark/commit/8a6eafef056b2a64ee0be07ce886ad69dc295537).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r139839037
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java 
---
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+
+/**
+ * A data writer returned by {@link WriteTask#createWriter(int, int, int)} 
and is responsible for
+ * writing data for an input RDD partition.
+ *
+ * Note that, Currently the type `T` can only be {@link 
org.apache.spark.sql.Row} for normal data
+ * source writers, or {@link 
org.apache.spark.sql.catalyst.expressions.UnsafeRow} for data source
+ * writers that mix in {@link SupportsWriteUnsafeRow}.
+ */
+@InterfaceStability.Evolving
+public interface DataWriter<T> {
+
+  void write(T record);
--- End diff --

What happens if this throws an exception?
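
One possible answer, following the procedure in the `DataSourceV2Writer` Javadoc from this PR ("If exception happens during the writing, call `DataWriter#abort()`"), is sketched below. This is a minimal model in Python rather than the Java interface itself; `ListWriter` and `run_write_task` are hypothetical names for illustration, not Spark code, and whether Spark actually re-raises after aborting is an assumption.

```python
class ListWriter:
    """Toy stand-in for a DataWriter: buffers records, rejects None."""

    def __init__(self):
        self.buffer = []
        self.state = "open"

    def write(self, record):
        if record is None:
            raise ValueError("cannot write a null record")
        self.buffer.append(record)

    def commit(self):
        self.state = "committed"
        return len(self.buffer)  # stands in for a WriterCommitMessage

    def abort(self):
        self.buffer.clear()
        self.state = "aborted"


def run_write_task(writer, records):
    """Write every record; commit on success, abort and re-raise on failure."""
    try:
        for record in records:
            writer.write(record)
        return writer.commit()
    except Exception:
        # Assumed behaviour: any failure in write() or commit() aborts this
        # attempt, then propagates so the scheduler can retry the task.
        writer.abort()
        raise
```

Under this reading, an exception from `write` never leaves a writer half-open: the attempt ends either committed or aborted.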


---




[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r139838908
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A data source writer that is returned by
+ * {@link WriteSupport#createWriter(StructType, SaveMode, 
DataSourceV2Options)}.
+ * It can mix in various writing optimization interfaces to speed up the 
data saving. The actual
+ * writing logic is delegated to {@link WriteTask} that is returned by 
{@link #createWriteTask()}.
+ *
+ * The writing procedure is:
+ *   1. Create a write task by {@link #createWriteTask()}, serialize and 
send it to all the
+ *  partitions of the input data(RDD).
+ *   2. For each partition, create a data writer with the write task, and 
write the data of the
+ *  partition with this writer. If all the data are written 
successfully, call
+ *  {@link DataWriter#commit()}. If exception happens during the 
writing, call
+ *  {@link DataWriter#abort()}. This step may repeat several times as 
Spark will retry failed
+ *  tasks.
+ *   3. Wait until all the writers/partitions are finished, i.e., either 
committed or aborted. If
+ *  all partitions are written successfully, call {@link 
#commit(WriterCommitMessage[])}. If
+ *  some partitions failed and aborted, call {@link #abort()}.
+ *
+ * Note that, data sources are responsible for providing transaction 
ability by implementing the
+ * `commit` and `abort` methods of {@link DataSourceV2Writer} and {@link 
DataWriter} correctly.
+ * The transaction here is Spark-level transaction, which may not be the 
underlying storage
+ * transaction. For example, Spark successfully write data to a Cassandra 
data source, but
+ * Cassandra may need some more time to reach consistency at storage level.
+ */
+@InterfaceStability.Evolving
+public interface DataSourceV2Writer {
+
+  /**
+   * Creates a write task which will be serialized and sent to executors. 
For each partition of the
+   * input data(RDD), there will be one write task to write the records.
+   */
+  WriteTask createWriteTask();
+
+  /**
+   * Commits this writing job with a list of commit messages. The commit 
messages are collected from
+   * all data writers for this writing job and are produced by {@link 
DataWriter#commit()}. This
+   * also means all the data are written successfully and all data writers 
are committed.
+   */
+  void commit(WriterCommitMessage[] messages);
+
+  /**
+   * Aborts this writing job because some data writers are failed to write 
the records and aborted.
+   */
+  void abort();
--- End diff --

Should this accept the commit messages for committed tasks, or will tasks 
be aborted?

I'm thinking of the case where you're writing to S3. Say a data source 
writes all attempt files to the final locations, then removes any attempts that 
are aborted. If the job aborts with some tasks that have already committed, 
then either this should have the option of cleaning up those files (passed in 
the commit message) or all of the tasks should be individually aborted. I'd 
prefer to have this abort clean up successful/committed tasks because the logic 
may be different.
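
A minimal sketch of the first option, where the job-level abort receives the commit messages of tasks that already committed. It is written in Python as a model of the idea, not as the Java API; `FileListCommitMessage`, `DirectWriteJob`, and the `committed_messages` parameter are hypothetical, and a plain set of paths stands in for S3.

```python
class FileListCommitMessage:
    """Hypothetical commit message: the final paths one task attempt wrote."""

    def __init__(self, task_id, files):
        self.task_id = task_id
        self.files = list(files)


class DirectWriteJob:
    """Models a source that writes attempt files straight to final locations."""

    def __init__(self, storage):
        self.storage = storage  # set of paths; stands in for an S3 bucket

    def abort(self, committed_messages):
        # Assumed signature: abort is handed the messages of committed tasks,
        # so job-level cleanup can differ from per-task abort logic.
        for message in committed_messages:
            for path in message.files:
                self.storage.discard(path)
```

With this shape, the data source decides how to undo committed work, and tasks that already succeeded do not need to be aborted one by one.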


---




[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r139838459
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A data source writer that is returned by
+ * {@link WriteSupport#createWriter(StructType, SaveMode, 
DataSourceV2Options)}.
+ * It can mix in various writing optimization interfaces to speed up the 
data saving. The actual
+ * writing logic is delegated to {@link WriteTask} that is returned by 
{@link #createWriteTask()}.
+ *
+ * The writing procedure is:
+ *   1. Create a write task by {@link #createWriteTask()}, serialize and 
send it to all the
+ *  partitions of the input data(RDD).
+ *   2. For each partition, create a data writer with the write task, and 
write the data of the
+ *  partition with this writer. If all the data are written 
successfully, call
+ *  {@link DataWriter#commit()}. If exception happens during the 
writing, call
+ *  {@link DataWriter#abort()}. This step may repeat several times as 
Spark will retry failed
+ *  tasks.
+ *   3. Wait until all the writers/partitions are finished, i.e., either 
committed or aborted. If
+ *  all partitions are written successfully, call {@link 
#commit(WriterCommitMessage[])}. If
+ *  some partitions failed and aborted, call {@link #abort()}.
+ *
+ * Note that, data sources are responsible for providing transaction 
ability by implementing the
+ * `commit` and `abort` methods of {@link DataSourceV2Writer} and {@link 
DataWriter} correctly.
+ * The transaction here is Spark-level transaction, which may not be the 
underlying storage
+ * transaction. For example, Spark successfully write data to a Cassandra 
data source, but
+ * Cassandra may need some more time to reach consistency at storage level.
+ */
+@InterfaceStability.Evolving
+public interface DataSourceV2Writer {
+
+  /**
+   * Creates a write task which will be serialized and sent to executors. 
For each partition of the
+   * input data(RDD), there will be one write task to write the records.
+   */
+  WriteTask createWriteTask();
+
+  /**
+   * Commits this writing job with a list of commit messages. The commit 
messages are collected from
+   * all data writers for this writing job and are produced by {@link 
DataWriter#commit()}. This
+   * also means all the data are written successfully and all data writers 
are committed.
--- End diff --

I think this should state the guarantees when this method is called:

* One and only one attempt for every task has committed successfully
* `messages` contains the commit message from every committed task attempt, 
which is no more than one per task.
* All other attempts have been successfully aborted (is this a guarantee, 
or just that aborts have been attempted?)
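
The first two guarantees can be stated as a check the driver could run before calling the job-level `commit`. This is a hedged sketch in Python, not Spark code; `check_commit_guarantees` and the `(task_id, attempt_id, state, message)` tuple layout are assumptions made for illustration.

```python
def check_commit_guarantees(num_tasks, attempts):
    """Return one commit message per task, or raise if a guarantee is broken.

    `attempts` is a list of (task_id, attempt_id, state, message) tuples,
    where state is "committed" or "aborted".
    """
    committed = {}
    for task_id, attempt_id, state, message in attempts:
        if state == "committed":
            if task_id in committed:
                raise ValueError("task %d committed more than once" % task_id)
            committed[task_id] = message
        elif state != "aborted":
            # Every attempt that did not commit must have ended in abort.
            raise ValueError(
                "attempt %d of task %d is still %s" % (attempt_id, task_id, state))
    missing = set(range(num_tasks)) - set(committed)
    if missing:
        raise ValueError("no committed attempt for tasks %s" % sorted(missing))
    # Exactly one message per task, ordered by task id.
    return [committed[task_id] for task_id in range(num_tasks)]
```

Note that the check only sees that an abort was recorded, which is exactly the open question in the last bullet: it cannot tell a successful abort from an attempted one.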


---




[GitHub] spark issue #19141: [SPARK-21384] [YARN] Spark + YARN fails with LocalFileSy...

2017-09-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19141
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/81953/


---




[GitHub] spark issue #19141: [SPARK-21384] [YARN] Spark + YARN fails with LocalFileSy...

2017-09-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19141
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #19141: [SPARK-21384] [YARN] Spark + YARN fails with LocalFileSy...

2017-09-19 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19141
  
**[Test build #81953 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81953/testReport)**
 for PR 19141 at commit 
[`d2d13fe`](https://github.com/apache/spark/commit/d2d13fe82aec1c0fc43d688dbd315385ef99be19).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r139836068
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A data source writer that is returned by
+ * {@link WriteSupport#createWriter(StructType, SaveMode, 
DataSourceV2Options)}.
+ * It can mix in various writing optimization interfaces to speed up the 
data saving. The actual
+ * writing logic is delegated to {@link WriteTask} that is returned by 
{@link #createWriteTask()}.
+ *
+ * The writing procedure is:
+ *   1. Create a write task by {@link #createWriteTask()}, serialize and 
send it to all the
+ *  partitions of the input data(RDD).
+ *   2. For each partition, create a data writer with the write task, and 
write the data of the
--- End diff --

How does this handle speculative execution? This description makes it sound 
like attempts are only run serially. I'd like to have an interface that signals 
support for concurrent tasks, for data sources that act like the direct 
committer and can't handle speculation.
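
One way such a signal could look, sketched in Python as a model of a Java marker interface. `SupportsConcurrentAttempts` and `allow_speculation` are hypothetical names, not part of this PR or of Spark; the point is only that a writer opts in to concurrent attempts and everything else runs attempts serially.

```python
class SupportsConcurrentAttempts:
    """Hypothetical marker mix-in: the writer tolerates concurrent attempts."""


class StagingWriter(SupportsConcurrentAttempts):
    """Writes to per-attempt staging locations, so duplicates are harmless."""


class DirectWriter:
    """Writes to final locations, like the direct committer."""


def allow_speculation(writer, speculation_enabled):
    # Speculative attempts run only when the cluster enables them AND the
    # writer has opted in; a direct-style writer always runs serially.
    return speculation_enabled and isinstance(writer, SupportsConcurrentAttempts)
```

Making the opt-in explicit keeps the default safe: a source that says nothing is never handed two live attempts for the same partition.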


---




[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139835816
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, 
AttributeReference, BoundReference, Cast, CheckOverflow, Expression, 
ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, 
Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, 
TimeSub, UnaryMinus}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import 
org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
+
+
+/**
+ * Helper object for [[StreamingSymmetricHashJoinExec]].
+ */
+object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with 
Logging {
+
+  sealed trait JoinSide
+  case object LeftSide extends JoinSide { override def toString(): String 
= "left" }
+  case object RightSide extends JoinSide { override def toString(): String 
= "right" }
+
+  sealed trait JoinStateWatermarkPredicate
+  case class JoinStateKeyWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+  case class JoinStateValueWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+
+  case class JoinStateWatermarkPredicates(
+left: Option[JoinStateWatermarkPredicate] = None,
+right: Option[JoinStateWatermarkPredicate] = None)
+
+  def getStateWatermarkPredicates(
+  leftAttributes: Seq[Attribute],
+  rightAttributes: Seq[Attribute],
+  leftKeys: Seq[Expression],
+  rightKeys: Seq[Expression],
+  condition: Option[Expression],
+  eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = {
+val joinKeyOrdinalForWatermark: Option[Int] = {
+  leftKeys.zipWithIndex.collectFirst {
+case (ne: NamedExpression, index) if 
ne.metadata.contains(delayKey) => index
+  } orElse {
+rightKeys.zipWithIndex.collectFirst {
+  case (ne: NamedExpression, index) if 
ne.metadata.contains(delayKey) => index
+}
+  }
+}
+
+def getOneSideStateWatermarkPredicate(
+oneSideInputAttributes: Seq[Attribute],
+oneSideJoinKeys: Seq[Expression],
+otherSideInputAttributes: Seq[Attribute]): 
Option[JoinStateWatermarkPredicate] = {
+  val isWatermarkDefinedOnInput = 
oneSideInputAttributes.exists(_.metadata.contains(delayKey))
+  val isWatermarkDefinedOnJoinKey = 
joinKeyOrdinalForWatermark.isDefined
+
+  if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the 
class docs
+val keyExprWithWatermark = BoundReference(
+  joinKeyOrdinalForWatermark.get,
+  oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType,
+  oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable)
+val expr = watermarkExpression(Some(keyExprWithWatermark), 
eventTimeWatermark)
+expr.map(JoinStateKeyWatermarkPredicate)
+
+  } else if (isWatermarkDefinedOnInput) { // case 2 explained in the 
class docs
+val stateValueWatermark = getStateValueWatermark(
+  attributesToFindStateWatemarkFor = oneSideInputAttributes,
+  attributesWithEventWatermark = otherSideInputAttributes,
+  condition,
+  eventTimeWatermark)
+val inputAttributeWithWatermark = 
oneSideInputAttributes.find(_.metadata.contains(delayKey))
+val expr = watermarkExpression(inputAttributeWithWatermark, 
stateValueWatermark)
+

[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r139835603
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A data source writer that is returned by
+ * {@link WriteSupport#createWriter(StructType, SaveMode, 
DataSourceV2Options)}.
+ * It can mix in various writing optimization interfaces to speed up the 
data saving. The actual
+ * writing logic is delegated to {@link WriteTask} that is returned by 
{@link #createWriteTask()}.
+ *
+ * The writing procedure is:
+ *   1. Create a write task by {@link #createWriteTask()}, serialize and 
send it to all the
+ *  partitions of the input data(RDD).
+ *   2. For each partition, create a data writer with the write task, and 
write the data of the
+ *  partition with this writer. If all the data are written 
successfully, call
+ *  {@link DataWriter#commit()}. If exception happens during the 
writing, call
+ *  {@link DataWriter#abort()}. This step may repeat several times as 
Spark will retry failed
+ *  tasks.
+ *   3. Wait until all the writers/partitions are finished, i.e., either 
committed or aborted. If
+ *  all partitions are written successfully, call {@link 
#commit(WriterCommitMessage[])}. If
+ *  some partitions failed and aborted, call {@link #abort()}.
+ *
+ * Note that, data sources are responsible for providing transaction 
ability by implementing the
+ * `commit` and `abort` methods of {@link DataSourceV2Writer} and {@link 
DataWriter} correctly.
+ * The transaction here is Spark-level transaction, which may not be the 
underlying storage
+ * transaction. For example, Spark successfully write data to a Cassandra 
data source, but
+ * Cassandra may need some more time to reach consistency at storage level.
+ */
+@InterfaceStability.Evolving
+public interface DataSourceV2Writer {
+
+  /**
+   * Creates a write task which will be serialized and sent to executors. 
For each partition of the
+   * input data(RDD), there will be one write task to write the records.
+   */
+  WriteTask createWriteTask();
--- End diff --

I think it's confusing to have only one write "task" that is serialized and 
used everywhere. It is implicitly copied by the serialization into multiple 
distinct tasks. Is there a better name for it? Maybe call the `DataWriter` the 
`WriteTask` and serialize something with a better name?
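
The implicit copying can be made concrete with a small model. This is a Python sketch under stated assumptions, not the Java interface: JSON stands in for Spark's serialization, and the `serialize`, `deserialize`, and `create_writer` methods are hypothetical.

```python
import json


class WriteTask:
    """Model of the single object created on the driver."""

    def __init__(self, path):
        self.path = path

    def serialize(self):
        return json.dumps({"path": self.path})

    @staticmethod
    def deserialize(payload):
        return WriteTask(json.loads(payload)["path"])

    def create_writer(self, partition_id):
        # Stands in for the per-partition DataWriter.
        return (self.path, partition_id)


driver_task = WriteTask("/tmp/out")
payload = driver_task.serialize()

# Each partition deserializes its own copy: one "task" on the driver
# becomes several distinct objects on the executors.
executor_copies = [WriteTask.deserialize(payload) for _ in range(3)]
writers = [copy.create_writer(i) for i, copy in enumerate(executor_copies)]
```

Seen this way, the serialized object behaves more like a factory for writers than like a task, which is the naming concern raised above.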


---




[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139835427
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, 
AttributeReference, BoundReference, Cast, CheckOverflow, Expression, 
ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, 
Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, 
TimeSub, UnaryMinus}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import 
org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
+
+
+/**
+ * Helper object for [[StreamingSymmetricHashJoinExec]].
+ */
+object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with 
Logging {
+
+  sealed trait JoinSide
+  case object LeftSide extends JoinSide { override def toString(): String 
= "left" }
+  case object RightSide extends JoinSide { override def toString(): String 
= "right" }
+
+  sealed trait JoinStateWatermarkPredicate
+  case class JoinStateKeyWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+  case class JoinStateValueWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+
+  case class JoinStateWatermarkPredicates(
+left: Option[JoinStateWatermarkPredicate] = None,
+right: Option[JoinStateWatermarkPredicate] = None)
+
+  def getStateWatermarkPredicates(
+  leftAttributes: Seq[Attribute],
+  rightAttributes: Seq[Attribute],
+  leftKeys: Seq[Expression],
+  rightKeys: Seq[Expression],
+  condition: Option[Expression],
+  eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = {
+val joinKeyOrdinalForWatermark: Option[Int] = {
+  leftKeys.zipWithIndex.collectFirst {
+case (ne: NamedExpression, index) if 
ne.metadata.contains(delayKey) => index
+  } orElse {
+rightKeys.zipWithIndex.collectFirst {
+  case (ne: NamedExpression, index) if 
ne.metadata.contains(delayKey) => index
+}
+  }
+}
+
+def getOneSideStateWatermarkPredicate(
+oneSideInputAttributes: Seq[Attribute],
+oneSideJoinKeys: Seq[Expression],
+otherSideInputAttributes: Seq[Attribute]): 
Option[JoinStateWatermarkPredicate] = {
+  val isWatermarkDefinedOnInput = 
oneSideInputAttributes.exists(_.metadata.contains(delayKey))
+  val isWatermarkDefinedOnJoinKey = 
joinKeyOrdinalForWatermark.isDefined
+
+  if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the 
class docs
+val keyExprWithWatermark = BoundReference(
+  joinKeyOrdinalForWatermark.get,
+  oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType,
+  oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable)
+val expr = watermarkExpression(Some(keyExprWithWatermark), 
eventTimeWatermark)
+expr.map(JoinStateKeyWatermarkPredicate)
+
+  } else if (isWatermarkDefinedOnInput) { // case 2 explained in the 
class docs
+val stateValueWatermark = getStateValueWatermark(
+  attributesToFindStateWatemarkFor = oneSideInputAttributes,
+  attributesWithEventWatermark = otherSideInputAttributes,
+  condition,
+  eventTimeWatermark)
+val inputAttributeWithWatermark = 
oneSideInputAttributes.find(_.metadata.contains(delayKey))
+val expr = watermarkExpression(inputAttributeWithWatermark, 
stateValueWatermark)
+

[GitHub] spark pull request #18754: [WIP][SPARK-21552][SQL] Add DecimalType support t...

2017-09-19 Thread BryanCutler
Github user BryanCutler commented on a diff in the pull request:

https://github.com/apache/spark/pull/18754#discussion_r139835312
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala 
---
@@ -224,6 +226,25 @@ private[arrow] class DoubleWriter(val valueVector: 
NullableFloat8Vector) extends
   }
 }
 
+private[arrow] class DecimalWriter(
+    val valueVector: NullableDecimalVector,
+    precision: Int,
+    scale: Int) extends ArrowFieldWriter {
+
+  override def valueMutator: NullableDecimalVector#Mutator = valueVector.getMutator()
+
+  override def setNull(): Unit = {
+    valueMutator.setNull(count)
+  }
+
+  override def setValue(input: SpecializedGetters, ordinal: Int): Unit = {
+    valueMutator.setIndexDefined(count)
+    val decimal = input.getDecimal(ordinal, precision, scale)
+    decimal.changePrecision(precision, scale)
+    DecimalUtility.writeBigDecimalToArrowBuf(decimal.toJavaBigDecimal, valueVector.getBuffer, count)
--- End diff --

It was an issue with StringWriter. I put the fix in #19284; please take a 
look, thanks!





[GitHub] spark issue #19141: [SPARK-21384] [YARN] Spark + YARN fails with LocalFileSy...

2017-09-19 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19141
  
**[Test build #81953 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81953/testReport)** for PR 19141 at commit [`d2d13fe`](https://github.com/apache/spark/commit/d2d13fe82aec1c0fc43d688dbd315385ef99be19).





[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139835014
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, 
AttributeReference, BoundReference, Cast, CheckOverflow, Expression, 
ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, 
Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, 
TimeSub, UnaryMinus}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import 
org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
+
+
+/**
+ * Helper object for [[StreamingSymmetricHashJoinExec]].
+ */
+object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with 
Logging {
+
+  sealed trait JoinSide
+  case object LeftSide extends JoinSide { override def toString(): String 
= "left" }
+  case object RightSide extends JoinSide { override def toString(): String 
= "right" }
+
+  sealed trait JoinStateWatermarkPredicate
+  case class JoinStateKeyWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+  case class JoinStateValueWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+
+  case class JoinStateWatermarkPredicates(
+left: Option[JoinStateWatermarkPredicate] = None,
+right: Option[JoinStateWatermarkPredicate] = None)
+
+  def getStateWatermarkPredicates(
+  leftAttributes: Seq[Attribute],
+  rightAttributes: Seq[Attribute],
+  leftKeys: Seq[Expression],
+  rightKeys: Seq[Expression],
+  condition: Option[Expression],
+  eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = {
+val joinKeyOrdinalForWatermark: Option[Int] = {
+  leftKeys.zipWithIndex.collectFirst {
+case (ne: NamedExpression, index) if 
ne.metadata.contains(delayKey) => index
+  } orElse {
+rightKeys.zipWithIndex.collectFirst {
+  case (ne: NamedExpression, index) if 
ne.metadata.contains(delayKey) => index
+}
+  }
+}
+
+def getOneSideStateWatermarkPredicate(
+oneSideInputAttributes: Seq[Attribute],
+oneSideJoinKeys: Seq[Expression],
+otherSideInputAttributes: Seq[Attribute]): 
Option[JoinStateWatermarkPredicate] = {
+  val isWatermarkDefinedOnInput = 
oneSideInputAttributes.exists(_.metadata.contains(delayKey))
+  val isWatermarkDefinedOnJoinKey = 
joinKeyOrdinalForWatermark.isDefined
+
+  if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the 
class docs
+val keyExprWithWatermark = BoundReference(
+  joinKeyOrdinalForWatermark.get,
+  oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType,
+  oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable)
+val expr = watermarkExpression(Some(keyExprWithWatermark), 
eventTimeWatermark)
+expr.map(JoinStateKeyWatermarkPredicate)
+
+  } else if (isWatermarkDefinedOnInput) { // case 2 explained in the 
class docs
+val stateValueWatermark = getStateValueWatermark(
+  attributesToFindStateWatemarkFor = oneSideInputAttributes,
+  attributesWithEventWatermark = otherSideInputAttributes,
+  condition,
+  eventTimeWatermark)
+val inputAttributeWithWatermark = 
oneSideInputAttributes.find(_.metadata.contains(delayKey))
+val expr = watermarkExpression(inputAttributeWithWatermark, 
stateValueWatermark)
+

[GitHub] spark issue #19284: [SPARK-22067][SQL] ArrowWriter should use position when ...

2017-09-19 Thread BryanCutler
Github user BryanCutler commented on the issue:

https://github.com/apache/spark/pull/19284
  
@ueshin and @icexellos this came up while testing with Arrow 0.7.0. It 
seems that when Spark gets row data as a UTF8String ByteBuffer, the data can 
start at an offset, and that offset becomes the ByteBuffer position when this 
line is called: 
https://github.com/apache/spark/blob/master/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java#L171
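
The offset behaviour described above can be illustrated with a small standalone sketch. This is not the Spark or Arrow code: the class `ByteBufferPositionSketch`, the helper `readFrom`, and the sample bytes are hypothetical and exist only for illustration. It assumes a heap ByteBuffer that wraps a slice of a larger array, so that the buffer's position is nonzero.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; not part of Spark or Arrow.
final class ByteBufferPositionSketch {

    // Decode `length` bytes starting at absolute index `start`.
    // The absolute get(int) does not move the buffer's position.
    static String readFrom(ByteBuffer buf, int start, int length) {
        byte[] out = new byte[length];
        for (int i = 0; i < length; i++) {
            out[i] = buf.get(start + i);
        }
        return new String(out, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // The string "hello" lives at offset 2 of a larger backing array.
        byte[] backing = "xxhello".getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.wrap(backing, 2, 5); // position = 2, remaining = 5

        // Assuming the data starts at index 0 picks up the unrelated leading bytes.
        System.out.println(readFrom(buf, 0, buf.remaining()));              // prints "xxhel"
        // Starting from the buffer's position reads the intended bytes.
        System.out.println(readFrom(buf, buf.position(), buf.remaining())); // prints "hello"
    }
}
```

A consumer that hard-codes a start of 0 only works when the data happens to begin at the start of the buffer; passing `buf.position()` handles both cases.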





[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139834609
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, 
AttributeReference, BoundReference, Cast, CheckOverflow, Expression, 
ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, 
Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, 
TimeSub, UnaryMinus}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import 
org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
+
+
+/**
+ * Helper object for [[StreamingSymmetricHashJoinExec]].
+ */
+object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with 
Logging {
+
+  sealed trait JoinSide
+  case object LeftSide extends JoinSide { override def toString(): String 
= "left" }
+  case object RightSide extends JoinSide { override def toString(): String 
= "right" }
+
+  sealed trait JoinStateWatermarkPredicate
+  case class JoinStateKeyWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+  case class JoinStateValueWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+
+  case class JoinStateWatermarkPredicates(
+left: Option[JoinStateWatermarkPredicate] = None,
+right: Option[JoinStateWatermarkPredicate] = None)
+
+  def getStateWatermarkPredicates(
+  leftAttributes: Seq[Attribute],
+  rightAttributes: Seq[Attribute],
+  leftKeys: Seq[Expression],
+  rightKeys: Seq[Expression],
+  condition: Option[Expression],
+  eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = {
+val joinKeyOrdinalForWatermark: Option[Int] = {
+  leftKeys.zipWithIndex.collectFirst {
+case (ne: NamedExpression, index) if 
ne.metadata.contains(delayKey) => index
+  } orElse {
+rightKeys.zipWithIndex.collectFirst {
+  case (ne: NamedExpression, index) if 
ne.metadata.contains(delayKey) => index
+}
+  }
+}
+
+def getOneSideStateWatermarkPredicate(
+oneSideInputAttributes: Seq[Attribute],
+oneSideJoinKeys: Seq[Expression],
+otherSideInputAttributes: Seq[Attribute]): 
Option[JoinStateWatermarkPredicate] = {
+  val isWatermarkDefinedOnInput = 
oneSideInputAttributes.exists(_.metadata.contains(delayKey))
+  val isWatermarkDefinedOnJoinKey = 
joinKeyOrdinalForWatermark.isDefined
+
+  if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the 
class docs
+val keyExprWithWatermark = BoundReference(
+  joinKeyOrdinalForWatermark.get,
+  oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType,
+  oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable)
+val expr = watermarkExpression(Some(keyExprWithWatermark), 
eventTimeWatermark)
+expr.map(JoinStateKeyWatermarkPredicate)
+
+  } else if (isWatermarkDefinedOnInput) { // case 2 explained in the 
class docs
+val stateValueWatermark = getStateValueWatermark(
+  attributesToFindStateWatemarkFor = oneSideInputAttributes,
+  attributesWithEventWatermark = otherSideInputAttributes,
+  condition,
+  eventTimeWatermark)
+val inputAttributeWithWatermark = 
oneSideInputAttributes.find(_.metadata.contains(delayKey))
+val expr = watermarkExpression(inputAttributeWithWatermark, 
stateValueWatermark)
+

[GitHub] spark issue #19141: [SPARK-21384] [YARN] Spark + YARN fails with LocalFileSy...

2017-09-19 Thread vanzin
Github user vanzin commented on the issue:

https://github.com/apache/spark/pull/19141
  
ok to test





[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r139834571
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/ReadSupport.java ---
@@ -30,9 +30,8 @@
   /**
    * Creates a {@link DataSourceV2Reader} to scan the data from this data source.
    *
-   * @param options the options for this data source reader, which is an immutable case-insensitive
-   *                string-to-string map.
-   * @return a reader that implements the actual read logic.
+   * @param options the options for the returned data source reader, which is an immutable
+   *                case-insensitive string-to-string map.
--- End diff --

It would make this much easier to review if changes to the read path were 
taken out and committed in a follow-up to #19136.





[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139834356
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, 
AttributeReference, BoundReference, Cast, CheckOverflow, Expression, 
ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, 
Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, 
TimeSub, UnaryMinus}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import 
org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
+
+
+/**
+ * Helper object for [[StreamingSymmetricHashJoinExec]].
+ */
+object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with 
Logging {
+
+  sealed trait JoinSide
+  case object LeftSide extends JoinSide { override def toString(): String 
= "left" }
+  case object RightSide extends JoinSide { override def toString(): String 
= "right" }
+
+  sealed trait JoinStateWatermarkPredicate
+  case class JoinStateKeyWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+  case class JoinStateValueWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+
+  case class JoinStateWatermarkPredicates(
+left: Option[JoinStateWatermarkPredicate] = None,
+right: Option[JoinStateWatermarkPredicate] = None)
+
+  def getStateWatermarkPredicates(
+  leftAttributes: Seq[Attribute],
+  rightAttributes: Seq[Attribute],
+  leftKeys: Seq[Expression],
+  rightKeys: Seq[Expression],
+  condition: Option[Expression],
+  eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = {
+val joinKeyOrdinalForWatermark: Option[Int] = {
+  leftKeys.zipWithIndex.collectFirst {
+case (ne: NamedExpression, index) if 
ne.metadata.contains(delayKey) => index
+  } orElse {
+rightKeys.zipWithIndex.collectFirst {
+  case (ne: NamedExpression, index) if 
ne.metadata.contains(delayKey) => index
+}
+  }
+}
+
+def getOneSideStateWatermarkPredicate(
+oneSideInputAttributes: Seq[Attribute],
+oneSideJoinKeys: Seq[Expression],
+otherSideInputAttributes: Seq[Attribute]): 
Option[JoinStateWatermarkPredicate] = {
+  val isWatermarkDefinedOnInput = 
oneSideInputAttributes.exists(_.metadata.contains(delayKey))
+  val isWatermarkDefinedOnJoinKey = 
joinKeyOrdinalForWatermark.isDefined
+
+  if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the 
class docs
+val keyExprWithWatermark = BoundReference(
+  joinKeyOrdinalForWatermark.get,
+  oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType,
+  oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable)
+val expr = watermarkExpression(Some(keyExprWithWatermark), 
eventTimeWatermark)
+expr.map(JoinStateKeyWatermarkPredicate)
+
+  } else if (isWatermarkDefinedOnInput) { // case 2 explained in the 
class docs
+val stateValueWatermark = getStateValueWatermark(
+  attributesToFindStateWatemarkFor = oneSideInputAttributes,
+  attributesWithEventWatermark = otherSideInputAttributes,
+  condition,
+  eventTimeWatermark)
+val inputAttributeWithWatermark = 
oneSideInputAttributes.find(_.metadata.contains(delayKey))
+val expr = watermarkExpression(inputAttributeWithWatermark, 
stateValueWatermark)
+

[GitHub] spark issue #19284: [SPARK-22067][SQL] ArrowWriter should use position when ...

2017-09-19 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19284
  
**[Test build #81952 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/81952/testReport)** for PR 19284 at commit [`5ac572d`](https://github.com/apache/spark/commit/5ac572d3cb2422f57e101fa2cbc761f4b748daa6).





[GitHub] spark pull request #19284: [SPARK-22067][SQL] ArrowWriter should use positio...

2017-09-19 Thread BryanCutler
GitHub user BryanCutler opened a pull request:

https://github.com/apache/spark/pull/19284

[SPARK-22067][SQL] ArrowWriter should use position when setting UTF8String 
ByteBuffer

## What changes were proposed in this pull request?

The ArrowWriter StringWriter was setting Arrow data using a position of 0 
instead of the actual position in the ByteBuffer.  This happened to work only 
because of a bug, ARROW-1447, which has been fixed as of Arrow 0.7.0.  Testing 
with that version revealed the error in the ArrowConvertersSuite string 
conversion test.

## How was this patch tested?

Existing tests; also manually verified to work with Arrow 0.7.0.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/BryanCutler/spark arrow-ArrowWriter-StringWriter-position-SPARK-22067

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19284.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19284


commit 5ac572d3cb2422f57e101fa2cbc761f4b748daa6
Author: Bryan Cutler 
Date:   2017-09-19T22:17:23Z

pass in position of ByteBuffer when using StringWriter







[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139833753
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, 
AttributeReference, BoundReference, Cast, CheckOverflow, Expression, 
ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, 
Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, 
TimeSub, UnaryMinus}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import 
org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
+
+
+/**
+ * Helper object for [[StreamingSymmetricHashJoinExec]].
+ */
+object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with 
Logging {
+
+  sealed trait JoinSide
+  case object LeftSide extends JoinSide { override def toString(): String 
= "left" }
+  case object RightSide extends JoinSide { override def toString(): String 
= "right" }
+
+  sealed trait JoinStateWatermarkPredicate
+  case class JoinStateKeyWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+  case class JoinStateValueWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+
+  case class JoinStateWatermarkPredicates(
+left: Option[JoinStateWatermarkPredicate] = None,
+right: Option[JoinStateWatermarkPredicate] = None)
+
+  def getStateWatermarkPredicates(
+  leftAttributes: Seq[Attribute],
+  rightAttributes: Seq[Attribute],
+  leftKeys: Seq[Expression],
+  rightKeys: Seq[Expression],
+  condition: Option[Expression],
+  eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = {
+val joinKeyOrdinalForWatermark: Option[Int] = {
+  leftKeys.zipWithIndex.collectFirst {
+case (ne: NamedExpression, index) if 
ne.metadata.contains(delayKey) => index
+  } orElse {
+rightKeys.zipWithIndex.collectFirst {
+  case (ne: NamedExpression, index) if 
ne.metadata.contains(delayKey) => index
+}
+  }
+}
+
+def getOneSideStateWatermarkPredicate(
+oneSideInputAttributes: Seq[Attribute],
+oneSideJoinKeys: Seq[Expression],
+otherSideInputAttributes: Seq[Attribute]): 
Option[JoinStateWatermarkPredicate] = {
+  val isWatermarkDefinedOnInput = 
oneSideInputAttributes.exists(_.metadata.contains(delayKey))
+  val isWatermarkDefinedOnJoinKey = 
joinKeyOrdinalForWatermark.isDefined
+
+  if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the 
class docs
+val keyExprWithWatermark = BoundReference(
+  joinKeyOrdinalForWatermark.get,
+  oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType,
+  oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable)
+val expr = watermarkExpression(Some(keyExprWithWatermark), 
eventTimeWatermark)
+expr.map(JoinStateKeyWatermarkPredicate)
+
+  } else if (isWatermarkDefinedOnInput) { // case 2 explained in the 
class docs
+val stateValueWatermark = getStateValueWatermark(
+  attributesToFindStateWatemarkFor = oneSideInputAttributes,
+  attributesWithEventWatermark = otherSideInputAttributes,
+  condition,
+  eventTimeWatermark)
+val inputAttributeWithWatermark = 
oneSideInputAttributes.find(_.metadata.contains(delayKey))
+val expr = watermarkExpression(inputAttributeWithWatermark, 
stateValueWatermark)
+

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139833518
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, 
AttributeReference, BoundReference, Cast, CheckOverflow, Expression, 
ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, 
Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, 
TimeSub, UnaryMinus}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import 
org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
+
+
+/**
+ * Helper object for [[StreamingSymmetricHashJoinExec]].
+ */
+object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with 
Logging {
+
+  sealed trait JoinSide
+  case object LeftSide extends JoinSide { override def toString(): String 
= "left" }
+  case object RightSide extends JoinSide { override def toString(): String 
= "right" }
+
+  sealed trait JoinStateWatermarkPredicate
+  case class JoinStateKeyWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+  case class JoinStateValueWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+
+  case class JoinStateWatermarkPredicates(
+left: Option[JoinStateWatermarkPredicate] = None,
+right: Option[JoinStateWatermarkPredicate] = None)
+
+  def getStateWatermarkPredicates(
+  leftAttributes: Seq[Attribute],
+  rightAttributes: Seq[Attribute],
+  leftKeys: Seq[Expression],
+  rightKeys: Seq[Expression],
+  condition: Option[Expression],
+  eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = {
+val joinKeyOrdinalForWatermark: Option[Int] = {
+  leftKeys.zipWithIndex.collectFirst {
+case (ne: NamedExpression, index) if 
ne.metadata.contains(delayKey) => index
+  } orElse {
+rightKeys.zipWithIndex.collectFirst {
+  case (ne: NamedExpression, index) if 
ne.metadata.contains(delayKey) => index
+}
+  }
+}
+
+def getOneSideStateWatermarkPredicate(
+oneSideInputAttributes: Seq[Attribute],
+oneSideJoinKeys: Seq[Expression],
+otherSideInputAttributes: Seq[Attribute]): 
Option[JoinStateWatermarkPredicate] = {
+  val isWatermarkDefinedOnInput = 
oneSideInputAttributes.exists(_.metadata.contains(delayKey))
+  val isWatermarkDefinedOnJoinKey = 
joinKeyOrdinalForWatermark.isDefined
+
+  if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the 
class docs
+val keyExprWithWatermark = BoundReference(
+  joinKeyOrdinalForWatermark.get,
+  oneSideJoinKeys(joinKeyOrdinalForWatermark.get).dataType,
+  oneSideJoinKeys(joinKeyOrdinalForWatermark.get).nullable)
+val expr = watermarkExpression(Some(keyExprWithWatermark), 
eventTimeWatermark)
+expr.map(JoinStateKeyWatermarkPredicate)
+
+  } else if (isWatermarkDefinedOnInput) { // case 2 explained in the 
class docs
+val stateValueWatermark = getStateValueWatermark(
+  attributesToFindStateWatemarkFor = oneSideInputAttributes,
+  attributesWithEventWatermark = otherSideInputAttributes,
+  condition,
+  eventTimeWatermark)
+val inputAttributeWithWatermark = 
oneSideInputAttributes.find(_.metadata.contains(delayKey))
+val expr = watermarkExpression(inputAttributeWithWatermark, 
stateValueWatermark)
+

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139833470
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, 
AttributeReference, BoundReference, Cast, CheckOverflow, Expression, 
ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, 
Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, 
TimeSub, UnaryMinus}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import 
org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
+
+
+/**
+ * Helper object for [[StreamingSymmetricHashJoinExec]].
+ */
+object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with 
Logging {
+
+  sealed trait JoinSide
+  case object LeftSide extends JoinSide { override def toString(): String 
= "left" }
+  case object RightSide extends JoinSide { override def toString(): String 
= "right" }
+
+  sealed trait JoinStateWatermarkPredicate
+  case class JoinStateKeyWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+  case class JoinStateValueWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+
+  case class JoinStateWatermarkPredicates(
+left: Option[JoinStateWatermarkPredicate] = None,
+right: Option[JoinStateWatermarkPredicate] = None)
+
+  def getStateWatermarkPredicates(
+  leftAttributes: Seq[Attribute],
+  rightAttributes: Seq[Attribute],
+  leftKeys: Seq[Expression],
+  rightKeys: Seq[Expression],
+  condition: Option[Expression],
+  eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = {
+val joinKeyOrdinalForWatermark: Option[Int] = {
+  leftKeys.zipWithIndex.collectFirst {
+case (ne: NamedExpression, index) if 
ne.metadata.contains(delayKey) => index
+  } orElse {
+rightKeys.zipWithIndex.collectFirst {
+  case (ne: NamedExpression, index) if 
ne.metadata.contains(delayKey) => index
+}
+  }
+}
+
+def getOneSideStateWatermarkPredicate(
+oneSideInputAttributes: Seq[Attribute],
+oneSideJoinKeys: Seq[Expression],
+otherSideInputAttributes: Seq[Attribute]): 
Option[JoinStateWatermarkPredicate] = {
+  val isWatermarkDefinedOnInput = 
oneSideInputAttributes.exists(_.metadata.contains(delayKey))
+  val isWatermarkDefinedOnJoinKey = 
joinKeyOrdinalForWatermark.isDefined
+
+  if (isWatermarkDefinedOnJoinKey) { // case 1 and 3 explained in the 
class docs
--- End diff --

done.





[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139833312
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExecHelper.scala
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.{Add, Attribute, 
AttributeReference, BoundReference, Cast, CheckOverflow, Expression, 
ExpressionSet, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual, 
Literal, Multiply, NamedExpression, PredicateHelper, Subtract, TimeAdd, 
TimeSub, UnaryMinus}
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import 
org.apache.spark.sql.execution.streaming.WatermarkSupport.watermarkExpression
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.CalendarInterval
+
+
+/**
+ * Helper object for [[StreamingSymmetricHashJoinExec]].
+ */
+object StreamingSymmetricHashJoinExecHelper extends PredicateHelper with 
Logging {
+
+  sealed trait JoinSide
+  case object LeftSide extends JoinSide { override def toString(): String 
= "left" }
+  case object RightSide extends JoinSide { override def toString(): String 
= "right" }
+
+  sealed trait JoinStateWatermarkPredicate
+  case class JoinStateKeyWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+  case class JoinStateValueWatermarkPredicate(expr: Expression) extends 
JoinStateWatermarkPredicate
+
+  case class JoinStateWatermarkPredicates(
+left: Option[JoinStateWatermarkPredicate] = None,
+right: Option[JoinStateWatermarkPredicate] = None)
+
+  def getStateWatermarkPredicates(
+  leftAttributes: Seq[Attribute],
+  rightAttributes: Seq[Attribute],
+  leftKeys: Seq[Expression],
+  rightKeys: Seq[Expression],
+  condition: Option[Expression],
+  eventTimeWatermark: Option[Long]): JoinStateWatermarkPredicates = {
+val joinKeyOrdinalForWatermark: Option[Int] = {
--- End diff --

removed the other one. not needed. copied the docs to this location.





[GitHub] spark pull request #19269: [SPARK-22026][SQL][WIP] data source v2 write path

2017-09-19 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19269#discussion_r139832973
  
--- Diff: 
sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2.writer;
+
+import org.apache.spark.annotation.InterfaceStability;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.sources.v2.DataSourceV2Options;
+import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A data source writer that is returned by
+ * {@link WriteSupport#createWriter(StructType, SaveMode, 
DataSourceV2Options)}.
+ * It can mix in various writing optimization interfaces to speed up the 
data saving. The actual
+ * writing logic is delegated to {@link WriteTask} that is returned by 
{@link #createWriteTask()}.
+ *
+ * The writing procedure is:
+ *   1. Create a write task by {@link #createWriteTask()}, serialize and 
send it to all the
+ *  partitions of the input data (RDD).
+ *   2. For each partition, create a data writer with the write task, and 
write the data of the
+ *  partition with this writer. If all the data are written 
successfully, call
+ *  {@link DataWriter#commit()}. If an exception happens during 
writing, call
+ *  {@link DataWriter#abort()}. This step may repeat several times as 
Spark will retry failed
+ *  tasks.
+ *   3. Wait until all the writers/partitions are finished, i.e., either 
committed or aborted. If
+ *  all partitions are written successfully, call {@link 
#commit(WriterCommitMessage[])}. If
+ *  some partitions failed and aborted, call {@link #abort()}.
--- End diff --

The main reason why I wanted a separate SPIP for the write path was this 
point in the doc:

> Ideally partitioning/bucketing concept should not be exposed in the Data 
Source API V2, because they are just techniques for data skipping and 
pre-partitioning. However, these 2 concepts are already widely used in Spark, 
e.g. DataFrameWriter.partitionBy and DDL syntax like ADD PARTITION. To be 
consistent, we need to add partitioning/bucketing to Data Source V2, so that 
the implementations can be able to specify partitioning/bucketing for 
read/write.

There's a lot in there that's worth thinking about and possibly changing:

1. Ideally, the DataSourceV2 API wouldn't support bucketing/partitioning
2. The current DataFrameWriter API is what we should continue to support
3. Implementations should supply bucketing and partitioning for writes 
because of 2

**Bucketing/partitioning**: It comes down to the level at which this API is 
going to be used. It looks like this API currently ignores bucketing and 
partitioning (unless my read through was too quick). I think I agree that in 
the long term that's a good thing, but we need ways for a data source to tell 
Spark about its requirements for incoming data.

In the current version, it looks like Spark would know how to prepare data 
for writers outside of this API (rather than including support as suggested by 
point 3). When writing a partitioned table, Spark would get the partitioning 
from the table definition in the metastore and automatically sort by partition 
columns. Is that right?

I'd like to move the data store's requirements behind this API. For 
example, writing to HBase files directly requires sorting by key first. We 
don't want to do the sort in the writer because it may duplicate work (and 
isn't captured in the physical plan), and we also don't want to require Spark 
to know about the requirements of the HBase data store, or any other specific 
implementation.
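
As a rough illustration of that idea, here is a minimal sketch in which a sink declares the ordering it needs and the engine satisfies it before writing. Everything in it is hypothetical: `RequiresOrdering`, `SortedKeySink`, and `EngineSketch` are invented names for this example and are not part of this PR or of Spark.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical: a sink advertises the ordering it needs for incoming rows.
interface RequiresOrdering<T> {
    Comparator<T> requiredOrdering();
}

// Hypothetical sink that, like a direct HBase file writer, rejects unsorted keys.
final class SortedKeySink implements RequiresOrdering<String> {
    final List<String> written = new ArrayList<>();

    @Override
    public Comparator<String> requiredOrdering() {
        return Comparator.naturalOrder();
    }

    void write(String key) {
        if (!written.isEmpty()
                && requiredOrdering().compare(written.get(written.size() - 1), key) > 0) {
            throw new IllegalStateException("key out of order: " + key);
        }
        written.add(key);
    }
}

// Stand-in for the engine: it satisfies the declared requirement before writing,
// so the sort happens once and outside the sink.
final class EngineSketch {
    static void writeAll(List<String> rows, SortedKeySink sink) {
        List<String> prepared = new ArrayList<>(rows);
        prepared.sort(sink.requiredOrdering());
        for (String row : prepared) {
            sink.write(row);
        }
    }
}
```

The point of the shape is that the engine only reads a declared requirement; it never needs to know which data store sits behind the sink.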

**DataFrameWriter API**: I'd like to talk about separating the API for 
table definitions and writes, but not necessarily as part of this work. The 
SPIP should clearly state whether that's part of 

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139832926
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.TimeUnit.NANOSECONDS
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, 
JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
+import 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.util.{CompletionIterator, 
SerializableConfiguration}
+
+
+/**
+ * Performs stream-stream join using symmetric hash join algorithm. It 
works as follows.
+ *
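+ *                             /-----------------------\
+ *   left side input --------->|    left side state    |------\
+ *                             \-----------------------/      |
+ *                                                            |--------> joined output
+ *                             /-----------------------\      |
+ *   right side input -------->|    right side state   |------/
+ *                             \-----------------------/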
+ *
+ * Each join side buffers past input rows as streaming state so that the 
past input can be joined
+ * with future input on the other side. This buffer state is effectively a 
multi-map:
+ *equi-join key -> list of past input rows received with the join key
+ *
+ * For each input row in each side, the following operations take place.
+ * - Calculate join key from the row.
+ * - Use the join key to append the row to the buffer state of the side 
that the row came from.
+ * - Find past buffered values for the key from the other side. For each 
such value, emit the
+ *   "joined row" (left-row, right-row)
+ * - Apply the optional condition to filter the joined rows as the final 
output.
+ *
+ * If a timestamp column with event time watermark is present in the join 
keys or in the input
+ * data, then it uses the watermark to figure out which rows in the 
buffer will not join with
+ * any new data, and therefore can be discarded. Depending on the provided 
query conditions, we
+ * can define thresholds on both state key (i.e. joining keys) and state 
value (i.e. input rows).
+ * There are three kinds of queries possible regarding this as explained 
below.
+ * Assume that watermark has been defined on both `leftTime` and 
`rightTime` columns used below.
+ *
+ * 1. When timestamp/time-window + watermark is in the join keys. Example 
(pseudo-SQL):
+ *
+ *  SELECT * FROM leftTable, rightTable
+ *  ON
+ *leftKey = rightKey AND
+ *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 
1hr tumbling windows
+ *
+ *In this case, this operator will join rows newer than watermark 
which fall in the same
+ *1 hour window. Say the event-time watermark is "12:34" (both left 
and right input).
+ *Then input rows can only have time > 12:34. Hence, they can only 
join with buffered rows
+ *where window >= 12:00 - 1:00 and all buffered rows with join window 
< 12:00 can be
+ *discarded. In other words, the operator will discard all state where
+ *window in state key (i.e. join key) < event time watermark. This 
threshold is called
+ *State Key Watermark.
+ *
+ * 2. When timestamp range conditions 

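The buffering scheme described in the Scaladoc quoted above can be sketched in a few lines. This is a toy model under stated assumptions, not the code from the PR: `SymmetricHashJoinSketch` and its methods are hypothetical names, rows are plain strings, the join key is assumed to be the start of a tumbling window in minutes, the optional join condition is omitted, and `evict` encodes one reading of the "State Key Watermark" rule (drop a window once its end is at or before the watermark).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy symmetric hash join; the join key is the start of a tumbling window in minutes.
final class SymmetricHashJoinSketch {
    // One multi-map per side: join key -> past rows received with that key.
    private final Map<Long, List<String>> leftState = new HashMap<>();
    private final Map<Long, List<String>> rightState = new HashMap<>();

    // Processes one left-side row and returns the joined rows it produces.
    List<String> onLeft(long key, String row) {
        return process(key, row, leftState, rightState, true);
    }

    // Processes one right-side row and returns the joined rows it produces.
    List<String> onRight(long key, String row) {
        return process(key, row, rightState, leftState, false);
    }

    private static List<String> process(long key, String row,
            Map<Long, List<String>> ownState, Map<Long, List<String>> otherState,
            boolean rowIsLeft) {
        // Buffer the row so that future input on the other side can join with it.
        ownState.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
        // Join with every row already buffered for this key on the other side.
        List<String> joined = new ArrayList<>();
        for (String other : otherState.getOrDefault(key, Collections.emptyList())) {
            joined.add(rowIsLeft ? "(" + row + ", " + other + ")"
                                 : "(" + other + ", " + row + ")");
        }
        return joined;
    }

    // Assumed eviction rule: discard state for windows that ended at or before the watermark.
    void evict(long watermark, long windowLength) {
        leftState.keySet().removeIf(start -> start + windowLength <= watermark);
        rightState.keySet().removeIf(start -> start + windowLength <= watermark);
    }
}
```

With a watermark of 12:34 (754 minutes) and 60-minute windows, `evict(754, 60)` drops the window starting at 11:00 (660) and keeps the one starting at 12:00 (720), which matches the example in the quoted comment.
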
[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139832660
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.TimeUnit.NANOSECONDS
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, 
JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
+import 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.util.{CompletionIterator, 
SerializableConfiguration}
+
+
+/**
+ * Performs stream-stream join using symmetric hash join algorithm. It 
works as follows.
+ *
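+ *                             /-----------------------\
+ *   left side input --------->|    left side state    |------\
+ *                             \-----------------------/      |
+ *                                                            |--------> joined output
+ *                             /-----------------------\      |
+ *   right side input -------->|    right side state   |------/
+ *                             \-----------------------/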
+ *
+ * Each join side buffers past input rows as streaming state so that the 
past input can be joined
+ * with future input on the other side. This buffer state is effectively a 
multi-map:
+ *equi-join key -> list of past input rows received with the join key
+ *
+ * For each input row in each side, the following operations take place.
+ * - Calculate join key from the row.
+ * - Use the join key to append the row to the buffer state of the side 
that the row came from.
+ * - Find past buffered values for the key from the other side. For each 
such value, emit the
+ *   "joined row" (left-row, right-row)
+ * - Apply the optional condition to filter the joined rows as the final 
output.
+ *
+ * If a timestamp column with event time watermark is present in the join 
keys or in the input
+ * data, then it uses the watermark to figure out which rows in the 
buffer will not join with
+ * any new data, and therefore can be discarded. Depending on the provided 
query conditions, we
+ * can define thresholds on both state key (i.e. joining keys) and state 
value (i.e. input rows).
+ * There are three kinds of queries possible regarding this as explained 
below.
+ * Assume that watermark has been defined on both `leftTime` and 
`rightTime` columns used below.
+ *
+ * 1. When timestamp/time-window + watermark is in the join keys. Example 
(pseudo-SQL):
+ *
+ *  SELECT * FROM leftTable, rightTable
+ *  ON
+ *leftKey = rightKey AND
+ *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 
1hr tumbling windows
+ *
+ *In this case, this operator will join rows newer than watermark 
which fall in the same
+ *1 hour window. Say the event-time watermark is "12:34" (both left 
and right input).
+ *Then input rows can only have time > 12:34. Hence, they can only 
join with buffered rows
+ *where window >= 12:00 - 1:00 and all buffered rows with join window 
< 12:00 can be
+ *discarded. In other words, the operator will discard all state where
+ *window in state key (i.e. join key) < event time watermark. This 
threshold is called
+ *State Key Watermark.
+ *
+ * 2. When timestamp range conditions 

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139832320
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.TimeUnit.NANOSECONDS
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, 
JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
+import 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.util.{CompletionIterator, 
SerializableConfiguration}
+
+
+/**
+ * Performs stream-stream join using symmetric hash join algorithm. It 
works as follows.
+ *
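+ *                             /-----------------------\
+ *   left side input --------->|    left side state    |------\
+ *                             \-----------------------/      |
+ *                                                            |--------> joined output
+ *                             /-----------------------\      |
+ *   right side input -------->|    right side state   |------/
+ *                             \-----------------------/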
+ *
+ * Each join side buffers past input rows as streaming state so that the 
past input can be joined
+ * with future input on the other side. This buffer state is effectively a 
multi-map:
+ *equi-join key -> list of past input rows received with the join key
+ *
+ * For each input row in each side, the following operations take place.
+ * - Calculate join key from the row.
+ * - Use the join key to append the row to the buffer state of the side 
that the row came from.
+ * - Find past buffered values for the key from the other side. For each 
such value, emit the
+ *   "joined row" (left-row, right-row)
+ * - Apply the optional condition to filter the joined rows as the final 
output.
+ *
+ * If a timestamp column with event time watermark is present in the join 
keys or in the input
+ * data, then it uses the watermark to figure out which rows in the 
buffer will not join with
+ * any new data, and therefore can be discarded. Depending on the provided 
query conditions, we
+ * can define thresholds on both state key (i.e. joining keys) and state 
value (i.e. input rows).
+ * There are three kinds of queries possible regarding this as explained 
below.
+ * Assume that watermark has been defined on both `leftTime` and 
`rightTime` columns used below.
+ *
+ * 1. When timestamp/time-window + watermark is in the join keys. Example 
(pseudo-SQL):
+ *
+ *  SELECT * FROM leftTable, rightTable
+ *  ON
+ *leftKey = rightKey AND
+ *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 
1hr tumbling windows
+ *
+ *In this case, this operator will join rows newer than watermark 
which fall in the same
+ *1 hour window. Say the event-time watermark is "12:34" (both left 
and right input).
+ *Then input rows can only have time > 12:34. Hence, they can only 
join with buffered rows
+ *where window >= 12:00 - 1:00 and all buffered rows with join window 
< 12:00 can be
+ *discarded. In other words, the operator will discard all state where
+ *window in state key (i.e. join key) < event time watermark. This 
threshold is called
+ *State Key Watermark.
+ *
+ * 2. When timestamp range conditions 

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139832253
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.TimeUnit.NANOSECONDS
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, 
JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
+import 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.util.{CompletionIterator, 
SerializableConfiguration}
+
+
+/**
+ * Performs stream-stream join using symmetric hash join algorithm. It 
works as follows.
+ *
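+ *                             /-----------------------\
+ *   left side input --------->|    left side state    |------\
+ *                             \-----------------------/      |
+ *                                                            |--------> joined output
+ *                             /-----------------------\      |
+ *   right side input -------->|    right side state   |------/
+ *                             \-----------------------/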
+ *
+ * Each join side buffers past input rows as streaming state so that the 
past input can be joined
+ * with future input on the other side. This buffer state is effectively a 
multi-map:
+ *equi-join key -> list of past input rows received with the join key
+ *
+ * For each input row in each side, the following operations take place.
+ * - Calculate join key from the row.
+ * - Use the join key to append the row to the buffer state of the side 
that the row came from.
+ * - Find past buffered values for the key from the other side. For each 
such value, emit the
+ *   "joined row" (left-row, right-row)
+ * - Apply the optional condition to filter the joined rows as the final 
output.
+ *
+ * If a timestamp column with event time watermark is present in the join 
keys or in the input
+ * data, then it uses the watermark to figure out which rows in the 
buffer will not join with
+ * any new data, and therefore can be discarded. Depending on the provided 
query conditions, we
+ * can define thresholds on both state key (i.e. joining keys) and state 
value (i.e. input rows).
+ * There are three kinds of queries possible regarding this as explained 
below.
+ * Assume that watermark has been defined on both `leftTime` and 
`rightTime` columns used below.
+ *
+ * 1. When timestamp/time-window + watermark is in the join keys. Example 
(pseudo-SQL):
+ *
+ *  SELECT * FROM leftTable, rightTable
+ *  ON
+ *leftKey = rightKey AND
+ *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 
1hr tumbling windows
+ *
+ *In this case, this operator will join rows newer than watermark 
which fall in the same
+ *1 hour window. Say the event-time watermark is "12:34" (both left 
and right input).
+ *Then input rows can only have time > 12:34. Hence, they can only 
join with buffered rows
+ *where window >= 12:00 - 1:00 and all buffered rows with join window 
< 12:00 can be
+ *discarded. In other words, the operator will discard all state where
+ *window in state key (i.e. join key) < event time watermark. This 
threshold is called
+ *State Key Watermark.
+ *
+ * 2. When timestamp range conditions 

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139832098
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.TimeUnit.NANOSECONDS
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, 
JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
+import 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.util.{CompletionIterator, 
SerializableConfiguration}
+
+
+/**
+ * Performs stream-stream join using symmetric hash join algorithm. It 
works as follows.
+ *
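+ *                             /-----------------------\
+ *   left side input --------->|    left side state    |------\
+ *                             \-----------------------/      |
+ *                                                            |--------> joined output
+ *                             /-----------------------\      |
+ *   right side input -------->|    right side state   |------/
+ *                             \-----------------------/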
+ *
+ * Each join side buffers past input rows as streaming state so that the 
past input can be joined
+ * with future input on the other side. This buffer state is effectively a 
multi-map:
+ *equi-join key -> list of past input rows received with the join key
+ *
+ * For each input row in each side, the following operations take place.
+ * - Calculate join key from the row.
+ * - Use the join key to append the row to the buffer state of the side 
that the row came from.
+ * - Find past buffered values for the key from the other side. For each 
such value, emit the
+ *   "joined row" (left-row, right-row)
+ * - Apply the optional condition to filter the joined rows as the final 
output.
+ *
+ * If a timestamp column with event time watermark is present in the join 
keys or in the input
+ * data, then it uses the watermark to figure out which rows in the 
buffer will not join with
+ * any new data, and therefore can be discarded. Depending on the provided 
query conditions, we
+ * can define thresholds on both state key (i.e. joining keys) and state 
value (i.e. input rows).
+ * There are three kinds of queries possible regarding this as explained 
below.
+ * Assume that watermark has been defined on both `leftTime` and 
`rightTime` columns used below.
+ *
+ * 1. When timestamp/time-window + watermark is in the join keys. Example 
(pseudo-SQL):
+ *
+ *  SELECT * FROM leftTable, rightTable
+ *  ON
+ *leftKey = rightKey AND
+ *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 
1hr tumbling windows
+ *
+ *In this case, this operator will join rows newer than watermark 
which fall in the same
+ *1 hour window. Say the event-time watermark is "12:34" (both left 
and right input).
+ *Then input rows can only have time > 12:34. Hence, they can only 
join with buffered rows
+ *where window >= 12:00 - 1:00 and all buffered rows with join window 
< 12:00 can be
+ *discarded. In other words, the operator will discard all state where
+ *window in state key (i.e. join key) < event time watermark. This 
threshold is called
+ *State Key Watermark.
+ *
+ * 2. When timestamp range conditions 

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139832036
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.TimeUnit.NANOSECONDS
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, 
JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
+import 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.util.{CompletionIterator, 
SerializableConfiguration}
+
+
+/**
+ * Performs stream-stream join using symmetric hash join algorithm. It 
works as follows.
+ *
+ * /---\
+ *   left side input ->|left side state|--\
+ * \---/  |
+ *|> 
joined output
+ * /---\  |
+ *   right side input >|right side state   |--/
+ * \---/
+ *
+ * Each join side buffers past input rows as streaming state so that the 
past input can be joined
+ * with future input on the other side. This buffer state is effectively a 
multi-map:
+ *equi-join key -> list of past input rows received with the join key
+ *
+ * For each input row in each side, the following operations take place.
+ * - Calculate join key from the row.
+ * - Use the join key to append the row to the buffer state of the side 
that the row came from.
+ * - Find past buffered values for the key from the other side. For each 
such value, emit the
+ *   "joined row" (left-row, right-row)
+ * - Apply the optional condition to filter the joined rows as the final 
output.
+ *
+ * If a timestamp column with event time watermark is present in the join 
keys or in the input
+ * data, then the it uses the watermark figure out which rows in the 
buffer will not join with
+ * and new data, and therefore can be discarded. Depending on the provided 
query conditions, we
+ * can define thresholds on both state key (i.e. joining keys) and state 
value (i.e. input rows).
+ * There are three kinds of queries possible regarding this as explained 
below.
+ * Assume that watermark has been defined on both `leftTime` and 
`rightTime` columns used below.
+ *
+ * 1. When timestamp/time-window + watermark is in the join keys. Example 
(pseudo-SQL):
+ *
+ *  SELECT * FROM leftTable, rightTable
+ *  ON
+ *leftKey = rightKey AND
+ *window(leftTime, "1 hour") = window(rightTime, "1 hour")// 
1hr tumbling windows
+ *
+ *In this case, this operator will join rows newer than watermark 
which fall in the same
+ *1 hour window. Say the event-time watermark is "12:34" (both left 
and right input).
+ *Then input rows can only have time > 12:34. Hence, they can only 
join with buffered rows
+ *where window >= 12:00 - 1:00 and all buffered rows with join window 
< 12:00 can be
+ *discarded. In other words, the operator will discard all state where
+ *window in state key (i.e. join key) < event time watermark. This 
threshold is called
+ *State Key Watermark.
+ *
+ * 2. When timestamp range conditions 
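
The buffering and eviction steps described in the quoted Scaladoc can be sketched in a few lines of plain Scala. This is only an illustration under stated assumptions, not the code in the pull request: `Event`, `OneSideState`, `SymmetricHashJoin` and `advanceWatermark` are hypothetical names, times are plain minutes since midnight, the window start is assumed to be part of the join key, and state lives in in-memory maps rather than Spark's state store.

```scala
import scala.collection.mutable

// Hypothetical, simplified model; NOT Spark's StreamingSymmetricHashJoinExec.
// windowStart is the start of the 1 hour tumbling window, in minutes since midnight.
final case class Event(key: String, windowStart: Long, payload: String)

final class OneSideState {
  // Multi-map: (equi-join key, window start) -> past rows received with that key.
  private val buffer =
    mutable.Map.empty[(String, Long), mutable.ArrayBuffer[Event]]

  def append(e: Event): Unit =
    buffer.getOrElseUpdate((e.key, e.windowStart), mutable.ArrayBuffer.empty[Event]) += e

  def get(key: String, windowStart: Long): Seq[Event] =
    buffer.get((key, windowStart)).map(_.toList).getOrElse(Nil)

  // Drop all state whose window in the state key is older than the threshold
  // (the "State Key Watermark" of the Scaladoc, simplified to a window start).
  def evictOlderThan(threshold: Long): Unit = {
    val stale = buffer.keys.filter(_._2 < threshold).toList
    stale.foreach(k => buffer.remove(k))
  }

  def size: Int = buffer.valuesIterator.map(_.size).sum
}

final class SymmetricHashJoin(condition: (Event, Event) => Boolean = (_, _) => true) {
  val left = new OneSideState
  val right = new OneSideState

  // Buffer the row on its own side, then probe the other side's buffer.
  def onLeft(row: Event): Seq[(Event, Event)] = {
    left.append(row)
    right.get(row.key, row.windowStart).map(r => (row, r)).filter(condition.tupled)
  }

  def onRight(row: Event): Seq[(Event, Event)] = {
    right.append(row)
    left.get(row.key, row.windowStart).map(l => (l, row)).filter(condition.tupled)
  }

  // Assumption: the caller has already rounded the event-time watermark down
  // to a window boundary, e.g. 12:34 -> 12:00 -> 720 minutes.
  def advanceWatermark(stateKeyWatermark: Long): Unit = {
    left.evictOlderThan(stateKeyWatermark)
    right.evictOlderThan(stateKeyWatermark)
  }
}
```

With a watermark of 12:34 and 1 hour windows, calling `advanceWatermark(720L)` keeps rows buffered for the 12:00 window and drops rows buffered for the 11:00 window, which matches the example in the Scaladoc.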

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139831879
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139831615
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139831226
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139831209
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139831092
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139830996
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139830591
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.TimeUnit.NANOSECONDS
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, 
JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
+import 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.util.{CompletionIterator, 
SerializableConfiguration}
+
+
+/**
+ * Performs stream-stream join using symmetric hash join algorithm. It works as follows.
+ *
+ *                        /-----------------------\
+ *   left side input ---->|    left side state    |------\
+ *                        \-----------------------/      |
+ *                                                       |------> joined output
+ *                        /-----------------------\      |
+ *   right side input --->|    right side state   |------/
+ *                        \-----------------------/
+ *
+ * Each join side buffers past input rows as streaming state so that the past input can be joined
+ * with future input on the other side. This buffer state is effectively a multi-map:
+ *    equi-join key -> list of past input rows received with the join key
+ *
+ * For each input row in each side, the following operations take place.
+ * - Calculate join key from the row.
+ * - Use the join key to append the row to the buffer state of the side that the row came from.
+ * - Find past buffered values for the key from the other side. For each such value, emit the
+ *   "joined row" (left-row, right-row)
+ * - Apply the optional condition to filter the joined rows as the final output.
+ *
+ * If a timestamp column with event time watermark is present in the join keys or in the input
+ * data, then it uses the watermark to figure out which rows in the buffer will not join with
+ * any new data, and therefore can be discarded. Depending on the provided query conditions, we
--- End diff --

done.
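
For anyone skimming the quoted doc comment, the per-row steps it lists can be sketched in plain Scala. This is only an illustration under stated assumptions: `SideBuffer` and `SymmetricHashJoinSketch` are hypothetical names, the buffers are in-memory maps rather than Spark state stores, and watermark-based eviction is left out entirely.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for one side's buffered state:
// a multi-map from join key to the rows seen so far with that key.
final class SideBuffer[K, R] {
  private val rows = mutable.Map.empty[K, mutable.ArrayBuffer[R]]

  def append(key: K, row: R): Unit = {
    rows.getOrElseUpdate(key, mutable.ArrayBuffer.empty[R]) += row
  }

  def get(key: K): Seq[R] = rows.get(key).map(_.toList).getOrElse(Nil)
}

// Illustrative symmetric hash join: each incoming row is buffered on its own
// side, then probed against the rows already buffered on the other side.
final class SymmetricHashJoinSketch[K, L, R](
    leftKey: L => K,
    rightKey: R => K,
    condition: (L, R) => Boolean) {

  private val leftState = new SideBuffer[K, L]
  private val rightState = new SideBuffer[K, R]

  def onLeft(row: L): Seq[(L, R)] = {
    val key = leftKey(row)                      // calculate join key
    leftState.append(key, row)                  // buffer on the side it came from
    rightState.get(key)                         // probe the other side
      .map(r => (row, r))                       // emit (left-row, right-row)
      .filter { case (l, r) => condition(l, r) } // apply the optional condition
  }

  def onRight(row: R): Seq[(L, R)] = {
    val key = rightKey(row)
    rightState.append(key, row)
    leftState.get(key)
      .map(l => (l, row))
      .filter { case (l, r) => condition(l, r) }
  }
}
```

Because a row only probes the opposite buffer, each matching pair is emitted once, by whichever of the two rows arrives later.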





[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139830633
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
 ---
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.concurrent.TimeUnit.NANOSECONDS
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, JoinedRow, NamedExpression, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark._
+import org.apache.spark.sql.catalyst.plans.physical._
+import org.apache.spark.sql.execution.{BinaryExecNode, SparkPlan}
+import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExecHelper._
+import org.apache.spark.sql.execution.streaming.state._
+import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.util.{CompletionIterator, SerializableConfiguration}
+
+
+/**
+ * Performs stream-stream join using symmetric hash join algorithm. It works as follows.
+ *
+ *                        /-----------------------\
+ *   left side input ---->|    left side state    |------\
+ *                        \-----------------------/      |
+ *                                                       |------> joined output
+ *                        /-----------------------\      |
+ *   right side input --->|    right side state   |------/
+ *                        \-----------------------/
+ *
+ * Each join side buffers past input rows as streaming state so that the past input can be joined
+ * with future input on the other side. This buffer state is effectively a multi-map:
+ *    equi-join key -> list of past input rows received with the join key
+ *
+ * For each input row in each side, the following operations take place.
+ * - Calculate join key from the row.
+ * - Use the join key to append the row to the buffer state of the side that the row came from.
+ * - Find past buffered values for the key from the other side. For each such value, emit the
+ *   "joined row" (left-row, right-row)
+ * - Apply the optional condition to filter the joined rows as the final output.
+ *
+ * If a timestamp column with event time watermark is present in the join keys or in the input
+ * data, then it uses the watermark to figure out which rows in the buffer will not join with
+ * any new data, and therefore can be discarded. Depending on the provided query conditions, we
+ * can define thresholds on both state key (i.e. joining keys) and state value (i.e. input rows).
+ * There are three kinds of queries possible regarding this as explained below.
+ * Assume that watermark has been defined on both `leftTime` and `rightTime` columns used below.
+ *
+ * 1. When timestamp/time-window + watermark is in the join keys. Example (pseudo-SQL):
+ *
+ *      SELECT * FROM leftTable, rightTable
+ *      ON
+ *        leftKey = rightKey AND
+ *        window(leftTime, "1 hour") = window(rightTime, "1 hour")    // 1hr tumbling windows
+ *
+ *    In this case, this operator will join rows newer than watermark which fall in the same
+ *    1 hour window. Say the event-time watermark is "12:34" (both left and right input).
+ *    Then input rows can only have time > 12:34. Hence, they can only join with buffered rows
+ *    where window >= 12:00 - 1:00 and all buffered rows with join window < 12:00 can be
+ *    discarded. In other words, the operator will discard all state where
+ *    window in state key (i.e. join key) < event time watermark. This threshold is called
+ *    State Key Watermark.
+ *
+ * 2. When timestamp range conditions 
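
The State Key Watermark arithmetic from case 1 of the quoted comment (watermark 12:34, one-hour tumbling windows) can be checked with a small sketch. Everything here is hypothetical: `Window`, `windowOf` and `evict` are illustrative names, times are minutes since midnight, and the rule "drop a window once it ends at or before the watermark" is an assumption about how "window < watermark" is evaluated, not something the quoted text states.

```scala
import scala.collection.mutable

object StateKeyWatermarkSketch {
  // Minutes since midnight stand in for event time; a window is [start, end).
  final case class Window(start: Int, end: Int)

  // Tumbling window of `sizeMinutes` that contains `timeMinutes` (non-negative).
  def windowOf(timeMinutes: Int, sizeMinutes: Int): Window = {
    val start = timeMinutes - (timeMinutes % sizeMinutes)
    Window(start, start + sizeMinutes)
  }

  // Assumption: a buffered window is dead once it ends at or before the
  // watermark, since every future row has event time later than the watermark.
  def evict[V](state: mutable.Map[Window, V], watermarkMinutes: Int): Unit = {
    val expired = state.keys.filter(_.end <= watermarkMinutes).toList
    expired.foreach(k => state.remove(k))
  }
}
```

With a watermark of 12:34 (754 minutes), a buffered 11:00 to 12:00 window is removed while the 12:00 to 1:00 window survives, which matches the example in the comment.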

[GitHub] spark pull request #19271: [SPARK-22053][SS] Stream-stream inner join in App...

2017-09-19 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/19271#discussion_r139830482
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
 ---
@@ -114,6 +115,16 @@ class IncrementalExecution(
   stateInfo = Some(nextStatefulOperationStateInfo),
   batchTimestampMs = Some(offsetSeqMetadata.batchTimestampMs),
   eventTimeWatermark = Some(offsetSeqMetadata.batchWatermarkMs))
+
+  case j @ StreamingSymmetricHashJoinExec(lKeys, rKeys, _, cond, _, _, _, left, right) =>
+j.copy(
+  stateInfo = Some(nextStatefulOperationStateInfo),
--- End diff --

Whatever optimization takes place, the same optimization will occur in 
EVERY batch. So if aggregation is pushed below the join, then all the batches 
will have that. 

What we have to guard against is cost-based optimization that can reorder 
things differently in different batches. That is why I have disabled 
cost-based join optimization. When adding such optimizations in the future, 
we have to be cautious about the streaming case and disable them.

Also, this is a general concern with other stateful ops as well, not 
something that this PR would address.
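
To make the concern concrete, here is a toy model, not Spark code. It assumes each stateful operator's state is located by an id handed out in the order operators are visited when a batch's plan is prepared, which is how the `nextStatefulOperationStateInfo` call in the diff reads to me. `OperatorIdSketch` and `assignIds` are hypothetical names.

```scala
object OperatorIdSketch {
  // Hypothetical model: the n-th stateful operator visited in a batch's plan
  // gets id n, and that id is what its state is stored under.
  def assignIds(statefulOpsInPlanOrder: Seq[String]): Map[String, Int] =
    statefulOpsInPlanOrder.zipWithIndex.toMap
}
```

Under that assumption, `assignIds(Seq("aggregate", "join"))` gives the same mapping in every batch as long as the plan shape is stable, whereas a batch planned as `Seq("join", "aggregate")` would swap the two ids and point each operator at the other's state.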





