[3/3] spark git commit: [SPARK-23445] ColumnStat refactoring

2018-02-26 Thread lixiao
[SPARK-23445] ColumnStat refactoring

## What changes were proposed in this pull request?

Refactor ColumnStat to be more flexible.

* Split `CatalogColumnStat` from `ColumnStat`, just as `CatalogStatistics` is 
split from `Statistics`. This detaches how the statistics are stored from how 
they are processed in the query plan. `CatalogColumnStat` keeps `min` and `max` 
as `String`, so it does not depend on dataType information.
* For `CatalogColumnStat`, parse column names from property names in the 
metastore (`KEY_VERSION` property), not from metastore schema. This means that 
`CatalogColumnStat`s can be created for columns even if the schema itself is 
not stored in the metastore.
* Make all fields optional. `min`, `max` and `histogram` were already optional 
for columns. Making them all optional is more consistent, and gives the 
flexibility to, for example, drop some of the fields through transformations if 
they are difficult or impossible to calculate.

The added flexibility makes it possible to have alternative implementations for 
stats, and it separates stats collection from stats and estimation processing 
in query plans.
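For illustration only, here is a minimal sketch of the serialization scheme this 
split enables. The `SimpleColumnStat` class, its helpers, and the bare 
`<column>.<stat>` key layout are hypothetical stand-ins, not the classes added by 
the patch; in the actual code the keys carry the `spark.sql.statistics.colStats.` 
prefix and the marker key is `KEY_VERSION`, as the `HiveExternalCatalog` diff 
further down shows.

```scala
// Hypothetical, simplified stand-in for a catalog-level column statistic: every
// field is optional, and min/max are plain strings so that storing the stat does
// not require knowing the column's dataType.
case class SimpleColumnStat(
    distinctCount: Option[BigInt] = None,
    min: Option[String] = None,
    max: Option[String] = None,
    nullCount: Option[BigInt] = None,
    avgLen: Option[Long] = None,
    maxLen: Option[Long] = None) {

  // Serialize to flat string properties keyed by "<column>.<stat>"; only fields
  // that are present produce entries. A "<column>.version" entry marks that any
  // stats exist for the column at all.
  def toMap(col: String): Map[String, String] = {
    Map(s"$col.version" -> "1") ++
      distinctCount.map(v => s"$col.distinctCount" -> v.toString) ++
      min.map(v => s"$col.min" -> v) ++
      max.map(v => s"$col.max" -> v) ++
      nullCount.map(v => s"$col.nullCount" -> v.toString) ++
      avgLen.map(v => s"$col.avgLen" -> v.toString) ++
      maxLen.map(v => s"$col.maxLen" -> v.toString)
  }
}

object SimpleColumnStat {
  // Column names are recovered from the property keys themselves (the ".version"
  // suffix), so the stats can be rebuilt even when no schema is available.
  def allFromProperties(props: Map[String, String]): Map[String, SimpleColumnStat] = {
    val cols = props.keys.filter(_.endsWith(".version")).map(_.stripSuffix(".version"))
    cols.map { col =>
      col -> SimpleColumnStat(
        distinctCount = props.get(s"$col.distinctCount").map(BigInt(_)),
        min = props.get(s"$col.min"),
        max = props.get(s"$col.max"),
        nullCount = props.get(s"$col.nullCount").map(BigInt(_)),
        avgLen = props.get(s"$col.avgLen").map(_.toLong),
        maxLen = props.get(s"$col.maxLen").map(_.toLong))
    }.toMap
  }
}
```

Round-tripping `toMap` through `allFromProperties` is, roughly, the kind of 
compatibility the tests described below exercise.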

## How was this patch tested?

Refactored the existing tests to work with the refactored `ColumnStat` and 
`CatalogColumnStat`.
New tests were added in `StatisticsSuite` to check that backward / forward 
compatibility is not broken.

Author: Juliusz Sompolski 

Closes #20624 from juliuszsompolski/SPARK-23445.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8077bb04
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8077bb04
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8077bb04

Branch: refs/heads/master
Commit: 8077bb04f350fd35df83ef896135c0672dc3f7b0
Parents: 7ec8365
Author: Juliusz Sompolski 
Authored: Mon Feb 26 23:37:31 2018 -0800
Committer: gatorsmile 
Committed: Mon Feb 26 23:37:31 2018 -0800

--
 .../spark/sql/catalyst/catalog/interface.scala  | 146 +-
 .../optimizer/StarSchemaDetection.scala |   6 +-
 .../sql/catalyst/plans/logical/Statistics.scala | 256 ++---
 .../statsEstimation/AggregateEstimation.scala   |   6 +-
 .../statsEstimation/EstimationUtils.scala   |  20 +-
 .../statsEstimation/FilterEstimation.scala  |  98 ---
 .../statsEstimation/JoinEstimation.scala|  55 ++--
 .../catalyst/optimizer/JoinReorderSuite.scala   |  25 +-
 .../StarJoinCostBasedReorderSuite.scala |  96 +++
 .../optimizer/StarJoinReorderSuite.scala|  77 ++---
 .../AggregateEstimationSuite.scala  |  24 +-
 .../BasicStatsEstimationSuite.scala |  12 +-
 .../statsEstimation/FilterEstimationSuite.scala | 279 ++-
 .../statsEstimation/JoinEstimationSuite.scala   | 138 +
 .../ProjectEstimationSuite.scala|  70 +++--
 .../StatsEstimationTestBase.scala   |  10 +-
 .../command/AnalyzeColumnCommand.scala  | 138 -
 .../spark/sql/execution/command/tables.scala|   9 +-
 .../spark/sql/StatisticsCollectionSuite.scala   |   9 +-
 .../sql/StatisticsCollectionTestBase.scala  | 168 +--
 .../spark/sql/hive/HiveExternalCatalog.scala|  63 ++---
 .../apache/spark/sql/hive/StatisticsSuite.scala | 162 +++
 22 files changed, 995 insertions(+), 872 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 95b6fbb..f3e67dc 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -21,7 +21,9 @@ import java.net.URI
 import java.util.Date
 
 import scala.collection.mutable
+import scala.util.control.NonFatal
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, 
TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
@@ -30,7 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types._
 
 
 /**
@@ -361,7 +363,7 @@ object CatalogTable {
 case class CatalogStatistics(
 sizeInBytes: BigInt,
 rowCount: Option[BigInt] = None,
-

[1/3] spark git commit: [SPARK-23445] ColumnStat refactoring

2018-02-26 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 7ec83658f -> 8077bb04f


http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 1ee1d57..28c340a 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -663,14 +663,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
 requireTableExists(db, table)
 val rawTable = getRawTable(db, table)
 
-// For datasource tables and hive serde tables created by spark 2.1 or 
higher,
-// the data schema is stored in the table properties.
-val schema = restoreTableMetadata(rawTable).schema
-
 // convert table statistics to properties so that we can persist them 
through hive client
 val statsProperties =
   if (stats.isDefined) {
-statsToProperties(stats.get, schema)
+statsToProperties(stats.get)
   } else {
 new mutable.HashMap[String, String]()
   }
@@ -1028,9 +1024,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
 currentFullPath
   }
 
-  private def statsToProperties(
-  stats: CatalogStatistics,
-  schema: StructType): Map[String, String] = {
+  private def statsToProperties(stats: CatalogStatistics): Map[String, String] 
= {
 
 val statsProperties = new mutable.HashMap[String, String]()
 statsProperties += STATISTICS_TOTAL_SIZE -> stats.sizeInBytes.toString()
@@ -1038,11 +1032,12 @@ private[spark] class HiveExternalCatalog(conf: 
SparkConf, hadoopConf: Configurat
   statsProperties += STATISTICS_NUM_ROWS -> stats.rowCount.get.toString()
 }
 
-val colNameTypeMap: Map[String, DataType] =
-  schema.fields.map(f => (f.name, f.dataType)).toMap
 stats.colStats.foreach { case (colName, colStat) =>
-  colStat.toMap(colName, colNameTypeMap(colName)).foreach { case (k, v) =>
-statsProperties += (columnStatKeyPropName(colName, k) -> v)
+  colStat.toMap(colName).foreach { case (k, v) =>
+// Fully qualified name used in table properties for a particular 
column stat.
+// For example, for column "mycol", and "min" stat, this should return
+// "spark.sql.statistics.colStats.mycol.min".
+statsProperties += (STATISTICS_COL_STATS_PREFIX + k -> v)
   }
 }
 
@@ -1058,23 +1053,20 @@ private[spark] class HiveExternalCatalog(conf: 
SparkConf, hadoopConf: Configurat
 if (statsProps.isEmpty) {
   None
 } else {
+  val colStats = new mutable.HashMap[String, CatalogColumnStat]
+  val colStatsProps = 
properties.filterKeys(_.startsWith(STATISTICS_COL_STATS_PREFIX)).map {
+case (k, v) => k.drop(STATISTICS_COL_STATS_PREFIX.length) -> v
+  }
 
-  val colStats = new mutable.HashMap[String, ColumnStat]
-
-  // For each column, recover its column stats. Note that this is 
currently a O(n^2) operation,
-  // but given the number of columns it usually not enormous, this is 
probably OK as a start.
-  // If we want to map this a linear operation, we'd need a stronger 
contract between the
-  // naming convention used for serialization.
-  schema.foreach { field =>
-if (statsProps.contains(columnStatKeyPropName(field.name, 
ColumnStat.KEY_VERSION))) {
-  // If "version" field is defined, then the column stat is defined.
-  val keyPrefix = columnStatKeyPropName(field.name, "")
-  val colStatMap = statsProps.filterKeys(_.startsWith(keyPrefix)).map 
{ case (k, v) =>
-(k.drop(keyPrefix.length), v)
-  }
-  ColumnStat.fromMap(table, field, colStatMap).foreach { cs =>
-colStats += field.name -> cs
-  }
+  // Find all the column names by matching the KEY_VERSION properties for 
them.
+  colStatsProps.keys.filter {
+k => k.endsWith(CatalogColumnStat.KEY_VERSION)
+  }.map { k =>
+k.dropRight(CatalogColumnStat.KEY_VERSION.length + 1)
+  }.foreach { fieldName =>
+// and for each, create a column stat.
+CatalogColumnStat.fromMap(table, fieldName, colStatsProps).foreach { 
cs =>
+  colStats += fieldName -> cs
 }
   }
 
@@ -1093,14 +1085,10 @@ private[spark] class HiveExternalCatalog(conf: 
SparkConf, hadoopConf: Configurat
 
 val rawTable = getRawTable(db, table)
 
-// For datasource tables and hive serde tables created by spark 2.1 or 
higher,
-// the data schema is stored in the table properties.
-val schema = restoreTableMetadata(rawTable).schema
-
 // convert partition statistics to properties so that we can persist them 

[2/3] spark git commit: [SPARK-23445] ColumnStat refactoring

2018-02-26 Thread lixiao
http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
index 2b1fe98..43440d5 100755
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
@@ -37,59 +37,61 @@ class FilterEstimationSuite extends StatsEstimationTestBase 
{
   // column cint has values: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
   // Hence, distinctCount:10, min:1, max:10, nullCount:0, avgLen:4, maxLen:4
   val attrInt = AttributeReference("cint", IntegerType)()
-  val colStatInt = ColumnStat(distinctCount = 10, min = Some(1), max = 
Some(10),
-nullCount = 0, avgLen = 4, maxLen = 4)
+  val colStatInt = ColumnStat(distinctCount = Some(10), min = Some(1), max = 
Some(10),
+nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
 
   // column cbool has only 2 distinct values
   val attrBool = AttributeReference("cbool", BooleanType)()
-  val colStatBool = ColumnStat(distinctCount = 2, min = Some(false), max = 
Some(true),
-nullCount = 0, avgLen = 1, maxLen = 1)
+  val colStatBool = ColumnStat(distinctCount = Some(2), min = Some(false), max 
= Some(true),
+nullCount = Some(0), avgLen = Some(1), maxLen = Some(1))
 
   // column cdate has 10 values from 2017-01-01 through 2017-01-10.
   val dMin = DateTimeUtils.fromJavaDate(Date.valueOf("2017-01-01"))
   val dMax = DateTimeUtils.fromJavaDate(Date.valueOf("2017-01-10"))
   val attrDate = AttributeReference("cdate", DateType)()
-  val colStatDate = ColumnStat(distinctCount = 10, min = Some(dMin), max = 
Some(dMax),
-nullCount = 0, avgLen = 4, maxLen = 4)
+  val colStatDate = ColumnStat(distinctCount = Some(10),
+min = Some(dMin), max = Some(dMax),
+nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
 
   // column cdecimal has 4 values from 0.20 through 0.80 at increment of 0.20.
   val decMin = Decimal("0.20")
   val decMax = Decimal("0.80")
   val attrDecimal = AttributeReference("cdecimal", DecimalType(18, 18))()
-  val colStatDecimal = ColumnStat(distinctCount = 4, min = Some(decMin), max = 
Some(decMax),
-nullCount = 0, avgLen = 8, maxLen = 8)
+  val colStatDecimal = ColumnStat(distinctCount = Some(4),
+min = Some(decMin), max = Some(decMax),
+nullCount = Some(0), avgLen = Some(8), maxLen = Some(8))
 
   // column cdouble has 10 double values: 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 
8.0, 9.0, 10.0
   val attrDouble = AttributeReference("cdouble", DoubleType)()
-  val colStatDouble = ColumnStat(distinctCount = 10, min = Some(1.0), max = 
Some(10.0),
-nullCount = 0, avgLen = 8, maxLen = 8)
+  val colStatDouble = ColumnStat(distinctCount = Some(10), min = Some(1.0), 
max = Some(10.0),
+nullCount = Some(0), avgLen = Some(8), maxLen = Some(8))
 
   // column cstring has 10 String values:
   // "A0", "A1", "A2", "A3", "A4", "A5", "A6", "A7", "A8", "A9"
   val attrString = AttributeReference("cstring", StringType)()
-  val colStatString = ColumnStat(distinctCount = 10, min = None, max = None,
-nullCount = 0, avgLen = 2, maxLen = 2)
+  val colStatString = ColumnStat(distinctCount = Some(10), min = None, max = 
None,
+nullCount = Some(0), avgLen = Some(2), maxLen = Some(2))
 
   // column cint2 has values: 7, 8, 9, 10, 11, 12, 13, 14, 15, 16
   // Hence, distinctCount:10, min:7, max:16, nullCount:0, avgLen:4, maxLen:4
   // This column is created to test "cint < cint2
   val attrInt2 = AttributeReference("cint2", IntegerType)()
-  val colStatInt2 = ColumnStat(distinctCount = 10, min = Some(7), max = 
Some(16),
-nullCount = 0, avgLen = 4, maxLen = 4)
+  val colStatInt2 = ColumnStat(distinctCount = Some(10), min = Some(7), max = 
Some(16),
+nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
 
   // column cint3 has values: 30, 31, 32, 33, 34, 35, 36, 37, 38, 39
   // Hence, distinctCount:10, min:30, max:39, nullCount:0, avgLen:4, maxLen:4
   // This column is created to test "cint = cint3 without overlap at all.
   val attrInt3 = AttributeReference("cint3", IntegerType)()
-  val colStatInt3 = ColumnStat(distinctCount = 10, min = Some(30), max = 
Some(39),
-nullCount = 0, avgLen = 4, maxLen = 4)
+  val colStatInt3 = ColumnStat(distinctCount = Some(10), min = Some(30), max = 
Some(39),
+nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
 
   // column cint4 has values in the range from 1 to 10
   // distinctCount:10, min:1, max:10, nullCount:0, avgLen:4, maxLen:4
   // This column is created to test complete overlap
   val attrInt4 = AttributeReferen

svn commit: r25288 - in /dev/spark/2.3.1-SNAPSHOT-2018_02_26_14_01-6eee545-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-02-26 Thread pwendell
Author: pwendell
Date: Mon Feb 26 22:15:38 2018
New Revision: 25288

Log:
Apache Spark 2.3.1-SNAPSHOT-2018_02_26_14_01-6eee545 docs


[This commit notification would consist of 1443 parts, 
which exceeds the limit of 50, so it was shortened to this summary.]




svn commit: r25286 - in /dev/spark/2.4.0-SNAPSHOT-2018_02_26_12_01-7ec8365-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-02-26 Thread pwendell
Author: pwendell
Date: Mon Feb 26 20:15:58 2018
New Revision: 25286

Log:
Apache Spark 2.4.0-SNAPSHOT-2018_02_26_12_01-7ec8365 docs


[This commit notification would consist of 1444 parts, 
which exceeds the limit of 50, so it was shortened to this summary.]




spark git commit: [SPARK-23491][SS] Remove explicit job cancellation from ContinuousExecution reconfiguring

2018-02-26 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 185f5bc7d -> 7ec83658f


[SPARK-23491][SS] Remove explicit job cancellation from ContinuousExecution 
reconfiguring

## What changes were proposed in this pull request?

Remove queryExecutionThread.interrupt() from ContinuousExecution. As detailed 
in the JIRA, interrupting the thread is only relevant in the microbatch case; 
for continuous processing the query execution can quickly clean itself up 
without it.
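As a rough illustration of the resulting control flow, here is a hedged sketch 
with simplified names (`ReconfigureSketch` and its methods are hypothetical, not 
Spark's actual classes): the query thread may still be interrupted while the 
state is RECONFIGURING, that interruption is swallowed so the query can restart, 
and source/job cleanup runs once in the final cleanup path, as in the diff below.

```scala
import java.util.concurrent.atomic.AtomicReference

object ReconfigureSketch {
  sealed trait State
  case object ACTIVE extends State
  case object RECONFIGURING extends State

  private val state = new AtomicReference[State](ACTIVE)

  // Ask the query loop to reconfigure: flip the state and interrupt the thread,
  // but do not stop sources or cancel jobs here.
  def requestReconfiguration(queryThread: Thread): Boolean = {
    if (state.compareAndSet(ACTIVE, RECONFIGURING)) {
      if (queryThread.isAlive) queryThread.interrupt()
      true
    } else {
      false
    }
  }

  // The query loop itself: an interruption caused by reconfiguration is swallowed
  // so the caller can restart the query; cleanup runs exactly once in `finally`.
  def runOnce(runBatch: () => Unit, stopSources: () => Unit, cancelJobs: () => Unit): Unit = {
    try {
      runBatch()
    } catch {
      case _: InterruptedException if state.get() == RECONFIGURING =>
        println("ignoring interruption caused by reconfiguration")
    } finally {
      stopSources()
      cancelJobs()
    }
  }
}
```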

## How was this patch tested?

Existing tests.

Author: Jose Torres 

Closes #20622 from jose-torres/SPARK-23441.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7ec83658
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7ec83658
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7ec83658

Branch: refs/heads/master
Commit: 7ec83658fbc88505dfc2d8a6f76e90db747f1292
Parents: 185f5bc
Author: Jose Torres 
Authored: Mon Feb 26 11:28:44 2018 -0800
Committer: Tathagata Das 
Committed: Mon Feb 26 11:28:44 2018 -0800

--
 .../streaming/continuous/ContinuousExecution.scala| 10 --
 1 file changed, 8 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7ec83658/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 2c1d6c5..daebd1d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -236,9 +236,7 @@ class ContinuousExecution(
 startTrigger()
 
 if (reader.needsReconfiguration() && state.compareAndSet(ACTIVE, 
RECONFIGURING)) {
-  stopSources()
   if (queryExecutionThread.isAlive) {
-sparkSession.sparkContext.cancelJobGroup(runId.toString)
 queryExecutionThread.interrupt()
   }
   false
@@ -266,12 +264,20 @@ class ContinuousExecution(
 SQLExecution.withNewExecutionId(
   sparkSessionForQuery, lastExecution)(lastExecution.toRdd)
   }
+} catch {
+  case t: Throwable
+  if StreamExecution.isInterruptionException(t) && state.get() == 
RECONFIGURING =>
+logInfo(s"Query $id ignoring exception from reconfiguring: $t")
+// interrupted by reconfiguration - swallow exception so we can 
restart the query
 } finally {
   epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)
   SparkEnv.get.rpcEnv.stop(epochEndpoint)
 
   epochUpdateThread.interrupt()
   epochUpdateThread.join()
+
+  stopSources()
+  sparkSession.sparkContext.cancelJobGroup(runId.toString)
 }
   }
 





spark git commit: [SPARK-23449][K8S] Preserve extraJavaOptions ordering

2018-02-26 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 1f180cd12 -> 6eee545f9


[SPARK-23449][K8S] Preserve extraJavaOptions ordering

For some JVM options, like `-XX:+UnlockExperimentalVMOptions`, ordering is 
necessary.

## What changes were proposed in this pull request?

Keep the original `extraJavaOptions` ordering when passing them through 
environment variables inside the Docker container.
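To see why the sort matters, here is a small, hypothetical Scala sketch of the 
same idea that the `sort -t_ -k4 -n` added to the entrypoint script below 
implements (the `OrderedJavaOpts` object and the example environment values are 
made up): options arrive as `SPARK_JAVA_OPT_<n>` environment variables in no 
guaranteed order and are re-ordered by their numeric suffix.

```scala
object OrderedJavaOpts {
  private val Prefix = "SPARK_JAVA_OPT_"

  // Collect SPARK_JAVA_OPT_<n> entries and order them by the numeric suffix <n>,
  // because env/map iteration order is not the order the user wrote the options in.
  def orderedOpts(env: Map[String, String]): Seq[String] =
    env.toSeq
      .collect { case (k, v) if k.startsWith(Prefix) => (k.stripPrefix(Prefix).toInt, v) }
      .sortBy(_._1)
      .map(_._2)

  def main(args: Array[String]): Unit = {
    // Hypothetical example values: the experimental flag must come before the
    // option that depends on it, regardless of how the env map is iterated.
    val env = Map(
      "SPARK_JAVA_OPT_1" -> "-XX:+UseCGroupMemoryLimitForHeap",
      "SPARK_JAVA_OPT_0" -> "-XX:+UnlockExperimentalVMOptions",
      "PATH" -> "/usr/bin")
    println(orderedOpts(env).mkString(" "))
    // prints: -XX:+UnlockExperimentalVMOptions -XX:+UseCGroupMemoryLimitForHeap
  }
}
```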

## How was this patch tested?

Ran the base branch a couple of times and checked the startup command in the 
logs; the ordering differed every time. After adding the sort, the ordering was 
consistent with what the user had in `extraJavaOptions`.

Author: Andrew Korzhuev 

Closes #20628 from andrusha/patch-2.

(cherry picked from commit 185f5bc7dd52cebe8fac9393ecb2bd0968bc5867)
Signed-off-by: Marcelo Vanzin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6eee545f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6eee545f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6eee545f

Branch: refs/heads/branch-2.3
Commit: 6eee545f9ad4552057bb51daa866d68b08f27c0f
Parents: 1f180cd
Author: Andrew Korzhuev 
Authored: Mon Feb 26 10:28:45 2018 -0800
Committer: Marcelo Vanzin 
Committed: Mon Feb 26 10:29:01 2018 -0800

--
 .../kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6eee545f/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
--
diff --git 
a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh 
b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
index b9090dc..3d67b0a 100755
--- 
a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
+++ 
b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
@@ -41,7 +41,7 @@ fi
 shift 1
 
 SPARK_CLASSPATH="$SPARK_CLASSPATH:${SPARK_HOME}/jars/*"
-env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt
+env | grep SPARK_JAVA_OPT_ | sort -t_ -k4 -n | sed 's/[^=]*=\(.*\)/\1/g' > 
/tmp/java_opts.txt
 readarray -t SPARK_JAVA_OPTS < /tmp/java_opts.txt
 if [ -n "$SPARK_MOUNTED_CLASSPATH" ]; then
   SPARK_CLASSPATH="$SPARK_CLASSPATH:$SPARK_MOUNTED_CLASSPATH"





spark git commit: [SPARK-23449][K8S] Preserve extraJavaOptions ordering

2018-02-26 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master b308182f2 -> 185f5bc7d


[SPARK-23449][K8S] Preserve extraJavaOptions ordering

For some JVM options, like `-XX:+UnlockExperimentalVMOptions`, ordering is 
necessary.

## What changes were proposed in this pull request?

Keep the original `extraJavaOptions` ordering when passing them through 
environment variables inside the Docker container.

## How was this patch tested?

Ran the base branch a couple of times and checked the startup command in the 
logs; the ordering differed every time. After adding the sort, the ordering was 
consistent with what the user had in `extraJavaOptions`.

Author: Andrew Korzhuev 

Closes #20628 from andrusha/patch-2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/185f5bc7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/185f5bc7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/185f5bc7

Branch: refs/heads/master
Commit: 185f5bc7dd52cebe8fac9393ecb2bd0968bc5867
Parents: b308182
Author: Andrew Korzhuev 
Authored: Mon Feb 26 10:28:45 2018 -0800
Committer: Marcelo Vanzin 
Committed: Mon Feb 26 10:28:45 2018 -0800

--
 .../kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/185f5bc7/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
--
diff --git 
a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh 
b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
index b9090dc..3d67b0a 100755
--- 
a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
+++ 
b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
@@ -41,7 +41,7 @@ fi
 shift 1
 
 SPARK_CLASSPATH="$SPARK_CLASSPATH:${SPARK_HOME}/jars/*"
-env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt
+env | grep SPARK_JAVA_OPT_ | sort -t_ -k4 -n | sed 's/[^=]*=\(.*\)/\1/g' > 
/tmp/java_opts.txt
 readarray -t SPARK_JAVA_OPTS < /tmp/java_opts.txt
 if [ -n "$SPARK_MOUNTED_CLASSPATH" ]; then
   SPARK_CLASSPATH="$SPARK_CLASSPATH:$SPARK_MOUNTED_CLASSPATH"





svn commit: r25283 - in /dev/spark/2.3.1-SNAPSHOT-2018_02_26_10_01-1f180cd-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-02-26 Thread pwendell
Author: pwendell
Date: Mon Feb 26 18:16:06 2018
New Revision: 25283

Log:
Apache Spark 2.3.1-SNAPSHOT-2018_02_26_10_01-1f180cd docs


[This commit notification would consist of 1443 parts, 
which exceeds the limit of 50, so it was shortened to this summary.]




spark git commit: [SPARK-23438][DSTREAMS] Fix DStreams data loss with WAL when driver crashes

2018-02-26 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 076c2f6a1 -> d51c6aaeb


[SPARK-23438][DSTREAMS] Fix DStreams data loss with WAL when driver crashes

There is a race condition, introduced in SPARK-11141, which could cause data loss.
The problem is that the ReceivedBlockTracker.insertAllocatedBatch function assumes 
that all blocks from streamIdToUnallocatedBlockQueues are allocated to the batch 
and clears the queue.

In this PR only the allocated blocks are removed from the queue, which prevents 
data loss.
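A minimal sketch of the difference, using plain Scala collections; the 
`WalRecoverySketch` names are hypothetical stand-ins for the 
ReceivedBlockTracker internals shown in the diff below:

```scala
import scala.collection.mutable

object WalRecoverySketch {
  type StreamId = Int
  type Block = String

  // Per-stream queues of blocks that have not been allocated to a batch yet.
  private val unallocated = mutable.Map.empty[StreamId, mutable.Queue[Block]]

  private def queueFor(streamId: StreamId): mutable.Queue[Block] =
    unallocated.getOrElseUpdate(streamId, mutable.Queue.empty[Block])

  // Buggy recovery step: ignores which blocks the recovered batch actually holds,
  // wipes every queue, and silently drops blocks that arrived after the allocation.
  def insertAllocatedBatchBuggy(allocated: Map[StreamId, Seq[Block]]): Unit =
    unallocated.values.foreach(_.clear())

  // Fixed recovery step: remove only the blocks the recovered batch contains,
  // leaving later, still-unallocated blocks in their queues.
  def insertAllocatedBatchFixed(allocated: Map[StreamId, Seq[Block]]): Unit =
    allocated.foreach { case (streamId, blocks) =>
      queueFor(streamId).dequeueAll(blocks.toSet)
    }
}
```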

Additional unit test, plus manual testing.

Author: Gabor Somogyi 

Closes #20620 from gaborgsomogyi/SPARK-23438.

(cherry picked from commit b308182f233b8840dfe0e6b5736d2f2746f40757)
Signed-off-by: Marcelo Vanzin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d51c6aae
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d51c6aae
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d51c6aae

Branch: refs/heads/branch-2.0
Commit: d51c6aaeb7ff7e1ea74f0c282b76d20300c1dcae
Parents: 076c2f6
Author: Gabor Somogyi 
Authored: Mon Feb 26 08:39:44 2018 -0800
Committer: Marcelo Vanzin 
Committed: Mon Feb 26 08:57:45 2018 -0800

--
 .../scheduler/ReceivedBlockTracker.scala| 11 ++
 .../streaming/ReceivedBlockTrackerSuite.scala   | 23 +++-
 2 files changed, 29 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d51c6aae/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index 5d9a8ac..dacff69 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -193,12 +193,15 @@ private[streaming] class ReceivedBlockTracker(
   getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
 }
 
-// Insert the recovered block-to-batch allocations and clear the queue of 
received blocks
-// (when the blocks were originally allocated to the batch, the queue must 
have been cleared).
+// Insert the recovered block-to-batch allocations and removes them from 
queue of
+// received blocks.
 def insertAllocatedBatch(batchTime: Time, allocatedBlocks: 
AllocatedBlocks) {
   logTrace(s"Recovery: Inserting allocated batch for time $batchTime to " +
 s"${allocatedBlocks.streamIdToAllocatedBlocks}")
-  streamIdToUnallocatedBlockQueues.values.foreach { _.clear() }
+  allocatedBlocks.streamIdToAllocatedBlocks.foreach {
+case (streamId, allocatedBlocksInStream) =>
+  
getReceivedBlockQueue(streamId).dequeueAll(allocatedBlocksInStream.toSet)
+  }
   timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
   lastAllocatedBatchTime = batchTime
 }
@@ -227,7 +230,7 @@ private[streaming] class ReceivedBlockTracker(
   }
 
   /** Write an update to the tracker to the write ahead log */
-  private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
+  private[streaming] def writeToLog(record: ReceivedBlockTrackerLogEvent): 
Boolean = {
 if (isWriteAheadLogEnabled) {
   logTrace(s"Writing record: $record")
   try {

http://git-wip-us.apache.org/repos/asf/spark/blob/d51c6aae/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
--
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 851013b..d16e2bf 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.{SparkConf, SparkException, 
SparkFunSuite}
 import org.apache.spark.internal.Logging
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
-import org.apache.spark.streaming.scheduler._
+import org.apache.spark.streaming.scheduler.{AllocatedBlocks, _}
 import org.apache.spark.streaming.util._
 import org.apache.spark.streaming.util.WriteAheadLogSuite._
 import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils}
@@ -94,6 +94,27 @@ class ReceivedBlockTrackerSuite
 receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
   }
 
+  test("recovery with write ahead logs should rem

spark git commit: [SPARK-23438][DSTREAMS] Fix DStreams data loss with WAL when driver crashes

2018-02-26 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 24fe6eb0f -> 2d751fbf6


[SPARK-23438][DSTREAMS] Fix DStreams data loss with WAL when driver crashes

There is a race condition, introduced in SPARK-11141, which could cause data loss.
The problem is that the ReceivedBlockTracker.insertAllocatedBatch function assumes 
that all blocks from streamIdToUnallocatedBlockQueues are allocated to the batch 
and clears the queue.

In this PR only the allocated blocks are removed from the queue, which prevents 
data loss.

Additional unit test, plus manual testing.

Author: Gabor Somogyi 

Closes #20620 from gaborgsomogyi/SPARK-23438.

(cherry picked from commit b308182f233b8840dfe0e6b5736d2f2746f40757)
Signed-off-by: Marcelo Vanzin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2d751fbf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2d751fbf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2d751fbf

Branch: refs/heads/branch-2.1
Commit: 2d751fbf6e50ad5d470f4d00b7f254bd1d398270
Parents: 24fe6eb
Author: Gabor Somogyi 
Authored: Mon Feb 26 08:39:44 2018 -0800
Committer: Marcelo Vanzin 
Committed: Mon Feb 26 08:57:21 2018 -0800

--
 .../scheduler/ReceivedBlockTracker.scala| 11 ++
 .../streaming/ReceivedBlockTrackerSuite.scala   | 23 +++-
 2 files changed, 29 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2d751fbf/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index 5d9a8ac..dacff69 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -193,12 +193,15 @@ private[streaming] class ReceivedBlockTracker(
   getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
 }
 
-// Insert the recovered block-to-batch allocations and clear the queue of 
received blocks
-// (when the blocks were originally allocated to the batch, the queue must 
have been cleared).
+// Insert the recovered block-to-batch allocations and removes them from 
queue of
+// received blocks.
 def insertAllocatedBatch(batchTime: Time, allocatedBlocks: 
AllocatedBlocks) {
   logTrace(s"Recovery: Inserting allocated batch for time $batchTime to " +
 s"${allocatedBlocks.streamIdToAllocatedBlocks}")
-  streamIdToUnallocatedBlockQueues.values.foreach { _.clear() }
+  allocatedBlocks.streamIdToAllocatedBlocks.foreach {
+case (streamId, allocatedBlocksInStream) =>
+  
getReceivedBlockQueue(streamId).dequeueAll(allocatedBlocksInStream.toSet)
+  }
   timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
   lastAllocatedBatchTime = batchTime
 }
@@ -227,7 +230,7 @@ private[streaming] class ReceivedBlockTracker(
   }
 
   /** Write an update to the tracker to the write ahead log */
-  private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
+  private[streaming] def writeToLog(record: ReceivedBlockTrackerLogEvent): 
Boolean = {
 if (isWriteAheadLogEnabled) {
   logTrace(s"Writing record: $record")
   try {

http://git-wip-us.apache.org/repos/asf/spark/blob/2d751fbf/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
--
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 851013b..d16e2bf 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.{SparkConf, SparkException, 
SparkFunSuite}
 import org.apache.spark.internal.Logging
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
-import org.apache.spark.streaming.scheduler._
+import org.apache.spark.streaming.scheduler.{AllocatedBlocks, _}
 import org.apache.spark.streaming.util._
 import org.apache.spark.streaming.util.WriteAheadLogSuite._
 import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils}
@@ -94,6 +94,27 @@ class ReceivedBlockTrackerSuite
 receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
   }
 
+  test("recovery with write ahead logs should rem

spark git commit: [SPARK-23438][DSTREAMS] Fix DStreams data loss with WAL when driver crashes

2018-02-26 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 1cc34f3e5 -> fa3667ece


[SPARK-23438][DSTREAMS] Fix DStreams data loss with WAL when driver crashes

There is a race condition, introduced in SPARK-11141, which could cause data loss.
The problem is that the ReceivedBlockTracker.insertAllocatedBatch function assumes 
that all blocks from streamIdToUnallocatedBlockQueues are allocated to the batch 
and clears the queue.

In this PR only the allocated blocks are removed from the queue, which prevents 
data loss.

Additional unit test, plus manual testing.

Author: Gabor Somogyi 

Closes #20620 from gaborgsomogyi/SPARK-23438.

(cherry picked from commit b308182f233b8840dfe0e6b5736d2f2746f40757)
Signed-off-by: Marcelo Vanzin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fa3667ec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fa3667ec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fa3667ec

Branch: refs/heads/branch-2.2
Commit: fa3667ecef0bcdf8aab3242904238e3ba28d2705
Parents: 1cc34f3
Author: Gabor Somogyi 
Authored: Mon Feb 26 08:39:44 2018 -0800
Committer: Marcelo Vanzin 
Committed: Mon Feb 26 08:56:57 2018 -0800

--
 .../scheduler/ReceivedBlockTracker.scala| 11 ++
 .../streaming/ReceivedBlockTrackerSuite.scala   | 23 +++-
 2 files changed, 29 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fa3667ec/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index 5d9a8ac..dacff69 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -193,12 +193,15 @@ private[streaming] class ReceivedBlockTracker(
   getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
 }
 
-// Insert the recovered block-to-batch allocations and clear the queue of 
received blocks
-// (when the blocks were originally allocated to the batch, the queue must 
have been cleared).
+// Insert the recovered block-to-batch allocations and removes them from 
queue of
+// received blocks.
 def insertAllocatedBatch(batchTime: Time, allocatedBlocks: 
AllocatedBlocks) {
   logTrace(s"Recovery: Inserting allocated batch for time $batchTime to " +
 s"${allocatedBlocks.streamIdToAllocatedBlocks}")
-  streamIdToUnallocatedBlockQueues.values.foreach { _.clear() }
+  allocatedBlocks.streamIdToAllocatedBlocks.foreach {
+case (streamId, allocatedBlocksInStream) =>
+  
getReceivedBlockQueue(streamId).dequeueAll(allocatedBlocksInStream.toSet)
+  }
   timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
   lastAllocatedBatchTime = batchTime
 }
@@ -227,7 +230,7 @@ private[streaming] class ReceivedBlockTracker(
   }
 
   /** Write an update to the tracker to the write ahead log */
-  private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
+  private[streaming] def writeToLog(record: ReceivedBlockTrackerLogEvent): 
Boolean = {
 if (isWriteAheadLogEnabled) {
   logTrace(s"Writing record: $record")
   try {

http://git-wip-us.apache.org/repos/asf/spark/blob/fa3667ec/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
--
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 107c3f5..4fa236b 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.{SparkConf, SparkException, 
SparkFunSuite}
 import org.apache.spark.internal.Logging
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
-import org.apache.spark.streaming.scheduler._
+import org.apache.spark.streaming.scheduler.{AllocatedBlocks, _}
 import org.apache.spark.streaming.util._
 import org.apache.spark.streaming.util.WriteAheadLogSuite._
 import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils}
@@ -94,6 +94,27 @@ class ReceivedBlockTrackerSuite
 receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
   }
 
+  test("recovery with write ahead logs should rem

spark git commit: [SPARK-23438][DSTREAMS] Fix DStreams data loss with WAL when driver crashes

2018-02-26 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 578607b30 -> 1f180cd12


[SPARK-23438][DSTREAMS] Fix DStreams data loss with WAL when driver crashes

There is a race condition, introduced in SPARK-11141, which could cause data loss.
The problem is that the ReceivedBlockTracker.insertAllocatedBatch function assumes 
that all blocks from streamIdToUnallocatedBlockQueues are allocated to the batch 
and clears the queue.

In this PR only the allocated blocks are removed from the queue, which prevents 
data loss.

Additional unit test, plus manual testing.

Author: Gabor Somogyi 

Closes #20620 from gaborgsomogyi/SPARK-23438.

(cherry picked from commit b308182f233b8840dfe0e6b5736d2f2746f40757)
Signed-off-by: Marcelo Vanzin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1f180cd1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1f180cd1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1f180cd1

Branch: refs/heads/branch-2.3
Commit: 1f180cd121b13ecd455bee55ed2224936a2f3b2a
Parents: 578607b
Author: Gabor Somogyi 
Authored: Mon Feb 26 08:39:44 2018 -0800
Committer: Marcelo Vanzin 
Committed: Mon Feb 26 08:55:57 2018 -0800

--
 .../scheduler/ReceivedBlockTracker.scala| 11 ++
 .../streaming/ReceivedBlockTrackerSuite.scala   | 23 +++-
 2 files changed, 29 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1f180cd1/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index 5d9a8ac..dacff69 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -193,12 +193,15 @@ private[streaming] class ReceivedBlockTracker(
   getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
 }
 
-// Insert the recovered block-to-batch allocations and clear the queue of 
received blocks
-// (when the blocks were originally allocated to the batch, the queue must 
have been cleared).
+// Insert the recovered block-to-batch allocations and removes them from 
queue of
+// received blocks.
 def insertAllocatedBatch(batchTime: Time, allocatedBlocks: 
AllocatedBlocks) {
   logTrace(s"Recovery: Inserting allocated batch for time $batchTime to " +
 s"${allocatedBlocks.streamIdToAllocatedBlocks}")
-  streamIdToUnallocatedBlockQueues.values.foreach { _.clear() }
+  allocatedBlocks.streamIdToAllocatedBlocks.foreach {
+case (streamId, allocatedBlocksInStream) =>
+  
getReceivedBlockQueue(streamId).dequeueAll(allocatedBlocksInStream.toSet)
+  }
   timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
   lastAllocatedBatchTime = batchTime
 }
@@ -227,7 +230,7 @@ private[streaming] class ReceivedBlockTracker(
   }
 
   /** Write an update to the tracker to the write ahead log */
-  private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
+  private[streaming] def writeToLog(record: ReceivedBlockTrackerLogEvent): 
Boolean = {
 if (isWriteAheadLogEnabled) {
   logTrace(s"Writing record: $record")
   try {

http://git-wip-us.apache.org/repos/asf/spark/blob/1f180cd1/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
--
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 107c3f5..4fa236b 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.{SparkConf, SparkException, 
SparkFunSuite}
 import org.apache.spark.internal.Logging
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
-import org.apache.spark.streaming.scheduler._
+import org.apache.spark.streaming.scheduler.{AllocatedBlocks, _}
 import org.apache.spark.streaming.util._
 import org.apache.spark.streaming.util.WriteAheadLogSuite._
 import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils}
@@ -94,6 +94,27 @@ class ReceivedBlockTrackerSuite
 receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
   }
 
+  test("recovery with write ahead logs should rem

spark git commit: [SPARK-23438][DSTREAMS] Fix DStreams data loss with WAL when driver crashes

2018-02-26 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master 3ca9a2c56 -> b308182f2


[SPARK-23438][DSTREAMS] Fix DStreams data loss with WAL when driver crashes

## What changes were proposed in this pull request?

There is a race condition, introduced in SPARK-11141, which could cause data loss.
The problem is that the ReceivedBlockTracker.insertAllocatedBatch function assumes 
that all blocks from streamIdToUnallocatedBlockQueues are allocated to the batch 
and clears the queue.

In this PR only the allocated blocks are removed from the queue, which prevents 
data loss.

## How was this patch tested?

Additional unit test, plus manual testing.

Author: Gabor Somogyi 

Closes #20620 from gaborgsomogyi/SPARK-23438.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b308182f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b308182f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b308182f

Branch: refs/heads/master
Commit: b308182f233b8840dfe0e6b5736d2f2746f40757
Parents: 3ca9a2c
Author: Gabor Somogyi 
Authored: Mon Feb 26 08:39:44 2018 -0800
Committer: Marcelo Vanzin 
Committed: Mon Feb 26 08:39:44 2018 -0800

--
 .../scheduler/ReceivedBlockTracker.scala| 11 ++
 .../streaming/ReceivedBlockTrackerSuite.scala   | 23 +++-
 2 files changed, 29 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b308182f/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
--
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index 5d9a8ac..dacff69 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -193,12 +193,15 @@ private[streaming] class ReceivedBlockTracker(
   getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
 }
 
-// Insert the recovered block-to-batch allocations and clear the queue of 
received blocks
-// (when the blocks were originally allocated to the batch, the queue must 
have been cleared).
+// Insert the recovered block-to-batch allocations and removes them from 
queue of
+// received blocks.
 def insertAllocatedBatch(batchTime: Time, allocatedBlocks: 
AllocatedBlocks) {
   logTrace(s"Recovery: Inserting allocated batch for time $batchTime to " +
 s"${allocatedBlocks.streamIdToAllocatedBlocks}")
-  streamIdToUnallocatedBlockQueues.values.foreach { _.clear() }
+  allocatedBlocks.streamIdToAllocatedBlocks.foreach {
+case (streamId, allocatedBlocksInStream) =>
+  
getReceivedBlockQueue(streamId).dequeueAll(allocatedBlocksInStream.toSet)
+  }
   timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
   lastAllocatedBatchTime = batchTime
 }
@@ -227,7 +230,7 @@ private[streaming] class ReceivedBlockTracker(
   }
 
   /** Write an update to the tracker to the write ahead log */
-  private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = {
+  private[streaming] def writeToLog(record: ReceivedBlockTrackerLogEvent): 
Boolean = {
 if (isWriteAheadLogEnabled) {
   logTrace(s"Writing record: $record")
   try {

http://git-wip-us.apache.org/repos/asf/spark/blob/b308182f/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
--
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 107c3f5..4fa236b 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.{SparkConf, SparkException, 
SparkFunSuite}
 import org.apache.spark.internal.Logging
 import org.apache.spark.storage.StreamBlockId
 import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
-import org.apache.spark.streaming.scheduler._
+import org.apache.spark.streaming.scheduler.{AllocatedBlocks, _}
 import org.apache.spark.streaming.util._
 import org.apache.spark.streaming.util.WriteAheadLogSuite._
 import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils}
@@ -94,6 +94,27 @@ class ReceivedBlockTrackerSuite
 receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
   }
 
+  test("recovery with write ahead logs should remove only allocated blocks