[3/3] spark git commit: [SPARK-23445] ColumnStat refactoring
[SPARK-23445] ColumnStat refactoring ## What changes were proposed in this pull request? Refactor ColumnStat to be more flexible. * Split `ColumnStat` and `CatalogColumnStat` just like `CatalogStatistics` is split from `Statistics`. This detaches how the statistics are stored from how they are processed in the query plan. `CatalogColumnStat` keeps `min` and `max` as `String`, making it not depend on dataType information. * For `CatalogColumnStat`, parse column names from property names in the metastore (`KEY_VERSION` property), not from metastore schema. This means that `CatalogColumnStat`s can be created for columns even if the schema itself is not stored in the metastore. * Make all fields optional. `min`, `max` and `histogram` for columns were optional already. Having them all optional is more consistent, and gives flexibility to e.g. drop some of the fields through transformations if they are difficult / impossible to calculate. The added flexibility will make it possible to have alternative implementations for stats, and separates stats collection from stats and estimation processing in plans. ## How was this patch tested? Refactored existing tests to work with refactored `ColumnStat` and `CatalogColumnStat`. New tests added in `StatisticsSuite` checking that backwards / forwards compatibility is not broken. Author: Juliusz Sompolski Closes #20624 from juliuszsompolski/SPARK-23445. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8077bb04 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8077bb04 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8077bb04 Branch: refs/heads/master Commit: 8077bb04f350fd35df83ef896135c0672dc3f7b0 Parents: 7ec8365 Author: Juliusz Sompolski Authored: Mon Feb 26 23:37:31 2018 -0800 Committer: gatorsmile Committed: Mon Feb 26 23:37:31 2018 -0800 -- .../spark/sql/catalyst/catalog/interface.scala | 146 +- .../optimizer/StarSchemaDetection.scala | 6 +- .../sql/catalyst/plans/logical/Statistics.scala | 256 ++--- .../statsEstimation/AggregateEstimation.scala | 6 +- .../statsEstimation/EstimationUtils.scala | 20 +- .../statsEstimation/FilterEstimation.scala | 98 --- .../statsEstimation/JoinEstimation.scala| 55 ++-- .../catalyst/optimizer/JoinReorderSuite.scala | 25 +- .../StarJoinCostBasedReorderSuite.scala | 96 +++ .../optimizer/StarJoinReorderSuite.scala| 77 ++--- .../AggregateEstimationSuite.scala | 24 +- .../BasicStatsEstimationSuite.scala | 12 +- .../statsEstimation/FilterEstimationSuite.scala | 279 ++- .../statsEstimation/JoinEstimationSuite.scala | 138 + .../ProjectEstimationSuite.scala| 70 +++-- .../StatsEstimationTestBase.scala | 10 +- .../command/AnalyzeColumnCommand.scala | 138 - .../spark/sql/execution/command/tables.scala| 9 +- .../spark/sql/StatisticsCollectionSuite.scala | 9 +- .../sql/StatisticsCollectionTestBase.scala | 168 +-- .../spark/sql/hive/HiveExternalCatalog.scala| 63 ++--- .../apache/spark/sql/hive/StatisticsSuite.scala | 162 +++ 22 files changed, 995 insertions(+), 872 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 95b6fbb..f3e67dc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -21,7 +21,9 @@ import java.net.URI import java.util.Date import scala.collection.mutable +import scala.util.control.NonFatal +import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation @@ -30,7 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} import org.apache.spark.sql.catalyst.util.quoteIdentifier -import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.types._ /** @@ -361,7 +363,7 @@ object CatalogTable { case class CatalogStatistics( sizeInBytes: BigInt, rowCount: Option[BigInt] = None, -
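To make the new layering concrete, here is a simplified, hypothetical sketch of the two classes described in the commit message above (field set trimmed, names illustrative; the real classes also carry a `histogram` field and version handling). The catalog-side class keeps `min`/`max` as `String`, so it can be stored and loaded without knowing the column's data type, and is only converted into a typed, plan-side `ColumnStat` once a `DataType` is available.

```scala
import org.apache.spark.sql.types.DataType

// Simplified sketch only -- not the exact Spark definitions.

// Plan-side statistics: typed min/max, every field optional.
case class ColumnStat(
    distinctCount: Option[BigInt] = None,
    min: Option[Any] = None,
    max: Option[Any] = None,
    nullCount: Option[BigInt] = None,
    avgLen: Option[Long] = None,
    maxLen: Option[Long] = None)

// Catalog-side statistics: min/max kept as Strings, so persisting them in the
// metastore does not depend on dataType information.
case class CatalogColumnStat(
    distinctCount: Option[BigInt] = None,
    min: Option[String] = None,
    max: Option[String] = None,
    nullCount: Option[BigInt] = None,
    avgLen: Option[Long] = None,
    maxLen: Option[Long] = None) {

  // Conversion to the plan-side representation happens only when the column's
  // data type is known; `parse` stands in for the real string-to-value decoding.
  def toPlanStat(dataType: DataType, parse: (String, DataType) => Any): ColumnStat =
    ColumnStat(
      distinctCount = distinctCount,
      min = min.map(parse(_, dataType)),
      max = max.map(parse(_, dataType)),
      nullCount = nullCount,
      avgLen = avgLen,
      maxLen = maxLen)
}
```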
[1/3] spark git commit: [SPARK-23445] ColumnStat refactoring
Repository: spark Updated Branches: refs/heads/master 7ec83658f -> 8077bb04f http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 1ee1d57..28c340a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -663,14 +663,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat requireTableExists(db, table) val rawTable = getRawTable(db, table) -// For datasource tables and hive serde tables created by spark 2.1 or higher, -// the data schema is stored in the table properties. -val schema = restoreTableMetadata(rawTable).schema - // convert table statistics to properties so that we can persist them through hive client val statsProperties = if (stats.isDefined) { -statsToProperties(stats.get, schema) +statsToProperties(stats.get) } else { new mutable.HashMap[String, String]() } @@ -1028,9 +1024,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat currentFullPath } - private def statsToProperties( - stats: CatalogStatistics, - schema: StructType): Map[String, String] = { + private def statsToProperties(stats: CatalogStatistics): Map[String, String] = { val statsProperties = new mutable.HashMap[String, String]() statsProperties += STATISTICS_TOTAL_SIZE -> stats.sizeInBytes.toString() @@ -1038,11 +1032,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat statsProperties += STATISTICS_NUM_ROWS -> stats.rowCount.get.toString() } -val colNameTypeMap: Map[String, DataType] = - schema.fields.map(f => (f.name, f.dataType)).toMap stats.colStats.foreach { case (colName, colStat) => - colStat.toMap(colName, colNameTypeMap(colName)).foreach { case (k, v) => -statsProperties += (columnStatKeyPropName(colName, k) -> v) + colStat.toMap(colName).foreach { case (k, v) => +// Fully qualified name used in table properties for a particular column stat. +// For example, for column "mycol", and "min" stat, this should return +// "spark.sql.statistics.colStats.mycol.min". +statsProperties += (STATISTICS_COL_STATS_PREFIX + k -> v) } } @@ -1058,23 +1053,20 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat if (statsProps.isEmpty) { None } else { + val colStats = new mutable.HashMap[String, CatalogColumnStat] + val colStatsProps = properties.filterKeys(_.startsWith(STATISTICS_COL_STATS_PREFIX)).map { +case (k, v) => k.drop(STATISTICS_COL_STATS_PREFIX.length) -> v + } - val colStats = new mutable.HashMap[String, ColumnStat] - - // For each column, recover its column stats. Note that this is currently a O(n^2) operation, - // but given the number of columns it usually not enormous, this is probably OK as a start. - // If we want to map this a linear operation, we'd need a stronger contract between the - // naming convention used for serialization. - schema.foreach { field => -if (statsProps.contains(columnStatKeyPropName(field.name, ColumnStat.KEY_VERSION))) { - // If "version" field is defined, then the column stat is defined. 
- val keyPrefix = columnStatKeyPropName(field.name, "") - val colStatMap = statsProps.filterKeys(_.startsWith(keyPrefix)).map { case (k, v) => -(k.drop(keyPrefix.length), v) - } - ColumnStat.fromMap(table, field, colStatMap).foreach { cs => -colStats += field.name -> cs - } + // Find all the column names by matching the KEY_VERSION properties for them. + colStatsProps.keys.filter { +k => k.endsWith(CatalogColumnStat.KEY_VERSION) + }.map { k => +k.dropRight(CatalogColumnStat.KEY_VERSION.length + 1) + }.foreach { fieldName => +// and for each, create a column stat. +CatalogColumnStat.fromMap(table, fieldName, colStatsProps).foreach { cs => + colStats += fieldName -> cs } } @@ -1093,14 +1085,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat val rawTable = getRawTable(db, table) -// For datasource tables and hive serde tables created by spark 2.1 or higher, -// the data schema is stored in the table properties. -val schema = restoreTableMetadata(rawTable).schema - // convert partition statistics to properties so that we can persist them
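The restore path in the diff above works purely off table-property keys: a column's stats exist exactly when its `version` property exists, so no metastore schema is needed. Below is a hypothetical, self-contained sketch of that key layout and the name-recovery step; the string literals stand in for `STATISTICS_COL_STATS_PREFIX` and `CatalogColumnStat.KEY_VERSION`, and the rest is illustrative.

```scala
// Sketch of the "spark.sql.statistics.colStats.<colName>.<statKey>" layout and how
// column names are recovered from the keys themselves (assumption: column names
// contain no dots, as in the simplified example here).
object ColStatPropsSketch {
  val Prefix = "spark.sql.statistics.colStats."
  val VersionKey = "version"

  // Group per-column stat properties, discovering column names from ".version" keys.
  def groupColumnStatProps(tableProps: Map[String, String]): Map[String, Map[String, String]] = {
    val colStatsProps = tableProps.collect {
      case (k, v) if k.startsWith(Prefix) => k.drop(Prefix.length) -> v
    }
    val colNames = colStatsProps.keys
      .filter(_.endsWith("." + VersionKey))
      .map(_.dropRight(VersionKey.length + 1))
    colNames.map { name =>
      name -> colStatsProps.collect {
        case (k, v) if k.startsWith(name + ".") => k.drop(name.length + 1) -> v
      }
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    // Example: stats for column "mycol", with min/max persisted as plain strings.
    val props = Map(
      Prefix + "mycol.version" -> "1",
      Prefix + "mycol.min" -> "1",
      Prefix + "mycol.max" -> "10",
      "spark.sql.statistics.numRows" -> "10")
    // Prints: Map(mycol -> Map(version -> 1, min -> 1, max -> 10))
    println(groupColumnStatProps(props))
  }
}
```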
[2/3] spark git commit: [SPARK-23445] ColumnStat refactoring
http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala index 2b1fe98..43440d5 100755 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala @@ -37,59 +37,61 @@ class FilterEstimationSuite extends StatsEstimationTestBase { // column cint has values: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 // Hence, distinctCount:10, min:1, max:10, nullCount:0, avgLen:4, maxLen:4 val attrInt = AttributeReference("cint", IntegerType)() - val colStatInt = ColumnStat(distinctCount = 10, min = Some(1), max = Some(10), -nullCount = 0, avgLen = 4, maxLen = 4) + val colStatInt = ColumnStat(distinctCount = Some(10), min = Some(1), max = Some(10), +nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)) // column cbool has only 2 distinct values val attrBool = AttributeReference("cbool", BooleanType)() - val colStatBool = ColumnStat(distinctCount = 2, min = Some(false), max = Some(true), -nullCount = 0, avgLen = 1, maxLen = 1) + val colStatBool = ColumnStat(distinctCount = Some(2), min = Some(false), max = Some(true), +nullCount = Some(0), avgLen = Some(1), maxLen = Some(1)) // column cdate has 10 values from 2017-01-01 through 2017-01-10. val dMin = DateTimeUtils.fromJavaDate(Date.valueOf("2017-01-01")) val dMax = DateTimeUtils.fromJavaDate(Date.valueOf("2017-01-10")) val attrDate = AttributeReference("cdate", DateType)() - val colStatDate = ColumnStat(distinctCount = 10, min = Some(dMin), max = Some(dMax), -nullCount = 0, avgLen = 4, maxLen = 4) + val colStatDate = ColumnStat(distinctCount = Some(10), +min = Some(dMin), max = Some(dMax), +nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)) // column cdecimal has 4 values from 0.20 through 0.80 at increment of 0.20. 
val decMin = Decimal("0.20") val decMax = Decimal("0.80") val attrDecimal = AttributeReference("cdecimal", DecimalType(18, 18))() - val colStatDecimal = ColumnStat(distinctCount = 4, min = Some(decMin), max = Some(decMax), -nullCount = 0, avgLen = 8, maxLen = 8) + val colStatDecimal = ColumnStat(distinctCount = Some(4), +min = Some(decMin), max = Some(decMax), +nullCount = Some(0), avgLen = Some(8), maxLen = Some(8)) // column cdouble has 10 double values: 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0 val attrDouble = AttributeReference("cdouble", DoubleType)() - val colStatDouble = ColumnStat(distinctCount = 10, min = Some(1.0), max = Some(10.0), -nullCount = 0, avgLen = 8, maxLen = 8) + val colStatDouble = ColumnStat(distinctCount = Some(10), min = Some(1.0), max = Some(10.0), +nullCount = Some(0), avgLen = Some(8), maxLen = Some(8)) // column cstring has 10 String values: // "A0", "A1", "A2", "A3", "A4", "A5", "A6", "A7", "A8", "A9" val attrString = AttributeReference("cstring", StringType)() - val colStatString = ColumnStat(distinctCount = 10, min = None, max = None, -nullCount = 0, avgLen = 2, maxLen = 2) + val colStatString = ColumnStat(distinctCount = Some(10), min = None, max = None, +nullCount = Some(0), avgLen = Some(2), maxLen = Some(2)) // column cint2 has values: 7, 8, 9, 10, 11, 12, 13, 14, 15, 16 // Hence, distinctCount:10, min:7, max:16, nullCount:0, avgLen:4, maxLen:4 // This column is created to test "cint < cint2 val attrInt2 = AttributeReference("cint2", IntegerType)() - val colStatInt2 = ColumnStat(distinctCount = 10, min = Some(7), max = Some(16), -nullCount = 0, avgLen = 4, maxLen = 4) + val colStatInt2 = ColumnStat(distinctCount = Some(10), min = Some(7), max = Some(16), +nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)) // column cint3 has values: 30, 31, 32, 33, 34, 35, 36, 37, 38, 39 // Hence, distinctCount:10, min:30, max:39, nullCount:0, avgLen:4, maxLen:4 // This column is created to test "cint = cint3 without overlap at all. val attrInt3 = AttributeReference("cint3", IntegerType)() - val colStatInt3 = ColumnStat(distinctCount = 10, min = Some(30), max = Some(39), -nullCount = 0, avgLen = 4, maxLen = 4) + val colStatInt3 = ColumnStat(distinctCount = Some(10), min = Some(30), max = Some(39), +nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)) // column cint4 has values in the range from 1 to 10 // distinctCount:10, min:1, max:10, nullCount:0, avgLen:4, maxLen:4 // This column is created to test complete overlap val attrInt4 = AttributeReferen
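The test fixtures above now wrap every `ColumnStat` field in `Some`. The flexibility the commit message aims for — dropping fields that a transformation cannot preserve — can be illustrated with a small snippet built on the same constructor shape (using the simplified `ColumnStat` sketched earlier; illustrative only):

```scala
// All fields optional: a stat survives even when some values are unknown or dropped.
val fullStat = ColumnStat(distinctCount = Some(10), min = Some(1), max = Some(10),
  nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))

// A transformation that can no longer guarantee min/max simply omits them.
val withoutRange = fullStat.copy(min = None, max = None)

// Estimation code must now handle missing pieces explicitly instead of assuming them.
val ndv: BigInt = withoutRange.distinctCount.getOrElse(BigInt(1))
```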
svn commit: r25288 - in /dev/spark/2.3.1-SNAPSHOT-2018_02_26_14_01-6eee545-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Mon Feb 26 22:15:38 2018 New Revision: 25288 Log: Apache Spark 2.3.1-SNAPSHOT-2018_02_26_14_01-6eee545 docs [This commit notification would consist of 1443 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]

svn commit: r25286 - in /dev/spark/2.4.0-SNAPSHOT-2018_02_26_12_01-7ec8365-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Mon Feb 26 20:15:58 2018 New Revision: 25286 Log: Apache Spark 2.4.0-SNAPSHOT-2018_02_26_12_01-7ec8365 docs [This commit notification would consist of 1444 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-23491][SS] Remove explicit job cancellation from ContinuousExecution reconfiguring
Repository: spark Updated Branches: refs/heads/master 185f5bc7d -> 7ec83658f [SPARK-23491][SS] Remove explicit job cancellation from ContinuousExecution reconfiguring ## What changes were proposed in this pull request? Remove queryExecutionThread.interrupt() from ContinuousExecution. As detailed in the JIRA, interrupting the thread is only relevant in the microbatch case; for continuous processing the query execution can quickly clean itself up without. ## How was this patch tested? existing tests Author: Jose Torres Closes #20622 from jose-torres/SPARK-23441. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7ec83658 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7ec83658 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7ec83658 Branch: refs/heads/master Commit: 7ec83658fbc88505dfc2d8a6f76e90db747f1292 Parents: 185f5bc Author: Jose Torres Authored: Mon Feb 26 11:28:44 2018 -0800 Committer: Tathagata Das Committed: Mon Feb 26 11:28:44 2018 -0800 -- .../streaming/continuous/ContinuousExecution.scala| 10 -- 1 file changed, 8 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7ec83658/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala index 2c1d6c5..daebd1d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala @@ -236,9 +236,7 @@ class ContinuousExecution( startTrigger() if (reader.needsReconfiguration() && state.compareAndSet(ACTIVE, RECONFIGURING)) { - stopSources() if (queryExecutionThread.isAlive) { -sparkSession.sparkContext.cancelJobGroup(runId.toString) queryExecutionThread.interrupt() } false @@ -266,12 +264,20 @@ class ContinuousExecution( SQLExecution.withNewExecutionId( sparkSessionForQuery, lastExecution)(lastExecution.toRdd) } +} catch { + case t: Throwable + if StreamExecution.isInterruptionException(t) && state.get() == RECONFIGURING => +logInfo(s"Query $id ignoring exception from reconfiguring: $t") +// interrupted by reconfiguration - swallow exception so we can restart the query } finally { epochEndpoint.askSync[Unit](StopContinuousExecutionWrites) SparkEnv.get.rpcEnv.stop(epochEndpoint) epochUpdateThread.interrupt() epochUpdateThread.join() + + stopSources() + sparkSession.sparkContext.cancelJobGroup(runId.toString) } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
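In outline, the patch above interrupts the query thread on reconfiguration, tolerates that interrupt in the run loop, and moves source shutdown and job-group cancellation into the `finally` block so they run exactly once. A hypothetical, Spark-free sketch of that control flow (names and the plain `InterruptedException` check are illustrative; the real code uses `StreamExecution.isInterruptionException` and the `RECONFIGURING` state):

```scala
import java.util.concurrent.atomic.AtomicReference

// Illustrative only: swallow the interrupt raised for a reconfiguration so the
// caller can restart the loop, and do cleanup once, in finally.
object ReconfigureSketch {
  sealed trait State
  case object Active extends State
  case object Reconfiguring extends State

  val state = new AtomicReference[State](Active)

  def runBatch(execute: () => Unit, cleanup: () => Unit): Unit = {
    try {
      execute() // long-running query execution; may be interrupted for reconfiguration
    } catch {
      case e: InterruptedException if state.get() == Reconfiguring =>
        // Interrupted on purpose: log and fall through so the query can restart.
        println(s"ignoring exception from reconfiguring: $e")
    } finally {
      cleanup() // e.g. stop sources and cancel the job group, moved here by the patch
    }
  }
}
```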
spark git commit: [SPARK-23449][K8S] Preserve extraJavaOptions ordering
Repository: spark Updated Branches: refs/heads/branch-2.3 1f180cd12 -> 6eee545f9 [SPARK-23449][K8S] Preserve extraJavaOptions ordering For some JVM options, like `-XX:+UnlockExperimentalVMOptions` ordering is necessary. ## What changes were proposed in this pull request? Keep original `extraJavaOptions` ordering, when passing them through environment variables inside the Docker container. ## How was this patch tested? Ran base branch a couple of times and checked startup command in logs. Ordering differed every time. Added sorting, ordering was consistent to what user had in `extraJavaOptions`. Author: Andrew Korzhuev Closes #20628 from andrusha/patch-2. (cherry picked from commit 185f5bc7dd52cebe8fac9393ecb2bd0968bc5867) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6eee545f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6eee545f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6eee545f Branch: refs/heads/branch-2.3 Commit: 6eee545f9ad4552057bb51daa866d68b08f27c0f Parents: 1f180cd Author: Andrew Korzhuev Authored: Mon Feb 26 10:28:45 2018 -0800 Committer: Marcelo Vanzin Committed: Mon Feb 26 10:29:01 2018 -0800 -- .../kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6eee545f/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh -- diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh index b9090dc..3d67b0a 100755 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh @@ -41,7 +41,7 @@ fi shift 1 SPARK_CLASSPATH="$SPARK_CLASSPATH:${SPARK_HOME}/jars/*" -env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt +env | grep SPARK_JAVA_OPT_ | sort -t_ -k4 -n | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt readarray -t SPARK_JAVA_OPTS < /tmp/java_opts.txt if [ -n "$SPARK_MOUNTED_CLASSPATH" ]; then SPARK_CLASSPATH="$SPARK_CLASSPATH:$SPARK_MOUNTED_CLASSPATH" - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
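A brief note on why the one-line fix above works (assuming the options arrive as indexed variables `SPARK_JAVA_OPT_0`, `SPARK_JAVA_OPT_1`, …, which the `grep SPARK_JAVA_OPT_` pattern suggests): `sort -t_ -k4 -n` splits each `name=value` line on underscores and sorts numerically on the fourth field, which begins with that index, so the options are re-emitted in the order the user supplied them rather than in whatever order `env` happens to print them.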
spark git commit: [SPARK-23449][K8S] Preserve extraJavaOptions ordering
Repository: spark Updated Branches: refs/heads/master b308182f2 -> 185f5bc7d [SPARK-23449][K8S] Preserve extraJavaOptions ordering For some JVM options, like `-XX:+UnlockExperimentalVMOptions` ordering is necessary. ## What changes were proposed in this pull request? Keep original `extraJavaOptions` ordering, when passing them through environment variables inside the Docker container. ## How was this patch tested? Ran base branch a couple of times and checked startup command in logs. Ordering differed every time. Added sorting, ordering was consistent to what user had in `extraJavaOptions`. Author: Andrew Korzhuev Closes #20628 from andrusha/patch-2. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/185f5bc7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/185f5bc7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/185f5bc7 Branch: refs/heads/master Commit: 185f5bc7dd52cebe8fac9393ecb2bd0968bc5867 Parents: b308182 Author: Andrew Korzhuev Authored: Mon Feb 26 10:28:45 2018 -0800 Committer: Marcelo Vanzin Committed: Mon Feb 26 10:28:45 2018 -0800 -- .../kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/185f5bc7/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh -- diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh index b9090dc..3d67b0a 100755 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh @@ -41,7 +41,7 @@ fi shift 1 SPARK_CLASSPATH="$SPARK_CLASSPATH:${SPARK_HOME}/jars/*" -env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt +env | grep SPARK_JAVA_OPT_ | sort -t_ -k4 -n | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt readarray -t SPARK_JAVA_OPTS < /tmp/java_opts.txt if [ -n "$SPARK_MOUNTED_CLASSPATH" ]; then SPARK_CLASSPATH="$SPARK_CLASSPATH:$SPARK_MOUNTED_CLASSPATH" - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r25283 - in /dev/spark/2.3.1-SNAPSHOT-2018_02_26_10_01-1f180cd-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Mon Feb 26 18:16:06 2018 New Revision: 25283 Log: Apache Spark 2.3.1-SNAPSHOT-2018_02_26_10_01-1f180cd docs [This commit notification would consist of 1443 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-23438][DSTREAMS] Fix DStreams data loss with WAL when driver crashes
Repository: spark Updated Branches: refs/heads/branch-2.0 076c2f6a1 -> d51c6aaeb [SPARK-23438][DSTREAMS] Fix DStreams data loss with WAL when driver crashes There is a race condition introduced in SPARK-11141 which could cause data loss. The problem is that ReceivedBlockTracker.insertAllocatedBatch function assumes that all blocks from streamIdToUnallocatedBlockQueues allocated to the batch and clears the queue. In this PR only the allocated blocks will be removed from the queue which will prevent data loss. Additional unit test + manually. Author: Gabor Somogyi Closes #20620 from gaborgsomogyi/SPARK-23438. (cherry picked from commit b308182f233b8840dfe0e6b5736d2f2746f40757) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d51c6aae Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d51c6aae Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d51c6aae Branch: refs/heads/branch-2.0 Commit: d51c6aaeb7ff7e1ea74f0c282b76d20300c1dcae Parents: 076c2f6 Author: Gabor Somogyi Authored: Mon Feb 26 08:39:44 2018 -0800 Committer: Marcelo Vanzin Committed: Mon Feb 26 08:57:45 2018 -0800 -- .../scheduler/ReceivedBlockTracker.scala| 11 ++ .../streaming/ReceivedBlockTrackerSuite.scala | 23 +++- 2 files changed, 29 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d51c6aae/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala index 5d9a8ac..dacff69 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala @@ -193,12 +193,15 @@ private[streaming] class ReceivedBlockTracker( getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo } -// Insert the recovered block-to-batch allocations and clear the queue of received blocks -// (when the blocks were originally allocated to the batch, the queue must have been cleared). +// Insert the recovered block-to-batch allocations and removes them from queue of +// received blocks. 
def insertAllocatedBatch(batchTime: Time, allocatedBlocks: AllocatedBlocks) { logTrace(s"Recovery: Inserting allocated batch for time $batchTime to " + s"${allocatedBlocks.streamIdToAllocatedBlocks}") - streamIdToUnallocatedBlockQueues.values.foreach { _.clear() } + allocatedBlocks.streamIdToAllocatedBlocks.foreach { +case (streamId, allocatedBlocksInStream) => + getReceivedBlockQueue(streamId).dequeueAll(allocatedBlocksInStream.toSet) + } timeToAllocatedBlocks.put(batchTime, allocatedBlocks) lastAllocatedBatchTime = batchTime } @@ -227,7 +230,7 @@ private[streaming] class ReceivedBlockTracker( } /** Write an update to the tracker to the write ahead log */ - private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = { + private[streaming] def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = { if (isWriteAheadLogEnabled) { logTrace(s"Writing record: $record") try { http://git-wip-us.apache.org/repos/asf/spark/blob/d51c6aae/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala -- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala index 851013b..d16e2bf 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala @@ -33,7 +33,7 @@ import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} import org.apache.spark.internal.Logging import org.apache.spark.storage.StreamBlockId import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult -import org.apache.spark.streaming.scheduler._ +import org.apache.spark.streaming.scheduler.{AllocatedBlocks, _} import org.apache.spark.streaming.util._ import org.apache.spark.streaming.util.WriteAheadLogSuite._ import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils} @@ -94,6 +94,27 @@ class ReceivedBlockTrackerSuite receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos } + test("recovery with write ahead logs should rem
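The essence of the fix above is that recovery no longer clears every stream's unallocated-block queue, but dequeues only the blocks that the recovered batch actually owns, so blocks received after the batch was allocated (the race window) survive. A minimal, hypothetical illustration of that queue operation, using the same `mutable.Queue.dequeueAll` call with a set-membership predicate (types and values are made up for the example):

```scala
import scala.collection.mutable

// Stand-in for Spark's block metadata.
case class BlockInfo(id: Int)

// Blocks received but not yet allocated to a batch, as recovered from the WAL.
val unallocated = mutable.Queue(BlockInfo(1), BlockInfo(2), BlockInfo(3))

// Blocks the recovered batch was allocated.
val allocatedToBatch = Seq(BlockInfo(1), BlockInfo(2))

// Before the fix (data loss): unallocated.clear() would also drop BlockInfo(3).
// After the fix: remove only what the recovered batch owns.
unallocated.dequeueAll(allocatedToBatch.toSet)

assert(unallocated.toSeq == Seq(BlockInfo(3))) // the late block survives recovery
```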
spark git commit: [SPARK-23438][DSTREAMS] Fix DStreams data loss with WAL when driver crashes
Repository: spark Updated Branches: refs/heads/branch-2.1 24fe6eb0f -> 2d751fbf6 [SPARK-23438][DSTREAMS] Fix DStreams data loss with WAL when driver crashes There is a race condition introduced in SPARK-11141 which could cause data loss. The problem is that ReceivedBlockTracker.insertAllocatedBatch function assumes that all blocks from streamIdToUnallocatedBlockQueues allocated to the batch and clears the queue. In this PR only the allocated blocks will be removed from the queue which will prevent data loss. Additional unit test + manually. Author: Gabor Somogyi Closes #20620 from gaborgsomogyi/SPARK-23438. (cherry picked from commit b308182f233b8840dfe0e6b5736d2f2746f40757) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2d751fbf Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2d751fbf Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2d751fbf Branch: refs/heads/branch-2.1 Commit: 2d751fbf6e50ad5d470f4d00b7f254bd1d398270 Parents: 24fe6eb Author: Gabor Somogyi Authored: Mon Feb 26 08:39:44 2018 -0800 Committer: Marcelo Vanzin Committed: Mon Feb 26 08:57:21 2018 -0800 -- .../scheduler/ReceivedBlockTracker.scala| 11 ++ .../streaming/ReceivedBlockTrackerSuite.scala | 23 +++- 2 files changed, 29 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2d751fbf/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala index 5d9a8ac..dacff69 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala @@ -193,12 +193,15 @@ private[streaming] class ReceivedBlockTracker( getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo } -// Insert the recovered block-to-batch allocations and clear the queue of received blocks -// (when the blocks were originally allocated to the batch, the queue must have been cleared). +// Insert the recovered block-to-batch allocations and removes them from queue of +// received blocks. 
def insertAllocatedBatch(batchTime: Time, allocatedBlocks: AllocatedBlocks) { logTrace(s"Recovery: Inserting allocated batch for time $batchTime to " + s"${allocatedBlocks.streamIdToAllocatedBlocks}") - streamIdToUnallocatedBlockQueues.values.foreach { _.clear() } + allocatedBlocks.streamIdToAllocatedBlocks.foreach { +case (streamId, allocatedBlocksInStream) => + getReceivedBlockQueue(streamId).dequeueAll(allocatedBlocksInStream.toSet) + } timeToAllocatedBlocks.put(batchTime, allocatedBlocks) lastAllocatedBatchTime = batchTime } @@ -227,7 +230,7 @@ private[streaming] class ReceivedBlockTracker( } /** Write an update to the tracker to the write ahead log */ - private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = { + private[streaming] def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = { if (isWriteAheadLogEnabled) { logTrace(s"Writing record: $record") try { http://git-wip-us.apache.org/repos/asf/spark/blob/2d751fbf/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala -- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala index 851013b..d16e2bf 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala @@ -33,7 +33,7 @@ import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} import org.apache.spark.internal.Logging import org.apache.spark.storage.StreamBlockId import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult -import org.apache.spark.streaming.scheduler._ +import org.apache.spark.streaming.scheduler.{AllocatedBlocks, _} import org.apache.spark.streaming.util._ import org.apache.spark.streaming.util.WriteAheadLogSuite._ import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils} @@ -94,6 +94,27 @@ class ReceivedBlockTrackerSuite receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos } + test("recovery with write ahead logs should rem
spark git commit: [SPARK-23438][DSTREAMS] Fix DStreams data loss with WAL when driver crashes
Repository: spark Updated Branches: refs/heads/branch-2.2 1cc34f3e5 -> fa3667ece [SPARK-23438][DSTREAMS] Fix DStreams data loss with WAL when driver crashes There is a race condition introduced in SPARK-11141 which could cause data loss. The problem is that ReceivedBlockTracker.insertAllocatedBatch function assumes that all blocks from streamIdToUnallocatedBlockQueues allocated to the batch and clears the queue. In this PR only the allocated blocks will be removed from the queue which will prevent data loss. Additional unit test + manually. Author: Gabor Somogyi Closes #20620 from gaborgsomogyi/SPARK-23438. (cherry picked from commit b308182f233b8840dfe0e6b5736d2f2746f40757) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fa3667ec Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fa3667ec Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fa3667ec Branch: refs/heads/branch-2.2 Commit: fa3667ecef0bcdf8aab3242904238e3ba28d2705 Parents: 1cc34f3 Author: Gabor Somogyi Authored: Mon Feb 26 08:39:44 2018 -0800 Committer: Marcelo Vanzin Committed: Mon Feb 26 08:56:57 2018 -0800 -- .../scheduler/ReceivedBlockTracker.scala| 11 ++ .../streaming/ReceivedBlockTrackerSuite.scala | 23 +++- 2 files changed, 29 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fa3667ec/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala index 5d9a8ac..dacff69 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala @@ -193,12 +193,15 @@ private[streaming] class ReceivedBlockTracker( getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo } -// Insert the recovered block-to-batch allocations and clear the queue of received blocks -// (when the blocks were originally allocated to the batch, the queue must have been cleared). +// Insert the recovered block-to-batch allocations and removes them from queue of +// received blocks. 
def insertAllocatedBatch(batchTime: Time, allocatedBlocks: AllocatedBlocks) { logTrace(s"Recovery: Inserting allocated batch for time $batchTime to " + s"${allocatedBlocks.streamIdToAllocatedBlocks}") - streamIdToUnallocatedBlockQueues.values.foreach { _.clear() } + allocatedBlocks.streamIdToAllocatedBlocks.foreach { +case (streamId, allocatedBlocksInStream) => + getReceivedBlockQueue(streamId).dequeueAll(allocatedBlocksInStream.toSet) + } timeToAllocatedBlocks.put(batchTime, allocatedBlocks) lastAllocatedBatchTime = batchTime } @@ -227,7 +230,7 @@ private[streaming] class ReceivedBlockTracker( } /** Write an update to the tracker to the write ahead log */ - private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = { + private[streaming] def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = { if (isWriteAheadLogEnabled) { logTrace(s"Writing record: $record") try { http://git-wip-us.apache.org/repos/asf/spark/blob/fa3667ec/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala -- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala index 107c3f5..4fa236b 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala @@ -33,7 +33,7 @@ import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} import org.apache.spark.internal.Logging import org.apache.spark.storage.StreamBlockId import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult -import org.apache.spark.streaming.scheduler._ +import org.apache.spark.streaming.scheduler.{AllocatedBlocks, _} import org.apache.spark.streaming.util._ import org.apache.spark.streaming.util.WriteAheadLogSuite._ import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils} @@ -94,6 +94,27 @@ class ReceivedBlockTrackerSuite receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos } + test("recovery with write ahead logs should rem
spark git commit: [SPARK-23438][DSTREAMS] Fix DStreams data loss with WAL when driver crashes
Repository: spark Updated Branches: refs/heads/branch-2.3 578607b30 -> 1f180cd12 [SPARK-23438][DSTREAMS] Fix DStreams data loss with WAL when driver crashes There is a race condition introduced in SPARK-11141 which could cause data loss. The problem is that ReceivedBlockTracker.insertAllocatedBatch function assumes that all blocks from streamIdToUnallocatedBlockQueues allocated to the batch and clears the queue. In this PR only the allocated blocks will be removed from the queue which will prevent data loss. Additional unit test + manually. Author: Gabor Somogyi Closes #20620 from gaborgsomogyi/SPARK-23438. (cherry picked from commit b308182f233b8840dfe0e6b5736d2f2746f40757) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1f180cd1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1f180cd1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1f180cd1 Branch: refs/heads/branch-2.3 Commit: 1f180cd121b13ecd455bee55ed2224936a2f3b2a Parents: 578607b Author: Gabor Somogyi Authored: Mon Feb 26 08:39:44 2018 -0800 Committer: Marcelo Vanzin Committed: Mon Feb 26 08:55:57 2018 -0800 -- .../scheduler/ReceivedBlockTracker.scala| 11 ++ .../streaming/ReceivedBlockTrackerSuite.scala | 23 +++- 2 files changed, 29 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1f180cd1/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala index 5d9a8ac..dacff69 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala @@ -193,12 +193,15 @@ private[streaming] class ReceivedBlockTracker( getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo } -// Insert the recovered block-to-batch allocations and clear the queue of received blocks -// (when the blocks were originally allocated to the batch, the queue must have been cleared). +// Insert the recovered block-to-batch allocations and removes them from queue of +// received blocks. 
def insertAllocatedBatch(batchTime: Time, allocatedBlocks: AllocatedBlocks) { logTrace(s"Recovery: Inserting allocated batch for time $batchTime to " + s"${allocatedBlocks.streamIdToAllocatedBlocks}") - streamIdToUnallocatedBlockQueues.values.foreach { _.clear() } + allocatedBlocks.streamIdToAllocatedBlocks.foreach { +case (streamId, allocatedBlocksInStream) => + getReceivedBlockQueue(streamId).dequeueAll(allocatedBlocksInStream.toSet) + } timeToAllocatedBlocks.put(batchTime, allocatedBlocks) lastAllocatedBatchTime = batchTime } @@ -227,7 +230,7 @@ private[streaming] class ReceivedBlockTracker( } /** Write an update to the tracker to the write ahead log */ - private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = { + private[streaming] def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = { if (isWriteAheadLogEnabled) { logTrace(s"Writing record: $record") try { http://git-wip-us.apache.org/repos/asf/spark/blob/1f180cd1/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala -- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala index 107c3f5..4fa236b 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala @@ -33,7 +33,7 @@ import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} import org.apache.spark.internal.Logging import org.apache.spark.storage.StreamBlockId import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult -import org.apache.spark.streaming.scheduler._ +import org.apache.spark.streaming.scheduler.{AllocatedBlocks, _} import org.apache.spark.streaming.util._ import org.apache.spark.streaming.util.WriteAheadLogSuite._ import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils} @@ -94,6 +94,27 @@ class ReceivedBlockTrackerSuite receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos } + test("recovery with write ahead logs should rem
spark git commit: [SPARK-23438][DSTREAMS] Fix DStreams data loss with WAL when driver crashes
Repository: spark Updated Branches: refs/heads/master 3ca9a2c56 -> b308182f2 [SPARK-23438][DSTREAMS] Fix DStreams data loss with WAL when driver crashes ## What changes were proposed in this pull request? There is a race condition introduced in SPARK-11141 which could cause data loss. The problem is that ReceivedBlockTracker.insertAllocatedBatch function assumes that all blocks from streamIdToUnallocatedBlockQueues allocated to the batch and clears the queue. In this PR only the allocated blocks will be removed from the queue which will prevent data loss. ## How was this patch tested? Additional unit test + manually. Author: Gabor Somogyi Closes #20620 from gaborgsomogyi/SPARK-23438. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b308182f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b308182f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b308182f Branch: refs/heads/master Commit: b308182f233b8840dfe0e6b5736d2f2746f40757 Parents: 3ca9a2c Author: Gabor Somogyi Authored: Mon Feb 26 08:39:44 2018 -0800 Committer: Marcelo Vanzin Committed: Mon Feb 26 08:39:44 2018 -0800 -- .../scheduler/ReceivedBlockTracker.scala| 11 ++ .../streaming/ReceivedBlockTrackerSuite.scala | 23 +++- 2 files changed, 29 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b308182f/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala -- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala index 5d9a8ac..dacff69 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala @@ -193,12 +193,15 @@ private[streaming] class ReceivedBlockTracker( getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo } -// Insert the recovered block-to-batch allocations and clear the queue of received blocks -// (when the blocks were originally allocated to the batch, the queue must have been cleared). +// Insert the recovered block-to-batch allocations and removes them from queue of +// received blocks. 
def insertAllocatedBatch(batchTime: Time, allocatedBlocks: AllocatedBlocks) { logTrace(s"Recovery: Inserting allocated batch for time $batchTime to " + s"${allocatedBlocks.streamIdToAllocatedBlocks}") - streamIdToUnallocatedBlockQueues.values.foreach { _.clear() } + allocatedBlocks.streamIdToAllocatedBlocks.foreach { +case (streamId, allocatedBlocksInStream) => + getReceivedBlockQueue(streamId).dequeueAll(allocatedBlocksInStream.toSet) + } timeToAllocatedBlocks.put(batchTime, allocatedBlocks) lastAllocatedBatchTime = batchTime } @@ -227,7 +230,7 @@ private[streaming] class ReceivedBlockTracker( } /** Write an update to the tracker to the write ahead log */ - private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = { + private[streaming] def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = { if (isWriteAheadLogEnabled) { logTrace(s"Writing record: $record") try { http://git-wip-us.apache.org/repos/asf/spark/blob/b308182f/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala -- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala index 107c3f5..4fa236b 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala @@ -33,7 +33,7 @@ import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} import org.apache.spark.internal.Logging import org.apache.spark.storage.StreamBlockId import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult -import org.apache.spark.streaming.scheduler._ +import org.apache.spark.streaming.scheduler.{AllocatedBlocks, _} import org.apache.spark.streaming.util._ import org.apache.spark.streaming.util.WriteAheadLogSuite._ import org.apache.spark.util.{Clock, ManualClock, SystemClock, Utils} @@ -94,6 +94,27 @@ class ReceivedBlockTrackerSuite receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos } + test("recovery with write ahead logs should remove only allocated blocks