svn commit: r26710 - in /dev/spark/2.4.0-SNAPSHOT-2018_05_04_20_01-47b5b68-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Sat May 5 03:15:34 2018
New Revision: 26710

Log: Apache Spark 2.4.0-SNAPSHOT-2018_05_04_20_01-47b5b68 docs

[This commit notification would consist of 1460 parts, which exceeds the limit of 50 parts, so it was shortened to this summary.]
spark git commit: [SPARK-24157][SS] Enabled no-data batches in MicroBatchExecution for streaming aggregation and deduplication.
Repository: spark
Updated Branches: refs/heads/master af4dc5028 -> 47b5b6852

[SPARK-24157][SS] Enabled no-data batches in MicroBatchExecution for streaming aggregation and deduplication.

## What changes were proposed in this pull request?

This PR enables MicroBatchExecution to run no-data batches if some SparkPlan requires running another batch to output results based on an updated watermark / processing time. In this PR, I have enabled streaming aggregations and streaming deduplicates to automatically run additional batches even when no new data is available. See https://issues.apache.org/jira/browse/SPARK-24156 for more context.

Major changes/refactoring done in this PR (a sketch of the resulting trigger loop follows the diffstat below).
- Refactoring MicroBatchExecution - A major point of confusion in the MicroBatchExecution control flow (at least to me) was that `populateStartOffsets` internally called `constructNextBatch`, which was not obvious from the name "populateStartOffsets" and made the control flow from the main trigger execution loop very confusing (the main loop in `runActivatedStream` called `constructNextBatch`, but only if `populateStartOffsets` hadn't already called it). The refactoring makes this cleaner.
  - `populateStartOffsets` only updates `availableOffsets` and `committedOffsets`. It does not call `constructNextBatch`.
  - The main loop in `runActivatedStream` calls `constructNextBatch`, which returns true or false reflecting whether the next batch is ready for execution. This method is now idempotent; once a batch has been constructed, it keeps returning true until that batch has been executed.
  - If the next batch is ready, we call `runBatch`; otherwise we sleep.
  - That's it.
- Refactoring watermark management logic - This has been moved out of `MicroBatchExecution` into a separate class to simplify `MicroBatchExecution`.
- New method `shouldRunAnotherBatch` in `IncrementalExecution` - This returns true if any stateful operation in the last execution plan requires another batch for state cleanup, etc. It is used in `constructNextBatch` to decide whether to construct a batch.
- Changes to the stream testing framework - Many tests used CheckLastBatch to validate answers. This assumed that there would be no more batches after the last set of input had been processed, so the last batch is the one whose output corresponds to the last input. That is no longer true. To account for this, I made two changes.
  - `CheckNewAnswer` is a new test action that verifies the rows generated since the last time the answer was checked by `CheckAnswer`, `CheckNewAnswer` or `CheckLastBatch`. It is agnostic to how many batches occurred between the last check and now. To make this easier, I added a common trait shared by MemorySink and MemorySinkV2 to abstract out some common methods.
  - `assertNumStateRows` has been updated in the same way to be batch-agnostic while checking the total number of output rows and the number of state rows updated (it sums up the updates since the last check).

## How was this patch tested?

- Changes made to existing tests - Tests have been changed in one of the following patterns.
  - Tests where the last input was given again only to force another batch to be executed (so that state was cleaned up / output was generated) were simplified by removing the extra input.
  - Tests using aggregation + watermark, where CheckLastBatch was replaced with CheckNewAnswer to make them batch-agnostic.
- New tests added to check whether the flag works for streaming aggregation and deduplication.

Author: Tathagata Das
Closes #21220 from tdas/SPARK-24157.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/47b5b685
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/47b5b685
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/47b5b685
Branch: refs/heads/master
Commit: 47b5b68528c154d32b3f40f388918836d29462b8
Parents: af4dc50
Author: Tathagata Das
Authored: Fri May 4 16:35:24 2018 -0700
Committer: Tathagata Das
Committed: Fri May 4 16:35:24 2018 -0700
--
 .../org/apache/spark/sql/internal/SQLConf.scala  |  11 +
 .../streaming/IncrementalExecution.scala         |  10 +
 .../streaming/MicroBatchExecution.scala          | 231 +++
 .../execution/streaming/WatermarkTracker.scala   |  73 +
 .../spark/sql/execution/streaming/memory.scala   |  17 +-
 .../execution/streaming/sources/memoryV2.scala   |   8 +-
 .../execution/streaming/statefulOperators.scala  |  16 +
 .../streaming/sources/ForeachWriterSuite.scala   |   8 +-
 .../spark/sql/streaming/DeduplicateSuite.scala   | 285 --
 .../sql/streaming/EventTimeWatermarkSuite.scala  | 112 +++
 .../sql/streaming/FileStreamSinkSuite.scala      |   7 +-
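To make the refactored control flow easier to picture, here is a minimal, self-contained Scala sketch of the trigger loop described above. It is illustrative only, not the actual MicroBatchExecution code: the names `constructNextBatch`, `runBatch`, `runActivatedStream`, and `shouldRunAnotherBatch` come from the PR description, while the parameters and helper closures are hypothetical stand-ins.

```scala
// Illustrative sketch, not Spark source: the shape of the refactored micro-batch trigger loop.
class TriggerLoopSketch(pollDelayMs: Long) {
  @volatile private var batchConstructed = false

  /** Idempotent: once a batch is constructed, keeps returning true until runBatch() executes it. */
  def constructNextBatch(newDataAvailable: Boolean, shouldRunAnotherBatch: Boolean): Boolean = {
    if (!batchConstructed && (newDataAvailable || shouldRunAnotherBatch)) {
      batchConstructed = true   // a no-data batch is constructed when a stateful op asks for one
    }
    batchConstructed
  }

  /** Executes the constructed batch and clears the flag so the next batch can be constructed. */
  def runBatch(execute: () => Unit): Unit = {
    execute()
    batchConstructed = false
  }

  /** Main loop: construct-or-sleep, then run; populateStartOffsets is assumed to have run once before. */
  def runActivatedStream(
      newDataAvailable: () => Boolean,
      shouldRunAnotherBatch: () => Boolean,
      execute: () => Unit,
      isActive: () => Boolean): Unit = {
    while (isActive()) {
      if (constructNextBatch(newDataAvailable(), shouldRunAnotherBatch())) runBatch(execute)
      else Thread.sleep(pollDelayMs)   // nothing to do for this trigger
    }
  }
}
```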
spark git commit: [SPARK-24039][SS] Do continuous processing writes with multiple compute() calls
Repository: spark
Updated Branches: refs/heads/master d04806a23 -> af4dc5028

[SPARK-24039][SS] Do continuous processing writes with multiple compute() calls

## What changes were proposed in this pull request?

Do continuous processing writes with multiple compute() calls. The current strategy (before this PR) is hacky; we just call next() on an iterator which has already returned hasNext = false, knowing that all the nodes we whitelist handle this properly. This will have to be changed before we can support more complex query plans. (In particular, I have a WIP https://github.com/jose-torres/spark/pull/13 which should be able to support aggregates in a single partition with minimal additional work.)

Most of the changes here are just refactoring to accommodate the new model. The behavioral changes are:

* The writer now calls prev.compute(split, context) once per epoch within the epoch loop.
* ContinuousDataSourceRDD now spawns a ContinuousQueuedDataReader which is shared across multiple calls to compute() for the same partition.

A sketch of the per-epoch write loop appears after this commit's diff below.

## How was this patch tested?

existing unit tests

Author: Jose Torres
Closes #21200 from jose-torres/noAggr.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/af4dc502
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/af4dc502
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/af4dc502
Branch: refs/heads/master
Commit: af4dc50280ffcdeda208ef2dc5f8b843389732e5
Parents: d04806a
Author: Jose Torres
Authored: Fri May 4 14:14:40 2018 -0700
Committer: Tathagata Das
Committed: Fri May 4 14:14:40 2018 -0700
--
 .../datasources/v2/DataSourceV2ScanExec.scala   |   6 +-
 .../continuous/ContinuousDataSourceRDD.scala    | 114 ++
 .../ContinuousDataSourceRDDIter.scala           | 222 ---
 .../continuous/ContinuousQueuedDataReader.scala | 211 ++
 .../continuous/ContinuousWriteRDD.scala         |  90
 .../WriteToContinuousDataSourceExec.scala       |  57 +
 .../ContinuousQueuedDataReaderSuite.scala       | 167 ++
 7 files changed, 592 insertions(+), 275 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/af4dc502/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
index 41bdda4..77cb707 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
@@ -96,7 +96,11 @@ case class DataSourceV2ScanExec(
         sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
         sparkContext.env)
         .askSync[Unit](SetReaderPartitions(readerFactories.size))
-      new ContinuousDataSourceRDD(sparkContext, sqlContext, readerFactories)
+      new ContinuousDataSourceRDD(
+        sparkContext,
+        sqlContext.conf.continuousStreamingExecutorQueueSize,
+        sqlContext.conf.continuousStreamingExecutorPollIntervalMs,
+        readerFactories)
         .asInstanceOf[RDD[InternalRow]]

     case r: SupportsScanColumnarBatch if r.enableBatchRead() =>

http://git-wip-us.apache.org/repos/asf/spark/blob/af4dc502/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala new file mode 100644 index 000..0a3b9dc --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousDataSourceRDD.scala @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed
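The epoch-loop behavior described above can be summarized with a small, self-contained Scala sketch. This is not the actual ContinuousWriteRDD: `computeEpoch` stands in for calling prev.compute(split, context), and `commitEpoch` stands in for reporting the commit message to the epoch coordinator; both are hypothetical helpers used only for illustration.

```scala
// Illustrative sketch, not Spark source: one call to the parent RDD's compute() per epoch,
// rather than repeatedly pulling next() from a single exhausted iterator.
object ContinuousWriteLoopSketch {
  def writeContinuously[T](
      computeEpoch: () => Iterator[T],   // stands in for prev.compute(split, context)
      writeRow: T => Unit,               // stands in for the data writer
      commitEpoch: Long => Unit,         // stands in for the epoch coordinator commit
      shouldStop: () => Boolean): Unit = {
    var epoch = 0L
    while (!shouldStop()) {
      val rows = computeEpoch()          // a fresh iterator for this epoch
      rows.foreach(writeRow)
      commitEpoch(epoch)
      epoch += 1
    }
  }

  def main(args: Array[String]): Unit = {
    var epochsRun = 0
    writeContinuously[Int](
      computeEpoch = () => Iterator(1, 2, 3),
      writeRow = row => println(s"write $row"),
      commitEpoch = e => { println(s"commit epoch $e"); epochsRun += 1 },
      shouldStop = () => epochsRun >= 2)   // stop after two epochs for the demo
  }
}
```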
spark git commit: [SPARK-24124] Spark history server should create spark.history.store.…
Repository: spark
Updated Branches: refs/heads/master 4d5de4d30 -> d04806a23

[SPARK-24124] Spark history server should create spark.history.store.path and set permissions properly

## What changes were proposed in this pull request?

Spark history server should create spark.history.store.path and set permissions properly. Note that createDirectories doesn't do anything if the directories are already created, so this does not stomp on the permissions if the user had manually created the directory before the history server could.

## How was this patch tested?

Manually tested on a 100-node cluster. Ensured directories were created with the proper permissions. Ensured restarts worked and that apps/temp directories worked as apps were read.

Author: Thomas Graves
Closes #21234 from tgravescs/SPARK-24124.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d04806a2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d04806a2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d04806a2
Branch: refs/heads/master
Commit: d04806a23c1843a7f0dcc4fa236ed1b40ae113a5
Parents: 4d5de4d
Author: Thomas Graves
Authored: Fri May 4 13:29:47 2018 -0700
Committer: Marcelo Vanzin
Committed: Fri May 4 13:29:47 2018 -0700
--
 .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 8 ++--
 1 file changed, 6 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/d04806a2/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 56db935..bf1eeb0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -18,6 +18,8 @@ package org.apache.spark.deploy.history
 import java.io.{File, FileNotFoundException, IOException}
+import java.nio.file.Files
+import java.nio.file.attribute.PosixFilePermissions
 import java.util.{Date, ServiceLoader}
 import java.util.concurrent.{ExecutorService, TimeUnit}
 import java.util.zip.{ZipEntry, ZipOutputStream}
@@ -130,8 +132,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   // Visible for testing.
   private[history] val listing: KVStore = storePath.map { path =>
-    require(path.isDirectory(), s"Configured store directory ($path) does not exist.")
-    val dbPath = new File(path, "listing.ldb")
+    val perms = PosixFilePermissions.fromString("rwx------")
+    val dbPath = Files.createDirectories(new File(path, "listing.ldb").toPath(),
+      PosixFilePermissions.asFileAttribute(perms)).toFile()
+
     val metadata = new FsHistoryProviderMetadata(CURRENT_LISTING_VERSION,
       AppStatusStore.CURRENT_VERSION, logDir.toString())
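As a quick illustration of the behavior this fix relies on (Files.createDirectories creates missing directories with the supplied permissions but leaves an existing directory, and its permissions, untouched), here is a small standalone Scala sketch. The directory name is hypothetical, it targets POSIX filesystems only, and it is not part of the Spark change itself.

```scala
// Standalone sketch (POSIX filesystems only), not Spark code: create a store directory
// with owner-only permissions; re-running it is a no-op that preserves existing permissions.
import java.io.File
import java.nio.file.Files
import java.nio.file.attribute.PosixFilePermissions

object StoreDirPermissionsSketch {
  def main(args: Array[String]): Unit = {
    val storePath = new File(sys.props("java.io.tmpdir"), "history-store-sketch") // hypothetical path
    val perms = PosixFilePermissions.fromString("rwx------")                      // owner-only, i.e. 700

    // Creates storePath and listing.ldb if missing; does nothing if they already exist.
    val dbPath = Files.createDirectories(
      new File(storePath, "listing.ldb").toPath(),
      PosixFilePermissions.asFileAttribute(perms)).toFile()

    val actual = Files.getPosixFilePermissions(dbPath.toPath())
    println(s"listing db dir: $dbPath, permissions: ${PosixFilePermissions.toString(actual)}")
  }
}
```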
svn commit: r26705 - in /dev/spark/2.4.0-SNAPSHOT-2018_05_04_08_01-4d5de4d-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Fri May 4 15:16:17 2018
New Revision: 26705

Log: Apache Spark 2.4.0-SNAPSHOT-2018_05_04_08_01-4d5de4d docs

[This commit notification would consist of 1460 parts, which exceeds the limit of 50 parts, so it was shortened to this summary.]
svn commit: r26702 - in /dev/spark/2.3.1-SNAPSHOT-2018_05_04_06_01-3f78f60-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Fri May 4 13:16:16 2018
New Revision: 26702

Log: Apache Spark 2.3.1-SNAPSHOT-2018_05_04_06_01-3f78f60 docs

[This commit notification would consist of 1443 parts, which exceeds the limit of 50 parts, so it was shortened to this summary.]
spark git commit: [SPARK-23697][CORE] LegacyAccumulatorWrapper should define isZero correctly
Repository: spark
Updated Branches: refs/heads/branch-2.0 d51c6aaeb -> a42dd008c

[SPARK-23697][CORE] LegacyAccumulatorWrapper should define isZero correctly

## What changes were proposed in this pull request?

It's possible that Accumulators of Spark 1.x may no longer work with Spark 2.x. This is because `LegacyAccumulatorWrapper.isZero` may return the wrong answer if `AccumulableParam` doesn't define equals/hashCode. This PR fixes this by using a reference equality check in `LegacyAccumulatorWrapper.isZero`.

## How was this patch tested?

a new test

Author: Wenchen Fan
Closes #21229 from cloud-fan/accumulator.

(cherry picked from commit 4d5de4d303a773b1c18c350072344bd7efca9fc4)
Signed-off-by: Wenchen Fan

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a42dd008
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a42dd008
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a42dd008
Branch: refs/heads/branch-2.0
Commit: a42dd008c7770c48e62e6c1ad12f191c65c0cbd7
Parents: d51c6aa
Author: Wenchen Fan
Authored: Fri May 4 19:20:15 2018 +0800
Committer: Wenchen Fan
Committed: Fri May 4 19:22:20 2018 +0800
--
 .../org/apache/spark/util/AccumulatorV2.scala  |  6 --
 .../apache/spark/util/AccumulatorV2Suite.scala | 19 +++
 2 files changed, 23 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/a42dd008/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala index d06ab3d..155ea6c 100644 --- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala +++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala @@ -476,7 +476,9 @@ class LegacyAccumulatorWrapper[R, T]( param: org.apache.spark.AccumulableParam[R, T]) extends AccumulatorV2[T, R] { private[spark] var _value = initialValue // Current value on driver - override def isZero: Boolean = _value == param.zero(initialValue) + @transient private lazy val _zero = param.zero(initialValue) + + override def isZero: Boolean = _value.asInstanceOf[AnyRef].eq(_zero.asInstanceOf[AnyRef]) override def copy(): LegacyAccumulatorWrapper[R, T] = { val acc = new LegacyAccumulatorWrapper(initialValue, param) @@ -485,7 +487,7 @@ class LegacyAccumulatorWrapper[R, T]( } override def reset(): Unit = { -_value = param.zero(initialValue) +_value = _zero } override def add(v: T): Unit = _value = param.addAccumulator(_value, v)

http://git-wip-us.apache.org/repos/asf/spark/blob/a42dd008/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala b/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala index a04644d..fe0a9a4 100644 --- a/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala +++ b/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala @@ -18,6 +18,7 @@ package org.apache.spark.util import org.apache.spark._ +import org.apache.spark.serializer.JavaSerializer class AccumulatorV2Suite extends SparkFunSuite { @@ -162,4 +163,22 @@ class AccumulatorV2Suite extends SparkFunSuite { assert(acc3.isZero) assert(acc3.value === "") } + + test("LegacyAccumulatorWrapper with AccumulatorParam that has no equals/hashCode") { +class MyData(val i: Int) extends Serializable +val param = new AccumulatorParam[MyData] { + override def zero(initialValue: MyData): MyData = new MyData(0) + override def
addInPlace(r1: MyData, r2: MyData): MyData = new MyData(r1.i + r2.i) +} + +val acc = new LegacyAccumulatorWrapper(new MyData(0), param) +acc.metadata = AccumulatorMetadata( + AccumulatorContext.newId(), + Some("test"), + countFailedValues = false) +AccumulatorContext.register(acc) + +val ser = new JavaSerializer(new SparkConf).newInstance() +ser.serialize(acc) + } }
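To make the rationale concrete, here is a small standalone Scala sketch (not the Spark source) contrasting the old value-equality check with the new reference-equality check when the accumulated type defines no equals/hashCode. The `zero` helper stands in for `AccumulableParam.zero`; everything else is illustrative.

```scala
// Standalone sketch, not Spark code: why isZero needs reference equality when the
// accumulated type has no equals/hashCode.
object IsZeroSketch {
  class MyData(val i: Int) extends Serializable  // no equals/hashCode, as in the new test

  def main(args: Array[String]): Unit = {
    val initial = new MyData(0)
    def zero(v: MyData): MyData = new MyData(0)  // stands in for AccumulableParam.zero

    // Old behavior: reset() stored a fresh zero object, and isZero computed *another*
    // fresh zero and compared with ==, which for MyData falls back to reference equality.
    val valueAfterOldReset = zero(initial)
    val oldIsZero = valueAfterOldReset == zero(initial)        // false -> wrongly "not zero"

    // New behavior: the zero is computed once, reset() reuses it, and isZero compares
    // references against that single cached instance.
    lazy val cachedZero = zero(initial)
    val valueAfterNewReset: MyData = cachedZero                // reset() now assigns the cached zero
    val newIsZero = valueAfterNewReset.asInstanceOf[AnyRef].eq(cachedZero.asInstanceOf[AnyRef]) // true

    println(s"old isZero after reset: $oldIsZero, new isZero after reset: $newIsZero")
  }
}
```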
spark git commit: [SPARK-23697][CORE] LegacyAccumulatorWrapper should define isZero correctly
Repository: spark
Updated Branches: refs/heads/branch-2.2 768d0b7ce -> 866270ea5

[SPARK-23697][CORE] LegacyAccumulatorWrapper should define isZero correctly

## What changes were proposed in this pull request?

It's possible that Accumulators of Spark 1.x may no longer work with Spark 2.x. This is because `LegacyAccumulatorWrapper.isZero` may return the wrong answer if `AccumulableParam` doesn't define equals/hashCode. This PR fixes this by using a reference equality check in `LegacyAccumulatorWrapper.isZero`.

## How was this patch tested?

a new test

Author: Wenchen Fan
Closes #21229 from cloud-fan/accumulator.

(cherry picked from commit 4d5de4d303a773b1c18c350072344bd7efca9fc4)
Signed-off-by: Wenchen Fan

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/866270ea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/866270ea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/866270ea
Branch: refs/heads/branch-2.2
Commit: 866270ea5bbf72328c1f28c6bf49e39fb3e69d12
Parents: 768d0b7
Author: Wenchen Fan
Authored: Fri May 4 19:20:15 2018 +0800
Committer: Wenchen Fan
Committed: Fri May 4 19:21:16 2018 +0800
--
 .../org/apache/spark/util/AccumulatorV2.scala  |  6 --
 .../apache/spark/util/AccumulatorV2Suite.scala | 19 +++
 2 files changed, 23 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/866270ea/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala index 603c23a..5df17cc 100644 --- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala +++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala @@ -482,7 +482,9 @@ class LegacyAccumulatorWrapper[R, T]( param: org.apache.spark.AccumulableParam[R, T]) extends AccumulatorV2[T, R] { private[spark] var _value = initialValue // Current value on driver - override def isZero: Boolean = _value == param.zero(initialValue) + @transient private lazy val _zero = param.zero(initialValue) + + override def isZero: Boolean = _value.asInstanceOf[AnyRef].eq(_zero.asInstanceOf[AnyRef]) override def copy(): LegacyAccumulatorWrapper[R, T] = { val acc = new LegacyAccumulatorWrapper(initialValue, param) @@ -491,7 +493,7 @@ class LegacyAccumulatorWrapper[R, T]( } override def reset(): Unit = { -_value = param.zero(initialValue) +_value = _zero } override def add(v: T): Unit = _value = param.addAccumulator(_value, v)

http://git-wip-us.apache.org/repos/asf/spark/blob/866270ea/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala b/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala index a04644d..fe0a9a4 100644 --- a/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala +++ b/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala @@ -18,6 +18,7 @@ package org.apache.spark.util import org.apache.spark._ +import org.apache.spark.serializer.JavaSerializer class AccumulatorV2Suite extends SparkFunSuite { @@ -162,4 +163,22 @@ class AccumulatorV2Suite extends SparkFunSuite { assert(acc3.isZero) assert(acc3.value === "") } + + test("LegacyAccumulatorWrapper with AccumulatorParam that has no equals/hashCode") { +class MyData(val i: Int) extends Serializable +val param = new AccumulatorParam[MyData] { + override def zero(initialValue: MyData): MyData = new MyData(0) + override def
addInPlace(r1: MyData, r2: MyData): MyData = new MyData(r1.i + r2.i) +} + +val acc = new LegacyAccumulatorWrapper(new MyData(0), param) +acc.metadata = AccumulatorMetadata( + AccumulatorContext.newId(), + Some("test"), + countFailedValues = false) +AccumulatorContext.register(acc) + +val ser = new JavaSerializer(new SparkConf).newInstance() +ser.serialize(acc) + } }
spark git commit: [SPARK-23697][CORE] LegacyAccumulatorWrapper should define isZero correctly
Repository: spark
Updated Branches: refs/heads/branch-2.3 d35eb2f9b -> 3f78f60cc

[SPARK-23697][CORE] LegacyAccumulatorWrapper should define isZero correctly

## What changes were proposed in this pull request?

It's possible that Accumulators of Spark 1.x may no longer work with Spark 2.x. This is because `LegacyAccumulatorWrapper.isZero` may return the wrong answer if `AccumulableParam` doesn't define equals/hashCode. This PR fixes this by using a reference equality check in `LegacyAccumulatorWrapper.isZero`.

## How was this patch tested?

a new test

Author: Wenchen Fan
Closes #21229 from cloud-fan/accumulator.

(cherry picked from commit 4d5de4d303a773b1c18c350072344bd7efca9fc4)
Signed-off-by: Wenchen Fan

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3f78f60c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3f78f60c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3f78f60c
Branch: refs/heads/branch-2.3
Commit: 3f78f60cc80d04ce762bd332dd2fa5f7d17c2b3c
Parents: d35eb2f
Author: Wenchen Fan
Authored: Fri May 4 19:20:15 2018 +0800
Committer: Wenchen Fan
Committed: Fri May 4 19:20:39 2018 +0800
--
 .../org/apache/spark/util/AccumulatorV2.scala  |  6 --
 .../apache/spark/util/AccumulatorV2Suite.scala | 19 +++
 2 files changed, 23 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/3f78f60c/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala index 0f84ea9..2bc8495 100644 --- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala +++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala @@ -486,7 +486,9 @@ class LegacyAccumulatorWrapper[R, T]( param: org.apache.spark.AccumulableParam[R, T]) extends AccumulatorV2[T, R] { private[spark] var _value = initialValue // Current value on driver - override def isZero: Boolean = _value == param.zero(initialValue) + @transient private lazy val _zero = param.zero(initialValue) + + override def isZero: Boolean = _value.asInstanceOf[AnyRef].eq(_zero.asInstanceOf[AnyRef]) override def copy(): LegacyAccumulatorWrapper[R, T] = { val acc = new LegacyAccumulatorWrapper(initialValue, param) @@ -495,7 +497,7 @@ class LegacyAccumulatorWrapper[R, T]( } override def reset(): Unit = { -_value = param.zero(initialValue) +_value = _zero } override def add(v: T): Unit = _value = param.addAccumulator(_value, v)

http://git-wip-us.apache.org/repos/asf/spark/blob/3f78f60c/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala b/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala index a04644d..fe0a9a4 100644 --- a/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala +++ b/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala @@ -18,6 +18,7 @@ package org.apache.spark.util import org.apache.spark._ +import org.apache.spark.serializer.JavaSerializer class AccumulatorV2Suite extends SparkFunSuite { @@ -162,4 +163,22 @@ class AccumulatorV2Suite extends SparkFunSuite { assert(acc3.isZero) assert(acc3.value === "") } + + test("LegacyAccumulatorWrapper with AccumulatorParam that has no equals/hashCode") { +class MyData(val i: Int) extends Serializable +val param = new AccumulatorParam[MyData] { + override def zero(initialValue: MyData): MyData = new MyData(0) + override def
addInPlace(r1: MyData, r2: MyData): MyData = new MyData(r1.i + r2.i) +} + +val acc = new LegacyAccumulatorWrapper(new MyData(0), param) +acc.metadata = AccumulatorMetadata( + AccumulatorContext.newId(), + Some("test"), + countFailedValues = false) +AccumulatorContext.register(acc) + +val ser = new JavaSerializer(new SparkConf).newInstance() +ser.serialize(acc) + } }
spark git commit: [SPARK-23697][CORE] LegacyAccumulatorWrapper should define isZero correctly
Repository: spark
Updated Branches: refs/heads/master 7f1b6b182 -> 4d5de4d30

[SPARK-23697][CORE] LegacyAccumulatorWrapper should define isZero correctly

## What changes were proposed in this pull request?

It's possible that Accumulators of Spark 1.x may no longer work with Spark 2.x. This is because `LegacyAccumulatorWrapper.isZero` may return the wrong answer if `AccumulableParam` doesn't define equals/hashCode. This PR fixes this by using a reference equality check in `LegacyAccumulatorWrapper.isZero`.

## How was this patch tested?

a new test

Author: Wenchen Fan
Closes #21229 from cloud-fan/accumulator.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4d5de4d3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4d5de4d3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4d5de4d3
Branch: refs/heads/master
Commit: 4d5de4d303a773b1c18c350072344bd7efca9fc4
Parents: 7f1b6b1
Author: Wenchen Fan
Authored: Fri May 4 19:20:15 2018 +0800
Committer: Wenchen Fan
Committed: Fri May 4 19:20:15 2018 +0800
--
 .../org/apache/spark/util/AccumulatorV2.scala  |  6 --
 .../apache/spark/util/AccumulatorV2Suite.scala | 19 +++
 2 files changed, 23 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/4d5de4d3/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala index 0f84ea9..2bc8495 100644 --- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala +++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala @@ -486,7 +486,9 @@ class LegacyAccumulatorWrapper[R, T]( param: org.apache.spark.AccumulableParam[R, T]) extends AccumulatorV2[T, R] { private[spark] var _value = initialValue // Current value on driver - override def isZero: Boolean = _value == param.zero(initialValue) + @transient private lazy val _zero = param.zero(initialValue) + + override def isZero: Boolean = _value.asInstanceOf[AnyRef].eq(_zero.asInstanceOf[AnyRef]) override def copy(): LegacyAccumulatorWrapper[R, T] = { val acc = new LegacyAccumulatorWrapper(initialValue, param) @@ -495,7 +497,7 @@ class LegacyAccumulatorWrapper[R, T]( } override def reset(): Unit = { -_value = param.zero(initialValue) +_value = _zero } override def add(v: T): Unit = _value = param.addAccumulator(_value, v)

http://git-wip-us.apache.org/repos/asf/spark/blob/4d5de4d3/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala b/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala index a04644d..fe0a9a4 100644 --- a/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala +++ b/core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala @@ -18,6 +18,7 @@ package org.apache.spark.util import org.apache.spark._ +import org.apache.spark.serializer.JavaSerializer class AccumulatorV2Suite extends SparkFunSuite { @@ -162,4 +163,22 @@ class AccumulatorV2Suite extends SparkFunSuite { assert(acc3.isZero) assert(acc3.value === "") } + + test("LegacyAccumulatorWrapper with AccumulatorParam that has no equals/hashCode") { +class MyData(val i: Int) extends Serializable +val param = new AccumulatorParam[MyData] { + override def zero(initialValue: MyData): MyData = new MyData(0) + override def addInPlace(r1: MyData, r2: MyData): MyData = new MyData(r1.i + r2.i) +} + +val acc = new
LegacyAccumulatorWrapper(new MyData(0), param) +acc.metadata = AccumulatorMetadata( + AccumulatorContext.newId(), + Some("test"), + countFailedValues = false) +AccumulatorContext.register(acc) + +val ser = new JavaSerializer(new SparkConf).newInstance() +ser.serialize(acc) + } }
svn commit: r26700 - in /dev/spark/2.4.0-SNAPSHOT-2018_05_04_04_01-7f1b6b1-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Fri May 4 11:18:58 2018
New Revision: 26700

Log: Apache Spark 2.4.0-SNAPSHOT-2018_05_04_04_01-7f1b6b1 docs

[This commit notification would consist of 1460 parts, which exceeds the limit of 50 parts, so it was shortened to this summary.]
spark git commit: [SPARK-24136][SS] Fix MemoryStreamDataReader.next to skip sleeping if record is available
Repository: spark
Updated Branches: refs/heads/master 0c23e254c -> 7f1b6b182

[SPARK-24136][SS] Fix MemoryStreamDataReader.next to skip sleeping if record is available

## What changes were proposed in this pull request?

Avoid the unnecessary sleep (10 ms) in each invocation of MemoryStreamDataReader.next by attempting to fetch a record before sleeping. (A generic sketch of this poll-before-sleep pattern follows the diff below.)

## How was this patch tested?

Ran ContinuousSuite from the IDE. Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Arun Mahadevan
Closes #21207 from arunmahadevan/memorystream.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f1b6b18
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f1b6b18
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f1b6b18
Branch: refs/heads/master
Commit: 7f1b6b182e3cf3cbf29399e7bfbe03fa869e0bc8
Parents: 0c23e25
Author: Arun Mahadevan
Authored: Fri May 4 16:02:21 2018 +0800
Committer: jerryshao
Committed: Fri May 4 16:02:21 2018 +0800
--
 .../streaming/sources/ContinuousMemoryStream.scala | 9 ++---
 1 file changed, 6 insertions(+), 3 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/7f1b6b18/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
index c28919b..a8fca3c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ContinuousMemoryStream.scala
@@ -183,11 +183,10 @@ class ContinuousMemoryStreamDataReader(
   private var current: Option[Row] = None

   override def next(): Boolean = {
-    current = None
+    current = getRecord
     while (current.isEmpty) {
       Thread.sleep(10)
-      current = endpoint.askSync[Option[Row]](
-        GetRecord(ContinuousMemoryStreamPartitionOffset(partition, currentOffset)))
+      current = getRecord
     }
     currentOffset += 1
     true
@@ -199,6 +198,10 @@ class ContinuousMemoryStreamDataReader(

   override def getOffset: ContinuousMemoryStreamPartitionOffset =
     ContinuousMemoryStreamPartitionOffset(partition, currentOffset)
+
+  private def getRecord: Option[Row] =
+    endpoint.askSync[Option[Row]](
+      GetRecord(ContinuousMemoryStreamPartitionOffset(partition, currentOffset)))
 }

 case class ContinuousMemoryStreamOffset(partitionNums: Map[Int, Int])
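For reference, the poll-before-sleep pattern the fix adopts can be expressed as a tiny, self-contained Scala helper. This is an illustrative sketch under assumed names (`poll` stands in for the endpoint ask), not the Spark code itself:

```scala
// Illustrative sketch, not Spark source: try to fetch first, and only sleep when the
// poll comes back empty, so an already-available record skips the 10 ms penalty.
object PollBeforeSleepSketch {
  def nextRecord[T](poll: () => Option[T], pollIntervalMs: Long = 10): T = {
    var current: Option[T] = poll()      // attempt an immediate fetch
    while (current.isEmpty) {
      Thread.sleep(pollIntervalMs)       // back off only when nothing was available
      current = poll()
    }
    current.get
  }

  def main(args: Array[String]): Unit = {
    val records = Iterator(None, None, Some("row-1"))  // pretend source: ready on the third poll
    println(nextRecord(() => if (records.hasNext) records.next() else None))
  }
}
```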