spark git commit: [SPARK-20217][CORE] Executor should not fail stage if killed task throws non-interrupted exception

2017-04-05 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master 4000f128b -> 5142e5d4e


[SPARK-20217][CORE] Executor should not fail stage if killed task throws 
non-interrupted exception

## What changes were proposed in this pull request?

If tasks throw non-interrupted exceptions on kill (e.g. 
java.nio.channels.ClosedByInterruptException), their death is reported back as 
TaskFailed instead of TaskKilled. This causes stage failure in some cases.

This is reproducible as follows. Run the following, and then use 
SparkContext.killTaskAttempt to kill one of the tasks. The entire stage will 
fail since we threw a RuntimeException instead of InterruptedException.

```
spark.range(100).repartition(100).foreach { i =>
  try {
    Thread.sleep(1000)
  } catch {
    case t: InterruptedException =>
      throw new RuntimeException(t)
  }
}
```
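
For context, a minimal sketch of the kill side of this reproduction (assumptions: a `SparkContext` named `sc` is in scope, the job above is still running, and the task attempt ID is read off the Stages page of the web UI or from a `SparkListener`; the default arguments of `killTaskAttempt` may vary by version):

```scala
// Hypothetical task attempt ID observed in the UI or via a SparkListener
val taskId: Long = 42L

// Interrupt the running task and attach a reason. Before this patch, the
// RuntimeException thrown by the task came back as TaskFailed and could fail
// the stage; afterwards it is reported as TaskKilled.
sc.killTaskAttempt(taskId, interruptThread = true, reason = "testing SPARK-20217")
```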
Based on the code in TaskSetManager, I think this also affects kills of 
speculative tasks. However, since the number of speculated tasks is small, and 
you usually need to fail a task a few times before the stage is cancelled, it is 
unlikely this would be noticed in production unless both speculation was 
enabled and the number of allowed task failures was set to 1.

We should probably unconditionally return TaskKilled instead of TaskFailed if 
the task was killed by the driver, regardless of the actual exception thrown.

## How was this patch tested?

Unit test. The test fails without the change to Executor.scala.

cc JoshRosen

Author: Eric Liang 

Closes #17531 from ericl/fix-task-interrupt.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5142e5d4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5142e5d4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5142e5d4

Branch: refs/heads/master
Commit: 5142e5d4e09c7cb36cf1d792934a21c5305c6d42
Parents: 4000f12
Author: Eric Liang 
Authored: Wed Apr 5 19:37:21 2017 -0700
Committer: Yin Huai 
Committed: Wed Apr 5 19:37:21 2017 -0700

--
 core/src/main/scala/org/apache/spark/executor/Executor.scala | 2 +-
 core/src/test/scala/org/apache/spark/SparkContextSuite.scala | 8 +++-
 2 files changed, 8 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5142e5d4/core/src/main/scala/org/apache/spark/executor/Executor.scala
--
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 99b1608..83469c5 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -432,7 +432,7 @@ private[spark] class Executor(
   setTaskFinishedAndClearInterruptStatus()
   execBackend.statusUpdate(taskId, TaskState.KILLED, 
ser.serialize(TaskKilled(t.reason)))
 
-case _: InterruptedException if task.reasonIfKilled.isDefined =>
+case NonFatal(_) if task != null && task.reasonIfKilled.isDefined =>
   val killReason = task.reasonIfKilled.getOrElse("unknown reason")
   logInfo(s"Executor interrupted and killed $taskName (TID $taskId), 
reason: $killReason")
   setTaskFinishedAndClearInterruptStatus()

http://git-wip-us.apache.org/repos/asf/spark/blob/5142e5d4/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 2c94755..735f445 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -572,7 +572,13 @@ class SparkContextSuite extends SparkFunSuite with 
LocalSparkContext with Eventu
 // first attempt will hang
 if (!SparkContextSuite.isTaskStarted) {
   SparkContextSuite.isTaskStarted = true
-  Thread.sleep(999)
+  try {
+Thread.sleep(999)
+  } catch {
+case t: Throwable =>
+  // SPARK-20217 should not fail stage if task throws 
non-interrupted exception
+  throw new RuntimeException("killed")
+  }
 }
 // second attempt succeeds immediately
   }





spark git commit: [SPARK-20231][SQL] Refactor star schema code for the subsequent star join detection in CBO

2017-04-05 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 12206058e -> 4000f128b


[SPARK-20231][SQL] Refactor star schema code for the subsequent star join 
detection in CBO

## What changes were proposed in this pull request?

This commit moves star schema code from ```join.scala``` to 
```StarSchemaDetection.scala```. It also applies some minor fixes in 
```StarJoinReorderSuite.scala```.

## How was this patch tested?
Run existing ```StarJoinReorderSuite.scala```.

Author: Ioana Delaney 

Closes #17544 from ioana-delaney/starSchemaCBOv2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4000f128
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4000f128
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4000f128

Branch: refs/heads/master
Commit: 4000f128b7101484ba618115504ca916c22fa84a
Parents: 1220605
Author: Ioana Delaney 
Authored: Wed Apr 5 18:02:53 2017 -0700
Committer: Xiao Li 
Committed: Wed Apr 5 18:02:53 2017 -0700

--
 .../optimizer/StarSchemaDetection.scala | 351 +++
 .../spark/sql/catalyst/optimizer/joins.scala| 328 +
 .../optimizer/StarJoinReorderSuite.scala|   4 +-
 3 files changed, 354 insertions(+), 329 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4000f128/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala
new file mode 100644
index 000..91cb004
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import scala.annotation.tailrec
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Encapsulates star-schema detection logic.
+ */
+case class StarSchemaDetection(conf: SQLConf) extends PredicateHelper {
+
+  /**
+   * Star schema consists of one or more fact tables referencing a number of 
dimension
+   * tables. In general, star-schema joins are detected using the following 
conditions:
+   *  1. Informational RI constraints (reliable detection)
+   * + Dimension contains a primary key that is being joined to the fact table.
+   * + Fact table contains foreign keys referencing multiple dimension tables.
+   * 2. Cardinality based heuristics
+   * + Usually, the table with the highest cardinality is the fact table.
+   * + Table being joined with the most number of tables is the fact table.
+   *
+   * To detect star joins, the algorithm uses a combination of the above two 
conditions.
+   * The fact table is chosen based on the cardinality heuristics, and the 
dimension
+   * tables are chosen based on the RI constraints. A star join will consist 
of the largest
+   * fact table joined with the dimension tables on their primary keys. To 
detect that a
+   * column is a primary key, the algorithm uses table and column statistics.
+   *
+   * The algorithm currently returns only the star join with the largest fact 
table.
+   * Choosing the largest fact table on the driving arm to avoid large inners 
is in
+   * general a good heuristic. This restriction will be lifted to observe 
multiple
+   * star joins.
+   *
+   * The highlights of the algorithm are the following:
+   *
+   * Given a set of joined tables/plans, the algorithm first verifies if they 
are eligible
+   * for star join detection. An eligible plan is a base table access with 
valid statistics.
+   * A 
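
For illustration, the kind of query shape these heuristics target, expressed at the DataFrame level (a hedged sketch: the TPC-DS-style table and column names are hypothetical, and `sales`, `dateDim`, `itemDim`, `storeDim` are assumed DataFrames, e.g. loaded via `spark.table(...)`; the detection itself runs inside the Catalyst optimizer on the logical plan):

```scala
// A star join: one large fact table joined to several small dimension tables on
// their key columns. Cardinality statistics mark `sales` as the fact table, and
// column statistics (distinct counts) suggest d_date_sk / i_item_sk / s_store_sk
// are primary keys of their dimensions.
val starJoin = sales
  .join(dateDim, sales("ss_sold_date_sk") === dateDim("d_date_sk"))
  .join(itemDim, sales("ss_item_sk") === itemDim("i_item_sk"))
  .join(storeDim, sales("ss_store_sk") === storeDim("s_store_sk"))
```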

spark git commit: [SPARK-20214][ML] Make sure converted csc matrix has sorted indices

2017-04-05 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 15ea5eaa2 -> 9016e17af


[SPARK-20214][ML] Make sure converted csc matrix has sorted indices

## What changes were proposed in this pull request?

`_convert_to_vector` converts a scipy sparse matrix to a csc matrix for 
initializing `SparseVector`. However, it doesn't guarantee that the converted csc 
matrix has sorted indices, so a failure happens when you do something like 
this:

from scipy.sparse import lil_matrix
lil = lil_matrix((4, 1))
lil[1, 0] = 1
lil[3, 0] = 2
_convert_to_vector(lil.todok())

File "/home/jenkins/workspace/python/pyspark/mllib/linalg/__init__.py", 
line 78, in _convert_to_vector
  return SparseVector(l.shape[0], csc.indices, csc.data)
File "/home/jenkins/workspace/python/pyspark/mllib/linalg/__init__.py", 
line 556, in __init__
  % (self.indices[i], self.indices[i + 1]))
TypeError: Indices 3 and 1 are not strictly increasing

A simple test can confirm that `dok_matrix.tocsc()` won't guarantee sorted 
indices:

>>> from scipy.sparse import lil_matrix
>>> lil = lil_matrix((4, 1))
>>> lil[1, 0] = 1
>>> lil[3, 0] = 2
>>> dok = lil.todok()
>>> csc = dok.tocsc()
>>> csc.has_sorted_indices
0
>>> csc.indices
array([3, 1], dtype=int32)

I checked the scipy source code. The only conversions that guarantee sorted 
indices are `csc_matrix.tocsr()` and `csr_matrix.tocsc()`.

## How was this patch tested?

Existing tests.

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Liang-Chi Hsieh 

Closes #17532 from viirya/make-sure-sorted-indices.

(cherry picked from commit 12206058e8780e202c208b92774df3773eff36ae)
Signed-off-by: Joseph K. Bradley 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9016e17a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9016e17a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9016e17a

Branch: refs/heads/branch-2.0
Commit: 9016e17af6048e63841938244d6207fa64b010b1
Parents: 15ea5ea
Author: Liang-Chi Hsieh 
Authored: Wed Apr 5 17:46:44 2017 -0700
Committer: Joseph K. Bradley 
Committed: Wed Apr 5 17:47:59 2017 -0700

--
 python/pyspark/ml/linalg/__init__.py|  3 +++
 python/pyspark/mllib/linalg/__init__.py |  3 +++
 python/pyspark/mllib/tests.py   | 11 +++
 3 files changed, 17 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9016e17a/python/pyspark/ml/linalg/__init__.py
--
diff --git a/python/pyspark/ml/linalg/__init__.py 
b/python/pyspark/ml/linalg/__init__.py
index bd0e186..22001e8 100644
--- a/python/pyspark/ml/linalg/__init__.py
+++ b/python/pyspark/ml/linalg/__init__.py
@@ -72,7 +72,10 @@ def _convert_to_vector(l):
 return DenseVector(l)
 elif _have_scipy and scipy.sparse.issparse(l):
 assert l.shape[1] == 1, "Expected column vector"
+# Make sure the converted csc_matrix has sorted indices.
 csc = l.tocsc()
+if not csc.has_sorted_indices:
+csc.sort_indices()
 return SparseVector(l.shape[0], csc.indices, csc.data)
 else:
 raise TypeError("Cannot convert type %s into Vector" % type(l))

http://git-wip-us.apache.org/repos/asf/spark/blob/9016e17a/python/pyspark/mllib/linalg/__init__.py
--
diff --git a/python/pyspark/mllib/linalg/__init__.py 
b/python/pyspark/mllib/linalg/__init__.py
index d37e715..12f87fd 100644
--- a/python/pyspark/mllib/linalg/__init__.py
+++ b/python/pyspark/mllib/linalg/__init__.py
@@ -74,7 +74,10 @@ def _convert_to_vector(l):
 return DenseVector(l)
 elif _have_scipy and scipy.sparse.issparse(l):
 assert l.shape[1] == 1, "Expected column vector"
+# Make sure the converted csc_matrix has sorted indices.
 csc = l.tocsc()
+if not csc.has_sorted_indices:
+csc.sort_indices()
 return SparseVector(l.shape[0], csc.indices, csc.data)
 else:
 raise TypeError("Cannot convert type %s into Vector" % type(l))

http://git-wip-us.apache.org/repos/asf/spark/blob/9016e17a/python/pyspark/mllib/tests.py
--
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index c519883..523b3f1 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -853,6 +853,17 @@ class SciPyTests(MLlibTestCase):
 self.assertEqual(sv, serialize(lil.tocsr()))
 self.assertEqual(sv, serialize(lil.todok()))
 
+def test_convert_to_vector(self):
+from scipy.sparse import 

spark git commit: [SPARK-20214][ML] Make sure converted csc matrix has sorted indices

2017-04-05 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 2b85e059b -> fb81a412e


[SPARK-20214][ML] Make sure converted csc matrix has sorted indices

## What changes were proposed in this pull request?

`_convert_to_vector` converts a scipy sparse matrix to a csc matrix for 
initializing `SparseVector`. However, it doesn't guarantee that the converted csc 
matrix has sorted indices, so a failure happens when you do something like 
this:

from scipy.sparse import lil_matrix
lil = lil_matrix((4, 1))
lil[1, 0] = 1
lil[3, 0] = 2
_convert_to_vector(lil.todok())

File "/home/jenkins/workspace/python/pyspark/mllib/linalg/__init__.py", 
line 78, in _convert_to_vector
  return SparseVector(l.shape[0], csc.indices, csc.data)
File "/home/jenkins/workspace/python/pyspark/mllib/linalg/__init__.py", 
line 556, in __init__
  % (self.indices[i], self.indices[i + 1]))
TypeError: Indices 3 and 1 are not strictly increasing

A simple test can confirm that `dok_matrix.tocsc()` won't guarantee sorted 
indices:

>>> from scipy.sparse import lil_matrix
>>> lil = lil_matrix((4, 1))
>>> lil[1, 0] = 1
>>> lil[3, 0] = 2
>>> dok = lil.todok()
>>> csc = dok.tocsc()
>>> csc.has_sorted_indices
0
>>> csc.indices
array([3, 1], dtype=int32)

I checked the scipy source code. The only conversions that guarantee sorted 
indices are `csc_matrix.tocsr()` and `csr_matrix.tocsc()`.

## How was this patch tested?

Existing tests.

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Liang-Chi Hsieh 

Closes #17532 from viirya/make-sure-sorted-indices.

(cherry picked from commit 12206058e8780e202c208b92774df3773eff36ae)
Signed-off-by: Joseph K. Bradley 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb81a412
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb81a412
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb81a412

Branch: refs/heads/branch-2.1
Commit: fb81a412eea1e60bd503cb5bb879ae468be24e56
Parents: 2b85e05
Author: Liang-Chi Hsieh 
Authored: Wed Apr 5 17:46:44 2017 -0700
Committer: Joseph K. Bradley 
Committed: Wed Apr 5 17:46:55 2017 -0700

--
 python/pyspark/ml/linalg/__init__.py|  3 +++
 python/pyspark/mllib/linalg/__init__.py |  3 +++
 python/pyspark/mllib/tests.py   | 11 +++
 3 files changed, 17 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fb81a412/python/pyspark/ml/linalg/__init__.py
--
diff --git a/python/pyspark/ml/linalg/__init__.py 
b/python/pyspark/ml/linalg/__init__.py
index 1705c15..eed9946 100644
--- a/python/pyspark/ml/linalg/__init__.py
+++ b/python/pyspark/ml/linalg/__init__.py
@@ -72,7 +72,10 @@ def _convert_to_vector(l):
 return DenseVector(l)
 elif _have_scipy and scipy.sparse.issparse(l):
 assert l.shape[1] == 1, "Expected column vector"
+# Make sure the converted csc_matrix has sorted indices.
 csc = l.tocsc()
+if not csc.has_sorted_indices:
+csc.sort_indices()
 return SparseVector(l.shape[0], csc.indices, csc.data)
 else:
 raise TypeError("Cannot convert type %s into Vector" % type(l))

http://git-wip-us.apache.org/repos/asf/spark/blob/fb81a412/python/pyspark/mllib/linalg/__init__.py
--
diff --git a/python/pyspark/mllib/linalg/__init__.py 
b/python/pyspark/mllib/linalg/__init__.py
index 031f22c..7b24b3c 100644
--- a/python/pyspark/mllib/linalg/__init__.py
+++ b/python/pyspark/mllib/linalg/__init__.py
@@ -74,7 +74,10 @@ def _convert_to_vector(l):
 return DenseVector(l)
 elif _have_scipy and scipy.sparse.issparse(l):
 assert l.shape[1] == 1, "Expected column vector"
+# Make sure the converted csc_matrix has sorted indices.
 csc = l.tocsc()
+if not csc.has_sorted_indices:
+csc.sort_indices()
 return SparseVector(l.shape[0], csc.indices, csc.data)
 else:
 raise TypeError("Cannot convert type %s into Vector" % type(l))

http://git-wip-us.apache.org/repos/asf/spark/blob/fb81a412/python/pyspark/mllib/tests.py
--
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index c519883..523b3f1 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -853,6 +853,17 @@ class SciPyTests(MLlibTestCase):
 self.assertEqual(sv, serialize(lil.tocsr()))
 self.assertEqual(sv, serialize(lil.todok()))
 
+def test_convert_to_vector(self):
+from scipy.sparse import 

spark git commit: [SPARK-20214][ML] Make sure converted csc matrix has sorted indices

2017-04-05 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 9d68c6723 -> 12206058e


[SPARK-20214][ML] Make sure converted csc matrix has sorted indices

## What changes were proposed in this pull request?

`_convert_to_vector` converts a scipy sparse matrix to a csc matrix for 
initializing `SparseVector`. However, it doesn't guarantee that the converted csc 
matrix has sorted indices, so a failure happens when you do something like 
this:

from scipy.sparse import lil_matrix
lil = lil_matrix((4, 1))
lil[1, 0] = 1
lil[3, 0] = 2
_convert_to_vector(lil.todok())

File "/home/jenkins/workspace/python/pyspark/mllib/linalg/__init__.py", 
line 78, in _convert_to_vector
  return SparseVector(l.shape[0], csc.indices, csc.data)
File "/home/jenkins/workspace/python/pyspark/mllib/linalg/__init__.py", 
line 556, in __init__
  % (self.indices[i], self.indices[i + 1]))
TypeError: Indices 3 and 1 are not strictly increasing

A simple test can confirm that `dok_matrix.tocsc()` won't guarantee sorted 
indices:

>>> from scipy.sparse import lil_matrix
>>> lil = lil_matrix((4, 1))
>>> lil[1, 0] = 1
>>> lil[3, 0] = 2
>>> dok = lil.todok()
>>> csc = dok.tocsc()
>>> csc.has_sorted_indices
0
>>> csc.indices
array([3, 1], dtype=int32)

I checked the scipy source code. The only conversions that guarantee sorted 
indices are `csc_matrix.tocsr()` and `csr_matrix.tocsc()`.

## How was this patch tested?

Existing tests.

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Liang-Chi Hsieh 

Closes #17532 from viirya/make-sure-sorted-indices.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/12206058
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/12206058
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/12206058

Branch: refs/heads/master
Commit: 12206058e8780e202c208b92774df3773eff36ae
Parents: 9d68c67
Author: Liang-Chi Hsieh 
Authored: Wed Apr 5 17:46:44 2017 -0700
Committer: Joseph K. Bradley 
Committed: Wed Apr 5 17:46:44 2017 -0700

--
 python/pyspark/ml/linalg/__init__.py|  3 +++
 python/pyspark/mllib/linalg/__init__.py |  3 +++
 python/pyspark/mllib/tests.py   | 11 +++
 3 files changed, 17 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/12206058/python/pyspark/ml/linalg/__init__.py
--
diff --git a/python/pyspark/ml/linalg/__init__.py 
b/python/pyspark/ml/linalg/__init__.py
index b765343..ad1b487 100644
--- a/python/pyspark/ml/linalg/__init__.py
+++ b/python/pyspark/ml/linalg/__init__.py
@@ -72,7 +72,10 @@ def _convert_to_vector(l):
 return DenseVector(l)
 elif _have_scipy and scipy.sparse.issparse(l):
 assert l.shape[1] == 1, "Expected column vector"
+# Make sure the converted csc_matrix has sorted indices.
 csc = l.tocsc()
+if not csc.has_sorted_indices:
+csc.sort_indices()
 return SparseVector(l.shape[0], csc.indices, csc.data)
 else:
 raise TypeError("Cannot convert type %s into Vector" % type(l))

http://git-wip-us.apache.org/repos/asf/spark/blob/12206058/python/pyspark/mllib/linalg/__init__.py
--
diff --git a/python/pyspark/mllib/linalg/__init__.py 
b/python/pyspark/mllib/linalg/__init__.py
index 031f22c..7b24b3c 100644
--- a/python/pyspark/mllib/linalg/__init__.py
+++ b/python/pyspark/mllib/linalg/__init__.py
@@ -74,7 +74,10 @@ def _convert_to_vector(l):
 return DenseVector(l)
 elif _have_scipy and scipy.sparse.issparse(l):
 assert l.shape[1] == 1, "Expected column vector"
+# Make sure the converted csc_matrix has sorted indices.
 csc = l.tocsc()
+if not csc.has_sorted_indices:
+csc.sort_indices()
 return SparseVector(l.shape[0], csc.indices, csc.data)
 else:
 raise TypeError("Cannot convert type %s into Vector" % type(l))

http://git-wip-us.apache.org/repos/asf/spark/blob/12206058/python/pyspark/mllib/tests.py
--
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index c519883..523b3f1 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -853,6 +853,17 @@ class SciPyTests(MLlibTestCase):
 self.assertEqual(sv, serialize(lil.tocsr()))
 self.assertEqual(sv, serialize(lil.todok()))
 
+def test_convert_to_vector(self):
+from scipy.sparse import csc_matrix
+# Create a CSC matrix with non-sorted indices
+indptr = array([0, 2])
+indices = array([3, 1])
+

spark git commit: [SPARK-20224][SS] Updated docs for streaming dropDuplicates and mapGroupsWithState

2017-04-05 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master e2773996b -> 9543fc0e0


[SPARK-20224][SS] Updated docs for streaming dropDuplicates and 
mapGroupsWithState

## What changes were proposed in this pull request?

- Fixed bug in Java API not passing timeout conf to scala API
- Updated markdown docs
- Updated scala docs
- Added scala and Java example

## How was this patch tested?
Manually ran examples.

Author: Tathagata Das 

Closes #17539 from tdas/SPARK-20224.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9543fc0e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9543fc0e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9543fc0e

Branch: refs/heads/master
Commit: 9543fc0e08a21680961689ea772441c49fcd52ee
Parents: e277399
Author: Tathagata Das 
Authored: Wed Apr 5 16:03:04 2017 -0700
Committer: Tathagata Das 
Committed: Wed Apr 5 16:03:04 2017 -0700

--
 docs/structured-streaming-programming-guide.md  |  98 ++-
 .../streaming/JavaStructuredSessionization.java | 255 +++
 .../streaming/StructuredSessionization.scala| 151 +++
 .../spark/sql/KeyValueGroupedDataset.scala  |   2 +-
 .../apache/spark/sql/streaming/GroupState.scala |  15 +-
 5 files changed, 509 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9543fc0e/docs/structured-streaming-programming-guide.md
--
diff --git a/docs/structured-streaming-programming-guide.md 
b/docs/structured-streaming-programming-guide.md
index b5cf9f1..37a1d61 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -1,6 +1,6 @@
 ---
 layout: global
-displayTitle: Structured Streaming Programming Guide [Alpha]
+displayTitle: Structured Streaming Programming Guide [Experimental]
 title: Structured Streaming Programming Guide
 ---
 
@@ -871,6 +871,65 @@ streamingDf.join(staticDf, "type", "right_join")  # right 
outer join with a stat
 
 
 
+### Streaming Deduplication
+You can deduplicate records in data streams using a unique identifier in the 
events. This is exactly the same as deduplication on static data using a unique 
identifier column. The query will store the necessary amount of data from 
previous records such that it can filter duplicate records. Similar to 
aggregations, you can use deduplication with or without watermarking.
+
+- *With watermark* - If there is an upper bound on how late a duplicate record 
may arrive, then you can define a watermark on an event time column and 
deduplicate using both the guid and the event time columns. The query will use 
the watermark to remove old state data from past records that are not expected 
to get any duplicates any more. This bounds the amount of the state the query 
has to maintain.
+
+- *Without watermark* - Since there are no bounds on when a duplicate record 
may arrive, the query stores the data from all the past records as state.
+
+
+
+
+{% highlight scala %}
+val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...
+
+// Without watermark using guid column
+streamingDf.dropDuplicates("guid")
+
+// With watermark using guid and eventTime columns
+streamingDf
+  .withWatermark("eventTime", "10 seconds")
+  .dropDuplicates("guid", "eventTime")
+{% endhighlight %}
+
+
+
+
+{% highlight java %}
+Dataset<Row> streamingDf = spark.readStream. ...;  // columns: guid, eventTime, ...
+
+// Without watermark using guid column
+streamingDf.dropDuplicates("guid");
+
+// With watermark using guid and eventTime columns
+streamingDf
+  .withWatermark("eventTime", "10 seconds")
+  .dropDuplicates("guid", "eventTime");
+{% endhighlight %}
+
+
+
+
+
+{% highlight python %}
+streamingDf = spark.readStream. ...
+
+# Without watermark using guid column
+streamingDf.dropDuplicates("guid")
+
+# With watermark using guid and eventTime columns
+streamingDf \
+  .withWatermark("eventTime", "10 seconds") \
+  .dropDuplicates("guid", "eventTime")
+{% endhighlight %}
+
+
+
+
+### Arbitrary Stateful Operations
+Many use cases require more advanced stateful operations than aggregations. For 
example, in many use cases, you have to track sessions from data streams of 
events. For doing such sessionization, you will have to save arbitrary types of 
data as state, and perform arbitrary operations on the state using the data 
stream events in every trigger. Since Spark 2.2, this can be done using the 
operation `mapGroupsWithState` and the more powerful operation 
`flatMapGroupsWithState`. Both operations allow you to apply user-defined code 
on grouped Datasets to update user-defined state. For more concrete details, 
take a look at the 
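
A minimal sketch of the `mapGroupsWithState` sessionization pattern that the new examples document (assumptions: the `Event`/`SessionCount` classes, field names, socket source, and 10-minute timeout are invented for illustration; `spark` is an active `SparkSession`):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

case class Event(sessionId: String, timestamp: java.sql.Timestamp)
case class SessionCount(sessionId: String, numEvents: Long)

val spark = SparkSession.builder.appName("sessionization-sketch").getOrCreate()
import spark.implicits._

// A streaming Dataset[Event]; here each input line is treated as a session id.
val events = spark.readStream
  .format("socket").option("host", "localhost").option("port", 9999).load()
  .as[String]
  .map(line => Event(line, new java.sql.Timestamp(System.currentTimeMillis())))

// Keep a running event count per session as user-defined state, and expire
// sessions that stay idle for 10 minutes of processing time.
val sessionCounts = events
  .groupByKey(_.sessionId)
  .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout) {
    (sessionId: String, newEvents: Iterator[Event], state: GroupState[Long]) =>
      val total = state.getOption.getOrElse(0L) + newEvents.size
      state.update(total)
      state.setTimeoutDuration("10 minutes")
      SessionCount(sessionId, total)
  }
```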

spark git commit: [SPARK-19454][PYTHON][SQL] DataFrame.replace improvements

2017-04-05 Thread holden
Repository: spark
Updated Branches:
  refs/heads/master a2d8d767d -> e2773996b


[SPARK-19454][PYTHON][SQL] DataFrame.replace improvements

## What changes were proposed in this pull request?

- Allows skipping `value` argument if `to_replace` is a `dict`:
```python
df = sc.parallelize([("Alice", 1, 3.0)]).toDF()
df.replace({"Alice": "Bob"}).show()

- Adds validation step to ensure homogeneous values / replacements.
- Simplifies internal control flow.
- Improves unit tests coverage.

## How was this patch tested?

Existing unit tests, additional unit tests, manual testing.

Author: zero323 

Closes #16793 from zero323/SPARK-19454.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e2773996
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e2773996
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e2773996

Branch: refs/heads/master
Commit: e2773996b8d1c0214d9ffac634a059b4923caf7b
Parents: a2d8d76
Author: zero323 
Authored: Wed Apr 5 11:47:40 2017 -0700
Committer: Holden Karau 
Committed: Wed Apr 5 11:47:40 2017 -0700

--
 python/pyspark/sql/dataframe.py | 81 +---
 python/pyspark/sql/tests.py | 72 
 2 files changed, 128 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e2773996/python/pyspark/sql/dataframe.py
--
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index a24512f..774caf5 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -25,6 +25,8 @@ if sys.version >= '3':
 else:
 from itertools import imap as map
 
+import warnings
+
 from pyspark import copy_func, since
 from pyspark.rdd import RDD, _load_from_socket, ignore_unicode_prefix
 from pyspark.serializers import BatchedSerializer, PickleSerializer, 
UTF8Deserializer
@@ -1281,7 +1283,7 @@ class DataFrame(object):
 return DataFrame(self._jdf.na().fill(value, self._jseq(subset)), 
self.sql_ctx)
 
 @since(1.4)
-def replace(self, to_replace, value, subset=None):
+def replace(self, to_replace, value=None, subset=None):
 """Returns a new :class:`DataFrame` replacing a value with another 
value.
 :func:`DataFrame.replace` and :func:`DataFrameNaFunctions.replace` are
 aliases of each other.
@@ -1326,43 +1328,72 @@ class DataFrame(object):
 |null|  null|null|
 ++--++
 """
-if not isinstance(to_replace, (float, int, long, basestring, list, 
tuple, dict)):
+# Helper functions
+def all_of(types):
+"""Given a type or tuple of types and a sequence of xs
+check if each x is instance of type(s)
+
+>>> all_of(bool)([True, False])
+True
+>>> all_of(basestring)(["a", 1])
+False
+"""
+def all_of_(xs):
+return all(isinstance(x, types) for x in xs)
+return all_of_
+
+all_of_bool = all_of(bool)
+all_of_str = all_of(basestring)
+all_of_numeric = all_of((float, int, long))
+
+# Validate input types
+valid_types = (bool, float, int, long, basestring, list, tuple)
+if not isinstance(to_replace, valid_types + (dict, )):
 raise ValueError(
-"to_replace should be a float, int, long, string, list, tuple, 
or dict")
+"to_replace should be a float, int, long, string, list, tuple, 
or dict. "
+"Got {0}".format(type(to_replace)))
 
-if not isinstance(value, (float, int, long, basestring, list, tuple)):
-raise ValueError("value should be a float, int, long, string, 
list, or tuple")
+if not isinstance(value, valid_types) and not isinstance(to_replace, 
dict):
+raise ValueError("If to_replace is not a dict, value should be "
+ "a float, int, long, string, list, or tuple. "
+ "Got {0}".format(type(value)))
+
+if isinstance(to_replace, (list, tuple)) and isinstance(value, (list, 
tuple)):
+if len(to_replace) != len(value):
+raise ValueError("to_replace and value lists should be of the 
same length. "
+ "Got {0} and {1}".format(len(to_replace), 
len(value)))
 
-rep_dict = dict()
+if not (subset is None or isinstance(subset, (list, tuple, 
basestring))):
+raise ValueError("subset should be a list or tuple of column 
names, "
+ "column name or None. Got 

spark git commit: [SPARK-20223][SQL] Fix typo in tpcds q77.sql

2017-04-05 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 90eb37343 -> 15ea5eaa2


[SPARK-20223][SQL] Fix typo in tpcds q77.sql

## What changes were proposed in this pull request?

Fix typo in tpcds q77.sql

## How was this patch tested?

N/A

Author: wangzhenhua 

Closes #17538 from wzhfy/typoQ77.

(cherry picked from commit a2d8d767d933321426a4eb9df1583e017722d7d6)
Signed-off-by: Xiao Li 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/15ea5eaa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/15ea5eaa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/15ea5eaa

Branch: refs/heads/branch-2.0
Commit: 15ea5eaa2e45bee4b8221be96b2b666e6d64498b
Parents: 90eb373
Author: wangzhenhua 
Authored: Wed Apr 5 10:21:43 2017 -0700
Committer: Xiao Li 
Committed: Wed Apr 5 10:22:10 2017 -0700

--
 sql/core/src/test/resources/tpcds/q77.sql | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/15ea5eaa/sql/core/src/test/resources/tpcds/q77.sql
--
diff --git a/sql/core/src/test/resources/tpcds/q77.sql 
b/sql/core/src/test/resources/tpcds/q77.sql
index 7830f96..a69df9f 100755
--- a/sql/core/src/test/resources/tpcds/q77.sql
+++ b/sql/core/src/test/resources/tpcds/q77.sql
@@ -36,7 +36,7 @@ WITH ss AS
 sum(cr_net_loss) AS profit_loss
   FROM catalog_returns, date_dim
   WHERE cr_returned_date_sk = d_date_sk
-AND d_date BETWEEN cast('2000-08-03]' AS DATE) AND
+AND d_date BETWEEN cast('2000-08-03' AS DATE) AND
   (cast('2000-08-03' AS DATE) + INTERVAL 30 days)),
 ws AS
   (SELECT





spark git commit: [SPARK-20223][SQL] Fix typo in tpcds q77.sql

2017-04-05 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 efc72dcc3 -> 2b85e059b


[SPARK-20223][SQL] Fix typo in tpcds q77.sql

## What changes were proposed in this pull request?

Fix typo in tpcds q77.sql

## How was this patch tested?

N/A

Author: wangzhenhua 

Closes #17538 from wzhfy/typoQ77.

(cherry picked from commit a2d8d767d933321426a4eb9df1583e017722d7d6)
Signed-off-by: Xiao Li 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2b85e059
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2b85e059
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2b85e059

Branch: refs/heads/branch-2.1
Commit: 2b85e059b634bfc4b015c76b7b232b732460bf12
Parents: efc72dc
Author: wangzhenhua 
Authored: Wed Apr 5 10:21:43 2017 -0700
Committer: Xiao Li 
Committed: Wed Apr 5 10:21:53 2017 -0700

--
 sql/core/src/test/resources/tpcds/q77.sql | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2b85e059/sql/core/src/test/resources/tpcds/q77.sql
--
diff --git a/sql/core/src/test/resources/tpcds/q77.sql 
b/sql/core/src/test/resources/tpcds/q77.sql
index 7830f96..a69df9f 100755
--- a/sql/core/src/test/resources/tpcds/q77.sql
+++ b/sql/core/src/test/resources/tpcds/q77.sql
@@ -36,7 +36,7 @@ WITH ss AS
 sum(cr_net_loss) AS profit_loss
   FROM catalog_returns, date_dim
   WHERE cr_returned_date_sk = d_date_sk
-AND d_date BETWEEN cast('2000-08-03]' AS DATE) AND
+AND d_date BETWEEN cast('2000-08-03' AS DATE) AND
   (cast('2000-08-03' AS DATE) + INTERVAL 30 days)),
 ws AS
   (SELECT





spark git commit: [SPARK-20223][SQL] Fix typo in tpcds q77.sql

2017-04-05 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 71c3c4815 -> a2d8d767d


[SPARK-20223][SQL] Fix typo in tpcds q77.sql

## What changes were proposed in this pull request?

Fix typo in tpcds q77.sql

## How was this patch tested?

N/A

Author: wangzhenhua 

Closes #17538 from wzhfy/typoQ77.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a2d8d767
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a2d8d767
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a2d8d767

Branch: refs/heads/master
Commit: a2d8d767d933321426a4eb9df1583e017722d7d6
Parents: 71c3c48
Author: wangzhenhua 
Authored: Wed Apr 5 10:21:43 2017 -0700
Committer: Xiao Li 
Committed: Wed Apr 5 10:21:43 2017 -0700

--
 sql/core/src/test/resources/tpcds/q77.sql | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a2d8d767/sql/core/src/test/resources/tpcds/q77.sql
--
diff --git a/sql/core/src/test/resources/tpcds/q77.sql 
b/sql/core/src/test/resources/tpcds/q77.sql
index 7830f96..a69df9f 100755
--- a/sql/core/src/test/resources/tpcds/q77.sql
+++ b/sql/core/src/test/resources/tpcds/q77.sql
@@ -36,7 +36,7 @@ WITH ss AS
 sum(cr_net_loss) AS profit_loss
   FROM catalog_returns, date_dim
   WHERE cr_returned_date_sk = d_date_sk
-AND d_date BETWEEN cast('2000-08-03]' AS DATE) AND
+AND d_date BETWEEN cast('2000-08-03' AS DATE) AND
   (cast('2000-08-03' AS DATE) + INTERVAL 30 days)),
 ws AS
   (SELECT





spark git commit: [SPARK-19807][WEB UI] Add reason for cancellation when a stage is killed using web UI

2017-04-05 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 6f09dc70d -> 71c3c4815


[SPARK-19807][WEB UI] Add reason for cancellation when a stage is killed using 
web UI

## What changes were proposed in this pull request?

When a user kills a stage using the web UI (on the Stages page), 
StagesTab.handleKillRequest asks SparkContext to cancel the stage without 
giving a reason. SparkContext has cancelStage(stageId: Int, reason: String), 
which Spark could use to pass that information along for monitoring/debugging purposes.
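
A hedged usage sketch of the overload this patch starts using (assumes a live `SparkContext` named `sc`; the stage ID is hypothetical and would normally come from the Stages page):

```scala
// Cancel a running stage and attach an explanation; the reason string is what
// this patch now passes from StagesTab, so monitoring output shows why the
// stage was cancelled instead of no reason at all.
sc.cancelStage(12, "killed via the Web UI")
```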

## How was this patch tested?

manual tests

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: shaolinliu 
Author: lvdongr 

Closes #17258 from shaolinliu/SPARK-19807.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/71c3c481
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/71c3c481
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/71c3c481

Branch: refs/heads/master
Commit: 71c3c48159fe7eb4a46fc2a1b78b72088ccfa824
Parents: 6f09dc7
Author: shaolinliu 
Authored: Wed Apr 5 13:47:44 2017 +0100
Committer: Sean Owen 
Committed: Wed Apr 5 13:47:44 2017 +0100

--
 core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/71c3c481/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
index c1f2511..181465b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
@@ -42,7 +42,7 @@ private[ui] class StagesTab(parent: SparkUI) extends 
SparkUITab(parent, "stages"
   val stageId = Option(request.getParameter("id")).map(_.toInt)
   stageId.foreach { id =>
 if (progressListener.activeStages.contains(id)) {
-  sc.foreach(_.cancelStage(id))
+  sc.foreach(_.cancelStage(id, "killed via the Web UI"))
   // Do a quick pause here to give Spark time to kill the stage so it 
shows up as
   // killed after the refresh. Note that this will block the serving 
thread so the
   // time should be limited in duration.





spark git commit: [SPARK-20042][WEB UI] Fix log page buttons for reverse proxy mode

2017-04-05 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 00c124884 -> efc72dcc3


[SPARK-20042][WEB UI] Fix log page buttons for reverse proxy mode

With spark.ui.reverseProxy=true, full path URLs like /log will point to
the master web endpoint, which serves the worker UI as a reverse proxy.
To access a REST endpoint in the worker in reverse proxy mode, the
leading /proxy/"target"/ part of the base URI must be retained.

Added logic to log-view.js to handle this, similar to executorspage.js

Patch was tested manually

Author: Oliver Köth 

Closes #17370 from okoethibm/master.

(cherry picked from commit 6f09dc70d9808cae004ceda9ad615aa9be50f43d)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/efc72dcc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/efc72dcc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/efc72dcc

Branch: refs/heads/branch-2.1
Commit: efc72dcc3f964ea9931fb47a454db253556d0f8a
Parents: 00c1248
Author: Oliver Köth 
Authored: Wed Apr 5 08:09:42 2017 +0100
Committer: Sean Owen 
Committed: Wed Apr 5 08:09:52 2017 +0100

--
 .../org/apache/spark/ui/static/log-view.js   | 19 ---
 1 file changed, 16 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/efc72dcc/core/src/main/resources/org/apache/spark/ui/static/log-view.js
--
diff --git a/core/src/main/resources/org/apache/spark/ui/static/log-view.js 
b/core/src/main/resources/org/apache/spark/ui/static/log-view.js
index 1782b4f..b5c43e5 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/log-view.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/log-view.js
@@ -51,13 +51,26 @@ function noNewAlert() {
   window.setTimeout(function () {alert.css("display", "none");}, 4000);
 }
 
+
+function getRESTEndPoint() {
+  // If the worker is served from the master through a proxy (see doc on 
spark.ui.reverseProxy), 
+  // we need to retain the leading ../proxy// part of the URL when 
making REST requests.
+  // Similar logic is contained in executorspage.js function 
createRESTEndPoint.
+  var words = document.baseURI.split('/');
+  var ind = words.indexOf("proxy");
+  if (ind > 0) {
+  return words.slice(0, ind + 2).join('/') + "/log";
+  }
+  return "/log"
+}
+
 function loadMore() {
   var offset = Math.max(startByte - byteLength, 0);
   var moreByteLength = Math.min(byteLength, startByte);
 
   $.ajax({
 type: "GET",
-url: "/log" + baseParams + "=" + offset + "=" + 
moreByteLength,
+url: getRESTEndPoint() + baseParams + "=" + offset + "=" 
+ moreByteLength,
 success: function (data) {
   var oldHeight = $(".log-content")[0].scrollHeight;
   var newlineIndex = data.indexOf('\n');
@@ -83,14 +96,14 @@ function loadMore() {
 function loadNew() {
   $.ajax({
 type: "GET",
-url: "/log" + baseParams + "=0",
+url: getRESTEndPoint() + baseParams + "=0",
 success: function (data) {
   var dataInfo = data.substring(0, data.indexOf('\n')).match(/\d+/g);
   var newDataLen = dataInfo[2] - totalLogLength;
   if (newDataLen != 0) {
 $.ajax({
   type: "GET",
-  url: "/log" + baseParams + "=" + newDataLen,
+  url: getRESTEndPoint() + baseParams + "=" + newDataLen,
   success: function (data) {
 var newlineIndex = data.indexOf('\n');
 var dataInfo = data.substring(0, newlineIndex).match(/\d+/g);





spark git commit: [SPARK-20042][WEB UI] Fix log page buttons for reverse proxy mode

2017-04-05 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master dad499f32 -> 6f09dc70d


[SPARK-20042][WEB UI] Fix log page buttons for reverse proxy mode

With spark.ui.reverseProxy=true, full path URLs like /log will point to
the master web endpoint, which serves the worker UI as a reverse proxy.
To access a REST endpoint in the worker in reverse proxy mode, the
leading /proxy/"target"/ part of the base URI must be retained.

Added logic to log-view.js to handle this, similar to executorspage.js

Patch was tested manually

Author: Oliver Köth 

Closes #17370 from okoethibm/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6f09dc70
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6f09dc70
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6f09dc70

Branch: refs/heads/master
Commit: 6f09dc70d9808cae004ceda9ad615aa9be50f43d
Parents: dad499f
Author: Oliver Köth 
Authored: Wed Apr 5 08:09:42 2017 +0100
Committer: Sean Owen 
Committed: Wed Apr 5 08:09:42 2017 +0100

--
 .../org/apache/spark/ui/static/log-view.js   | 19 ---
 1 file changed, 16 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6f09dc70/core/src/main/resources/org/apache/spark/ui/static/log-view.js
--
diff --git a/core/src/main/resources/org/apache/spark/ui/static/log-view.js 
b/core/src/main/resources/org/apache/spark/ui/static/log-view.js
index 1782b4f..b5c43e5 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/log-view.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/log-view.js
@@ -51,13 +51,26 @@ function noNewAlert() {
   window.setTimeout(function () {alert.css("display", "none");}, 4000);
 }
 
+
+function getRESTEndPoint() {
+  // If the worker is served from the master through a proxy (see doc on 
spark.ui.reverseProxy), 
+  // we need to retain the leading ../proxy// part of the URL when 
making REST requests.
+  // Similar logic is contained in executorspage.js function 
createRESTEndPoint.
+  var words = document.baseURI.split('/');
+  var ind = words.indexOf("proxy");
+  if (ind > 0) {
+  return words.slice(0, ind + 2).join('/') + "/log";
+  }
+  return "/log"
+}
+
 function loadMore() {
   var offset = Math.max(startByte - byteLength, 0);
   var moreByteLength = Math.min(byteLength, startByte);
 
   $.ajax({
 type: "GET",
-url: "/log" + baseParams + "=" + offset + "=" + 
moreByteLength,
+url: getRESTEndPoint() + baseParams + "=" + offset + "=" 
+ moreByteLength,
 success: function (data) {
   var oldHeight = $(".log-content")[0].scrollHeight;
   var newlineIndex = data.indexOf('\n');
@@ -83,14 +96,14 @@ function loadMore() {
 function loadNew() {
   $.ajax({
 type: "GET",
-url: "/log" + baseParams + "=0",
+url: getRESTEndPoint() + baseParams + "=0",
 success: function (data) {
   var dataInfo = data.substring(0, data.indexOf('\n')).match(/\d+/g);
   var newDataLen = dataInfo[2] - totalLogLength;
   if (newDataLen != 0) {
 $.ajax({
   type: "GET",
-      url: "/log" + baseParams + "&byteLength=" + newDataLen,
+      url: getRESTEndPoint() + baseParams + "&byteLength=" + newDataLen,
   success: function (data) {
 var newlineIndex = data.indexOf('\n');
 var dataInfo = data.substring(0, newlineIndex).match(/\d+/g);





spark git commit: [SPARK-20209][SS] Execute next trigger immediately if previous batch took longer than trigger interval

2017-04-05 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master b6e71032d -> dad499f32


[SPARK-20209][SS] Execute next trigger immediately if previous batch took 
longer than trigger interval

## What changes were proposed in this pull request?

For large trigger intervals (e.g. 10 minutes), if a batch takes 11 minutes, Spark 
will then wait 9 more minutes before starting the next batch. This does not make 
sense. The processing-time trigger policy should be to process batches as fast as 
possible, but no faster than one per trigger interval. If batches are already 
taking longer than the trigger interval, there is no point in waiting an extra 
trigger interval.
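
As a rough sketch of the intended policy (not the actual Spark code; the alignment rule and names below are assumptions based on the description):

```scala
// Triggers are aligned to multiples of the interval. If the previous batch ran past
// its slot, the next aligned time is already in the past, so the next batch starts
// immediately instead of waiting a full extra interval.
def nextTriggerTimeMs(nowMs: Long, intervalMs: Long): Long =
  (nowMs / intervalMs + 1) * intervalMs

def waitBeforeNextBatchMs(batchEndMs: Long, scheduledNextMs: Long): Long =
  math.max(0L, scheduledNextMs - batchEndMs)

// With a 10-minute interval (600000 ms) and a batch starting at t = 0:
//   batch ends at t = 660000 (11 min) => wait = max(0, 600000 - 660000) = 0  (run now)
//   batch ends at t = 480000 (8 min)  => wait = 120000 ms (2 min)
```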

In this PR, I modified the ProcessingTimeExecutor to do so. Another minor 
change I did was to extract our StreamManualClock into a separate class so that 
it can be used outside subclasses of StreamTest. For example, 
ProcessingTimeExecutorSuite does not need to create any context for testing, 
just needs the StreamManualClock.

## How was this patch tested?
Added new unit tests to comprehensively test this behavior.

Author: Tathagata Das 

Closes #17525 from tdas/SPARK-20209.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dad499f3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dad499f3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dad499f3

Branch: refs/heads/master
Commit: dad499f324c6a93650aecfeb8cde10a405372930
Parents: b6e7103
Author: Tathagata Das 
Authored: Tue Apr 4 23:20:17 2017 -0700
Committer: Tathagata Das 
Committed: Tue Apr 4 23:20:17 2017 -0700

--
 .../spark/sql/kafka010/KafkaSourceSuite.scala   |  1 +
 .../execution/streaming/TriggerExecutor.scala   | 17 ++--
 .../streaming/ProcessingTimeExecutorSuite.scala | 83 +--
 .../sql/streaming/FileStreamSourceSuite.scala   |  1 +
 .../streaming/FlatMapGroupsWithStateSuite.scala |  3 +-
 .../spark/sql/streaming/StreamSuite.scala   |  1 +
 .../apache/spark/sql/streaming/StreamTest.scala | 20 +
 .../streaming/StreamingAggregationSuite.scala   |  1 +
 .../streaming/StreamingQueryListenerSuite.scala |  1 +
 .../sql/streaming/StreamingQuerySuite.scala | 87 
 .../sql/streaming/util/StreamManualClock.scala  | 51 
 11 files changed, 194 insertions(+), 72 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/dad499f3/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
--
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index 6391d62..0046ba7 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.functions.{count, window}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
 import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
+import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
 import org.apache.spark.util.Utils
 

http://git-wip-us.apache.org/repos/asf/spark/blob/dad499f3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
index 02996ac..d188566 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
@@ -47,21 +47,22 @@ case class ProcessingTimeExecutor(processingTime: 
ProcessingTime, clock: Clock =
   extends TriggerExecutor with Logging {
 
   private val intervalMs = processingTime.intervalMs
+  require(intervalMs >= 0)
 
-  override def execute(batchRunner: () => Boolean): Unit = {
+  override def execute(triggerHandler: () => Boolean): Unit = {
 while (true) {
-  val batchStartTimeMs = clock.getTimeMillis()
-  val terminated = !batchRunner()
+  val triggerTimeMs = clock.getTimeMillis
+  val nextTriggerTimeMs = nextBatchTime(triggerTimeMs)
+  val terminated = !triggerHandler()
   if (intervalMs > 0) {
-val batchEndTimeMs = clock.getTimeMillis()
-val