spark git commit: [SPARK-25159][SQL] json schema inference should only trigger one job
Repository: spark Updated Branches: refs/heads/master 07737c87d -> 4a9c9d8f9 [SPARK-25159][SQL] json schema inference should only trigger one job ## What changes were proposed in this pull request? This fixes a perf regression caused by https://github.com/apache/spark/pull/21376 . We should not use `RDD#toLocalIterator`, which triggers one Spark job per RDD partition. This is very bad for RDDs with a lot of small partitions. To fix it, this PR introduces a way to access SQLConf in the scheduler event loop thread, so that we don't need to use `RDD#toLocalIterator` anymore in `JsonInferSchema`. ## How was this patch tested? a new test Closes #22152 from cloud-fan/conf. Authored-by: Wenchen Fan Signed-off-by: Xiao Li Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4a9c9d8f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4a9c9d8f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4a9c9d8f Branch: refs/heads/master Commit: 4a9c9d8f9a8f8f165369e121d3b553a3515333d4 Parents: 07737c8 Author: Wenchen Fan Authored: Tue Aug 21 22:21:08 2018 -0700 Committer: Xiao Li Committed: Tue Aug 21 22:21:08 2018 -0700 -- .../sql/catalyst/json/JsonInferSchema.scala | 16 +++--- .../org/apache/spark/sql/internal/SQLConf.scala | 33 .../org/apache/spark/sql/DataFrameSuite.scala | 24 ++ 3 files changed, 63 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4a9c9d8f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala index 5f70e06..a00 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala @@ -26,6 +26,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis.TypeCoercion import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil import org.apache.spark.sql.catalyst.util.{DropMalformedMode, FailFastMode, ParseMode, PermissiveMode} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -69,10 +70,17 @@ private[sql] object JsonInferSchema { }.reduceOption(typeMerger).toIterator } -// Here we get RDD local iterator then fold, instead of calling `RDD.fold` directly, because -// `RDD.fold` will run the fold function in DAGScheduler event loop thread, which may not have -// active SparkSession and `SQLConf.get` may point to the wrong configs. -val rootType = mergedTypesFromPartitions.toLocalIterator.fold(StructType(Nil))(typeMerger) +// Here we manually submit a fold-like Spark job, so that we can set the SQLConf when running +// the fold functions in the scheduler event loop thread. 
+val existingConf = SQLConf.get +var rootType: DataType = StructType(Nil) +val foldPartition = (iter: Iterator[DataType]) => iter.fold(StructType(Nil))(typeMerger) +val mergeResult = (index: Int, taskResult: DataType) => { + rootType = SQLConf.withExistingConf(existingConf) { +typeMerger(rootType, taskResult) + } +} +json.sparkContext.runJob(mergedTypesFromPartitions, foldPartition, mergeResult) canonicalizeType(rootType, configOptions) match { case Some(st: StructType) => st http://git-wip-us.apache.org/repos/asf/spark/blob/4a9c9d8f/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 5913c94..df2caff 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -82,6 +82,19 @@ object SQLConf { /** See [[get]] for more information. */ def getFallbackConf: SQLConf = fallbackConf.get() + private lazy val existingConf = new ThreadLocal[SQLConf] { +override def initialValue: SQLConf = null + } + + def withExistingConf[T](conf: SQLConf)(f: => T): T = { +existingConf.set(conf) +try { + f +} finally { + existingConf.remove() +} + } + /** * Defines a getter that returns the SQLConf within scope. * See [[get]] for more information. @@ -116,16 +129,24 @@ object SQLConf { if (TaskContext.get != null) { new
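The shape of the fix is easier to see in a standalone sketch. The toy program below is not from the patch — `SingleJobFold`, the sample RDD, and the integer sum are stand-ins for `JsonInferSchema`'s partition-wise type merging — but it contrasts `RDD#toLocalIterator`, which submits roughly one job per partition, with a single `SparkContext.runJob` call whose result handler folds each partition's result on the driver:

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch of the one-job fold pattern used by the patch (illustrative names).
object SingleJobFold {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[4]").appName("fold-demo").getOrCreate()
    val sc = spark.sparkContext
    val data = sc.parallelize(1 to 1000, numSlices = 100) // many small partitions

    // data.toLocalIterator.foldLeft(0)(_ + _) // would trigger ~100 tiny Spark jobs

    var total = 0
    val foldPartition = (iter: Iterator[Int]) => iter.sum                   // runs in executors
    val mergeResult = (index: Int, taskResult: Int) => total += taskResult  // runs on the driver
    sc.runJob(data, foldPartition, mergeResult)                             // exactly one job

    println(s"sum = $total") // 500500, computed by a single Spark job
    spark.stop()
  }
}
```

Because the merge step runs on the driver inside that single job, the patch can wrap it in the new `SQLConf.withExistingConf(existingConf)` so that `SQLConf.get` resolves to the session that submitted the job rather than whatever the scheduler event loop thread happens to see.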
spark git commit: [SPARK-23711][SPARK-25140][SQL] Catch correct exceptions when expr codegen fails
Repository: spark Updated Branches: refs/heads/master a998e9d82 -> 07737c87d [SPARK-23711][SPARK-25140][SQL] Catch correct exceptions when expr codegen fails ## What changes were proposed in this pull request? This PR fixes bugs when expr codegen fails; we need to catch `java.util.concurrent.ExecutionException` instead of `InternalCompilerException` and `CompileException`. This handling is the same as the `WholeStageCodegenExec` one: https://github.com/apache/spark/blob/60af2501e1afc00192c779f2736a4e3de12428fa/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala#L585 ## How was this patch tested? Added tests in `CodeGeneratorWithInterpretedFallbackSuite` Closes #22154 from maropu/SPARK-25140. Authored-by: Takeshi Yamamuro Signed-off-by: Xiao Li Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/07737c87 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/07737c87 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/07737c87 Branch: refs/heads/master Commit: 07737c87d6086c986785ff0edc43ca94effa4fc6 Parents: a998e9d Author: Takeshi Yamamuro Authored: Tue Aug 21 22:17:44 2018 -0700 Committer: Xiao Li Committed: Tue Aug 21 22:17:44 2018 -0700 -- .../CodeGeneratorWithInterpretedFallback.scala | 23 -- .../sql/catalyst/expressions/Projection.scala | 7 +++- ...eGeneratorWithInterpretedFallbackSuite.scala | 44 ++-- .../sql/execution/WholeStageCodegenExec.scala | 3 +- 4 files changed, 55 insertions(+), 22 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/07737c87/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallback.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallback.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallback.scala index 0f6d866..07fa813 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallback.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallback.scala @@ -17,25 +17,13 @@ package org.apache.spark.sql.catalyst.expressions -import org.codehaus.commons.compiler.CompileException -import org.codehaus.janino.InternalCompilerException +import scala.util.control.NonFatal -import org.apache.spark.TaskContext +import org.apache.spark.internal.Logging import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.Utils /** - * Catches compile error during code generation. - */ -object CodegenError { - def unapply(throwable: Throwable): Option[Exception] = throwable match { -case e: InternalCompilerException => Some(e) -case e: CompileException => Some(e) -case _ => None - } -} - -/** * Defines values for `SQLConf` config of fallback mode. Use for test only. */ object CodegenObjectFactoryMode extends Enumeration { @@ -47,7 +35,7 @@ object CodegenObjectFactoryMode extends Enumeration { * error happens, it can fallback to interpreted implementation. In tests, we can use a SQL config * `SQLConf.CODEGEN_FACTORY_MODE` to control fallback behavior. */ -abstract class CodeGeneratorWithInterpretedFallback[IN, OUT] { +abstract class CodeGeneratorWithInterpretedFallback[IN, OUT] extends Logging { def createObject(in: IN): OUT = { // We are allowed to choose codegen-only or no-codegen modes if under tests. 
@@ -63,7 +51,10 @@ abstract class CodeGeneratorWithInterpretedFallback[IN, OUT] { try { createCodeGeneratedObject(in) } catch { - case CodegenError(_) => createInterpretedObject(in) + case NonFatal(_) => +// We should have already seen the error message in `CodeGenerator` +logWarning("Expr codegen error and falling back to interpreter mode") +createInterpretedObject(in) } } } http://git-wip-us.apache.org/repos/asf/spark/blob/07737c87/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala index 6493f09..226a4dd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.expressions
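Distilled to its essentials, the new fallback looks like the sketch below (my simplification, not the Spark source; the real class also consults `SQLConf.CODEGEN_FACTORY_MODE` before choosing a path):

```scala
import scala.util.control.NonFatal

// Simplified shape of the codegen-with-fallback pattern after this patch.
abstract class GeneratorWithFallback[IN, OUT] {
  protected def createCodeGeneratedObject(in: IN): OUT
  protected def createInterpretedObject(in: IN): OUT

  def createObject(in: IN): OUT = {
    try {
      createCodeGeneratedObject(in)
    } catch {
      // Compile errors can surface wrapped, e.g. janino's CompileException inside a
      // java.util.concurrent.ExecutionException thrown by the code cache, so matching
      // only on janino's exception types (the old CodegenError extractor) misses them.
      case NonFatal(_) => createInterpretedObject(in)
    }
  }
}
```

Matching on `NonFatal` mirrors what `WholeStageCodegenExec` already did, at the cost of also falling back on non-compilation errors; the warning log added in the diff keeps such fallbacks visible.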
svn commit: r28889 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_21_20_02-a998e9d-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Wed Aug 22 03:15:59 2018 New Revision: 28889 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_21_20_02-a998e9d docs [This commit notification would consist of 1476 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [MINOR] Added import to fix compilation
Repository: spark Updated Branches: refs/heads/master ad45299d0 -> a998e9d82 [MINOR] Added import to fix compilation ## What changes were proposed in this pull request? Two back-to-back PRs implicitly conflicted, with one PR removing an existing import that the other PR needed. This did not cause an explicit conflict as the import already existed but was not used. https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Compile/job/spark-master-compile-maven-hadoop-2.7/8226/consoleFull ``` [info] Compiling 342 Scala sources and 97 Java sources to /home/jenkins/workspace/spark-master-compile-maven-hadoop-2.7/sql/core/target/scala-2.11/classes... [warn] /home/jenkins/workspace/spark-master-compile-maven-hadoop-2.7/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala:128: value ENABLE_JOB_SUMMARY in object ParquetOutputFormat is deprecated: see corresponding Javadoc for more information. [warn] && conf.get(ParquetOutputFormat.ENABLE_JOB_SUMMARY) == null) { [warn] ^ [error] /home/jenkins/workspace/spark-master-compile-maven-hadoop-2.7/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:95: value asJava is not a member of scala.collection.immutable.Map[String,Long] [error] new java.util.HashMap(customMetrics.mapValues(long2Long).asJava) [error]^ [warn] one warning found [error] one error found [error] Compile failed at Aug 21, 2018 4:04:35 PM [12.827s] ``` ## How was this patch tested? It compiles! Closes #22175 from tdas/fix-build. Authored-by: Tathagata Das Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a998e9d8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a998e9d8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a998e9d8 Branch: refs/heads/master Commit: a998e9d829bd499dd7c65f973ea4389e0401b001 Parents: ad45299 Author: Tathagata Das Authored: Tue Aug 21 17:08:15 2018 -0700 Committer: Tathagata Das Committed: Tue Aug 21 17:08:15 2018 -0700 -- .../apache/spark/sql/execution/streaming/statefulOperators.scala | 2 ++ 1 file changed, 2 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a998e9d8/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala index 7351db8..c11af34 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala @@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.streaming import java.util.UUID import java.util.concurrent.TimeUnit._ +import scala.collection.JavaConverters._ + import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.errors._
spark git commit: [SPARK-25095][PYSPARK] Python support for BarrierTaskContext
Repository: spark Updated Branches: refs/heads/master 42035a4fe -> ad45299d0 [SPARK-25095][PYSPARK] Python support for BarrierTaskContext ## What changes were proposed in this pull request? Add method `barrier()` and `getTaskInfos()` in python TaskContext, these two methods are only allowed for barrier tasks. ## How was this patch tested? Add new tests in `tests.py` Closes #22085 from jiangxb1987/python.barrier. Authored-by: Xingbo Jiang Signed-off-by: Xiangrui Meng Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ad45299d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ad45299d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ad45299d Branch: refs/heads/master Commit: ad45299d047c10472fd3a86103930fe7c54a4cf1 Parents: 42035a4 Author: Xingbo Jiang Authored: Tue Aug 21 15:54:30 2018 -0700 Committer: Xiangrui Meng Committed: Tue Aug 21 15:54:30 2018 -0700 -- .../apache/spark/api/python/PythonRunner.scala | 106 ++ python/pyspark/serializers.py | 7 + python/pyspark/taskcontext.py | 144 +++ python/pyspark/tests.py | 36 - python/pyspark/worker.py| 16 ++- 5 files changed, 305 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ad45299d/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index 7b31857..f824191 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -20,12 +20,14 @@ package org.apache.spark.api.python import java.io._ import java.net._ import java.nio.charset.StandardCharsets +import java.nio.charset.StandardCharsets.UTF_8 import java.util.concurrent.atomic.AtomicBoolean import scala.collection.JavaConverters._ import org.apache.spark._ import org.apache.spark.internal.Logging +import org.apache.spark.security.SocketAuthHelper import org.apache.spark.util._ @@ -76,6 +78,12 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( // TODO: support accumulator in multiple UDF protected val accumulator = funcs.head.funcs.head.accumulator + // Expose a ServerSocket to support method calls via socket from Python side. + private[spark] var serverSocket: Option[ServerSocket] = None + + // Authentication helper used when serving method calls via socket from Python side. + private lazy val authHelper = new SocketAuthHelper(SparkEnv.get.conf) + def compute( inputIterator: Iterator[IN], partitionIndex: Int, @@ -180,7 +188,73 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( dataOut.writeInt(partitionIndex) // Python version of driver PythonRDD.writeUTF(pythonVer, dataOut) +// Init a ServerSocket to accept method calls from Python side. +val isBarrier = context.isInstanceOf[BarrierTaskContext] +if (isBarrier) { + serverSocket = Some(new ServerSocket(/* port */ 0, +/* backlog */ 1, +InetAddress.getByName("localhost"))) + // A call to accept() for ServerSocket shall block infinitely. + serverSocket.map(_.setSoTimeout(0)) + new Thread("accept-connections") { +setDaemon(true) + +override def run(): Unit = { + while (!serverSocket.get.isClosed()) { +var sock: Socket = null +try { + sock = serverSocket.get.accept() + // Wait for function call from python side. 
+ sock.setSoTimeout(1) + val input = new DataInputStream(sock.getInputStream()) + input.readInt() match { +case BarrierTaskContextMessageProtocol.BARRIER_FUNCTION => + // The barrier() function may wait infinitely, socket shall not timeout + // before the function finishes. + sock.setSoTimeout(0) + barrierAndServe(sock) + +case _ => + val out = new DataOutputStream(new BufferedOutputStream( +sock.getOutputStream)) + writeUTF(BarrierTaskContextMessageProtocol.ERROR_UNRECOGNIZED_FUNCTION, out) + } +} catch { + case e: SocketException if e.getMessage.contains("Socket closed") => +// It is possible that the ServerSocket is not closed, but the native socket +
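The two new Python methods mirror the JVM-side `BarrierTaskContext`; the socket server above exists so the Python worker can proxy those calls into the JVM. For reference, the equivalent Scala usage looks roughly like this (an illustrative local-mode example, not code from this patch):

```scala
import org.apache.spark.BarrierTaskContext
import org.apache.spark.sql.SparkSession

object BarrierDemo {
  def main(args: Array[String]): Unit = {
    val sc = SparkSession.builder().master("local[4]").appName("barrier-demo")
      .getOrCreate().sparkContext
    // 4 partitions -> 4 barrier tasks, all launched together.
    val out = sc.parallelize(1 to 8, 4).barrier().mapPartitions { iter =>
      val ctx = BarrierTaskContext.get()
      ctx.barrier() // blocks until every task in the stage reaches this point
      val peers = ctx.getTaskInfos().map(_.address) // one entry per barrier task
      Iterator(s"task ${ctx.partitionId()} sees ${peers.length} peers")
    }.collect()
    out.foreach(println)
    sc.stop()
  }
}
```

In PySpark the same pair of calls becomes `BarrierTaskContext.get().barrier()` and `BarrierTaskContext.get().getTaskInfos()` inside `rdd.barrier().mapPartitions(...)`, each forwarded over the authenticated local socket shown in the diff.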
spark git commit: [SPARK-24441][SS] Expose total estimated size of states in HDFSBackedStateStoreProvider
Repository: spark Updated Branches: refs/heads/master ac0174e55 -> 42035a4fe [SPARK-24441][SS] Expose total estimated size of states in HDFSBackedStateStoreProvider ## What changes were proposed in this pull request? This patch exposes the estimated size of the cache (loadedMaps) in HDFSBackedStateStoreProvider as a custom metric of StateStore. The rationale for the patch is that state backed by HDFSBackedStateStoreProvider will consume more memory than what query status reports, due to caching multiple versions of states. The memory footprint can be much larger than query status reports in situations where the state store is getting a lot of updates: shallow-copying the map incurs a small additional memory cost for map entries and references, but row objects are only shared across versions while they stay unchanged. If there are lots of updates between batches, fewer row objects will be shared and more row objects will exist in memory, consuming more memory than we expect. While HDFSBackedStateStore refers to loadedMaps in HDFSBackedStateStoreProvider directly, there would be only one `StateStoreWriter` referring to a StateStoreProvider, so the value is neither exposed nor aggregated multiple times. Current state metrics are safe to aggregate for the same reason. ## How was this patch tested? Tested manually. Below is a snapshot of the UI page reflecting the patch: https://user-images.githubusercontent.com/1317309/40978481-b46ad324-690e-11e8-9b0f-e80528612a62.png Please refer to "estimated size of states cache in provider total" as well as "count of versions in state cache in provider". Closes #21469 from HeartSaVioR/SPARK-24441. Authored-by: Jungtaek Lim Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/42035a4f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/42035a4f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/42035a4f Branch: refs/heads/master Commit: 42035a4fec6eb216427486b5067a45fceb65cc2d Parents: ac0174e Author: Jungtaek Lim Authored: Tue Aug 21 15:28:31 2018 -0700 Committer: Tathagata Das Committed: Tue Aug 21 15:28:31 2018 -0700 -- .../state/HDFSBackedStateStoreProvider.scala| 39 +++- .../execution/streaming/state/StateStore.scala | 2 + .../state/SymmetricHashJoinStateManager.scala | 2 + .../execution/streaming/statefulOperators.scala | 12 ++- .../apache/spark/sql/streaming/progress.scala | 15 ++- .../streaming/state/StateStoreSuite.scala | 100 +++ .../streaming/StreamingQueryListenerSuite.scala | 2 +- .../StreamingQueryStatusAndProgressSuite.scala | 13 ++- 8 files changed, 176 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/42035a4f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index 523acef..92a2480 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.streaming.state import java.io._ import java.util import java.util.Locale +import java.util.concurrent.atomic.LongAdder import
scala.collection.JavaConverters._ import scala.collection.mutable @@ -165,7 +166,16 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit } override def metrics: StateStoreMetrics = { - StateStoreMetrics(mapToUpdate.size(), SizeEstimator.estimate(mapToUpdate), Map.empty) + // NOTE: we provide estimation of cache size as "memoryUsedBytes", and size of state for + // current version as "stateOnCurrentVersionSizeBytes" + val metricsFromProvider: Map[String, Long] = getMetricsForProvider() + + val customMetrics = metricsFromProvider.flatMap { case (name, value) => +// just allow searching from list cause the list is small enough +supportedCustomMetrics.find(_.name == name).map(_ -> value) + } + (metricStateOnCurrentVersionSizeBytes -> SizeEstimator.estimate(mapToUpdate)) + + StateStoreMetrics(mapToUpdate.size(), metricsFromProvider("memoryUsedBytes"), customMetrics) } /** @@ -180,6 +190,12 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit } } + def
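Once a query runs, these provider metrics surface through the existing progress reporting; `customMetrics` on `StateOperatorProgress` is what this patch threads through. A sketch of observing them (the listener class is my own):

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// Hypothetical listener that prints state-store memory per stateful operator.
class StateMemoryListener extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.stateOperators.foreach { op =>
      // After this patch, memoryUsedBytes reflects the provider's whole loadedMaps
      // cache, while stateOnCurrentVersionSizeBytes (a custom metric) tracks only
      // the current version's map.
      println(s"rows=${op.numRowsTotal} memory=${op.memoryUsedBytes}B " +
        s"custom=${op.customMetrics}")
    }
  }
}
// Registered with: spark.streams.addListener(new StateMemoryListener)
```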
spark git commit: [SPARK-25129][SQL] Make the mapping of com.databricks.spark.avro to built-in module configurable
Repository: spark Updated Branches: refs/heads/master 6c5cb8585 -> ac0174e55 [SPARK-25129][SQL] Make the mapping of com.databricks.spark.avro to built-in module configurable ## What changes were proposed in this pull request? In https://issues.apache.org/jira/browse/SPARK-24924, the data source provider com.databricks.spark.avro is mapped to the new package org.apache.spark.sql.avro. As per the discussion in the [Jira](https://issues.apache.org/jira/browse/SPARK-24924) and PR #22119, we should make the mapping configurable. This PR also improves the error message when the data source for Avro/Kafka is not found. ## How was this patch tested? Unit test Closes #22133 from gengliangwang/configurable_avro_mapping. Authored-by: Gengliang Wang Signed-off-by: Xiao Li Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ac0174e5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ac0174e5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ac0174e5 Branch: refs/heads/master Commit: ac0174e55af2e935d41545721e9f430c942b3a0c Parents: 6c5cb85 Author: Gengliang Wang Authored: Tue Aug 21 15:26:24 2018 -0700 Committer: Xiao Li Committed: Tue Aug 21 15:26:24 2018 -0700 -- .../org/apache/spark/sql/avro/AvroSuite.scala | 11 ++- .../org/apache/spark/sql/internal/SQLConf.scala | 10 ++ .../sql/execution/datasources/DataSource.scala| 16 ++-- .../sql/sources/ResolvedDataSourceSuite.scala | 18 ++ 4 files changed, 52 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ac0174e5/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala -- diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala index c4f4d8e..72bef9e 100644 --- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala +++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala @@ -77,10 +77,19 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils { } test("resolve avro data source") { -Seq("avro", "com.databricks.spark.avro").foreach { provider => +val databricksAvro = "com.databricks.spark.avro" +// By default the backward compatibility for com.databricks.spark.avro is enabled. 
+Seq("avro", "org.apache.spark.sql.avro.AvroFileFormat", databricksAvro).foreach { provider => assert(DataSource.lookupDataSource(provider, spark.sessionState.conf) === classOf[org.apache.spark.sql.avro.AvroFileFormat]) } + +withSQLConf(SQLConf.LEGACY_REPLACE_DATABRICKS_SPARK_AVRO_ENABLED.key -> "false") { + val message = intercept[AnalysisException] { +DataSource.lookupDataSource(databricksAvro, spark.sessionState.conf) + }.getMessage + assert(message.contains(s"Failed to find data source: $databricksAvro")) +} } test("reading from multiple paths") { http://git-wip-us.apache.org/repos/asf/spark/blob/ac0174e5/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index b44bfe7..5913c94 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1469,6 +1469,13 @@ object SQLConf { .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION) .createWithDefault(Deflater.DEFAULT_COMPRESSION) + val LEGACY_REPLACE_DATABRICKS_SPARK_AVRO_ENABLED = +buildConf("spark.sql.legacy.replaceDatabricksSparkAvro.enabled") + .doc("If it is set to true, the data source provider com.databricks.spark.avro is mapped " + +"to the built-in but external Avro data source module for backward compatibility.") + .booleanConf + .createWithDefault(true) + val LEGACY_SETOPS_PRECEDENCE_ENABLED = buildConf("spark.sql.legacy.setopsPrecedence.enabled") .internal() @@ -1881,6 +1888,9 @@ class SQLConf extends Serializable with Logging { def avroDeflateLevel: Int = getConf(SQLConf.AVRO_DEFLATE_LEVEL) + def replaceDatabricksSparkAvroEnabled: Boolean = +getConf(SQLConf.LEGACY_REPLACE_DATABRICKS_SPARK_AVRO_ENABLED) + def setOpsPrecedenceEnforced: Boolean = getConf(SQLConf.LEGACY_SETOPS_PRECEDENCE_ENABLED) def parallelFileListingInStatsComputation: Boolean =
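From the user's side the switch looks like this (a sketch; the path is made up and the external `spark-avro` module must be on the classpath):

```scala
import org.apache.spark.sql.SparkSession

object AvroMappingDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("avro-mapping").getOrCreate()

    // Default (true): the legacy name silently resolves to the built-in Avro source.
    val df = spark.read.format("com.databricks.spark.avro").load("/tmp/events.avro")

    // Opting out restores the old behavior: the same lookup then fails with
    // "Failed to find data source: com.databricks.spark.avro".
    spark.conf.set("spark.sql.legacy.replaceDatabricksSparkAvro.enabled", "false")
    spark.stop()
  }
}
```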
spark git commit: [SPARK-24763][SS] Remove redundant key data from value in streaming aggregation
Repository: spark Updated Branches: refs/heads/master 72ecfd095 -> 6c5cb8585 [SPARK-24763][SS] Remove redundant key data from value in streaming aggregation ## What changes were proposed in this pull request? This patch proposes a new flag option for stateful aggregation: remove redundant key data from value. Enabling the new option behaves the same as the current implementation, but uses less memory for state, depending on the key/value fields of the state operator. Please refer to the link below for detailed perf test results: https://issues.apache.org/jira/browse/SPARK-24763?focusedCommentId=16536539&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16536539 Since the state between enabling the option and disabling the option is not compatible, the option is set to 'disabled' by default (to ensure backward compatibility), and OffsetSeqMetadata prevents modifying the option after a query has run. ## How was this patch tested? Modified unit tests to cover both disabling and enabling the option. Also did manual tests to see whether the proposed patch improves state memory usage. Closes #21733 from HeartSaVioR/SPARK-24763. Authored-by: Jungtaek Lim Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6c5cb858 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6c5cb858 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6c5cb858 Branch: refs/heads/master Commit: 6c5cb85856235efd464b109558896f81ae2c4c75 Parents: 72ecfd0 Author: Jungtaek Lim Authored: Tue Aug 21 15:22:42 2018 -0700 Committer: Tathagata Das Committed: Tue Aug 21 15:22:42 2018 -0700 -- .../org/apache/spark/sql/internal/SQLConf.scala | 10 + .../spark/sql/execution/SparkStrategies.scala | 3 + .../sql/execution/aggregate/AggUtils.scala | 5 +- .../streaming/IncrementalExecution.scala| 6 +- .../sql/execution/streaming/OffsetSeq.scala | 8 +- .../StreamingAggregationStateManager.scala | 205 +++ .../execution/streaming/statefulOperators.scala | 61 +++--- .../commits/0 | 2 + .../commits/1 | 2 + .../metadata| 1 + .../offsets/0 | 3 + .../offsets/1 | 3 + .../state/0/0/1.delta | Bin 0 -> 46 bytes .../state/0/0/2.delta | Bin 0 -> 46 bytes .../state/0/1/1.delta | Bin 0 -> 77 bytes .../state/0/1/2.delta | Bin 0 -> 77 bytes .../state/0/2/1.delta | Bin 0 -> 46 bytes .../state/0/2/2.delta | Bin 0 -> 46 bytes .../state/0/3/1.delta | Bin 0 -> 46 bytes .../state/0/3/2.delta | Bin 0 -> 46 bytes .../state/0/4/1.delta | Bin 0 -> 46 bytes .../state/0/4/2.delta | Bin 0 -> 77 bytes .../streaming/state/MemoryStateStore.scala | 49 + .../StreamingAggregationStateManagerSuite.scala | 126 .../streaming/FlatMapGroupsWithStateSuite.scala | 24 +-- .../streaming/StreamingAggregationSuite.scala | 150 +++--- 26 files changed, 573 insertions(+), 85 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6c5cb858/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index bffdddc..b44bfe7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -888,6 +888,16 @@ object SQLConf { .intConf .createWithDefault(2) + val STREAMING_AGGREGATION_STATE_FORMAT_VERSION = +buildConf("spark.sql.streaming.aggregation.stateFormatVersion") + .internal() + .doc("State format version used by streaming
aggregation operations in a streaming query. " + +"State between versions are tend to be incompatible, so state format version shouldn't " + +"be modified after running.") + .intConf + .checkValue(v => Set(1, 2).contains(v), "Valid versions are 1 and 2") + .createWithDefault(2) + val UNSUPPORTED_OPERATION_CHECK_ENABLED = buildConf("spark.sql.streaming.unsupportedOperationCheck") .internal() http://git-wip-us.apache.org/repos/asf/spark/blob/6c5cb858/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
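In practice the flag only matters when a query first creates its checkpoint; afterwards `OffsetSeqMetadata` pins whatever version the checkpoint was written with. A hedged sketch (the query itself is illustrative):

```scala
import org.apache.spark.sql.SparkSession

object StateFormatDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("state-format").getOrCreate()
    import spark.implicits._

    // Internal conf from this patch; 2 (the default) drops key columns from state
    // values, 1 keeps the old redundant layout for comparison.
    spark.conf.set("spark.sql.streaming.aggregation.stateFormatVersion", "2")

    val counts = spark.readStream.format("rate").load()
      .groupBy($"value" % 10).count() // stateful aggregation governed by the flag

    // counts.writeStream.format("console").outputMode("update")
    //   .option("checkpointLocation", "/tmp/agg-ckpt").start() // version is pinned here
  }
}
```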
spark git commit: [SPARK-25149][GRAPHX] Update Parallel Personalized Page Rank to test with large vertexIds
Repository: spark Updated Branches: refs/heads/master 99d2e4e00 -> 72ecfd095 [SPARK-25149][GRAPHX] Update Parallel Personalized Page Rank to test with large vertexIds ## What changes were proposed in this pull request? runParallelPersonalizedPageRank in graphx checks that `sources` are <= Int.MaxValue.toLong, but this is not actually required. This check seems to have been added because we use sparse vectors in the implementation and sparse vectors cannot be indexed by values > MAX_INT. However we do not ever index the sparse vector by the source vertexIds so this isn't an issue. I've added a test with large vertexIds to confirm this works as expected. ## How was this patch tested? Unit tests. Please review http://spark.apache.org/contributing.html before opening a pull request. Closes #22139 from MrBago/remove-veretexId-check-pppr. Authored-by: Bago Amirbekian Signed-off-by: Joseph K. Bradley Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72ecfd09 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72ecfd09 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72ecfd09 Branch: refs/heads/master Commit: 72ecfd095062ad61c073f9b97bf3c47644575d60 Parents: 99d2e4e Author: Bago Amirbekian Authored: Tue Aug 21 15:21:55 2018 -0700 Committer: Joseph K. Bradley Committed: Tue Aug 21 15:21:55 2018 -0700 -- .../org/apache/spark/graphx/lib/PageRank.scala | 28 ++--- .../apache/spark/graphx/lib/PageRankSuite.scala | 32 +++- 2 files changed, 35 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/72ecfd09/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala -- diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala index ebd65e8..96b635f 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/PageRank.scala @@ -184,9 +184,11 @@ object PageRank extends Logging { * indexed by the position of nodes in the sources list) and * edge attributes the normalized edge weight */ - def runParallelPersonalizedPageRank[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], -numIter: Int, resetProb: Double = 0.15, -sources: Array[VertexId]): Graph[Vector, Double] = { + def runParallelPersonalizedPageRank[VD: ClassTag, ED: ClassTag]( + graph: Graph[VD, ED], + numIter: Int, + resetProb: Double = 0.15, + sources: Array[VertexId]): Graph[Vector, Double] = { require(numIter > 0, s"Number of iterations must be greater than 0," + s" but got ${numIter}") require(resetProb >= 0 && resetProb <= 1, s"Random reset probability must belong" + @@ -194,15 +196,11 @@ object PageRank extends Logging { require(sources.nonEmpty, s"The list of sources must be non-empty," + s" but got ${sources.mkString("[", ",", "]")}") -// TODO if one sources vertex id is outside of the int range -// we won't be able to store its activations in a sparse vector -require(sources.max <= Int.MaxValue.toLong, - s"This implementation currently only works for source vertex ids at most ${Int.MaxValue}") val zero = Vectors.sparse(sources.size, List()).asBreeze -val sourcesInitMap = sources.zipWithIndex.map { case (vid, i) => - val v = Vectors.sparse(sources.size, Array(i), Array(1.0)).asBreeze - (vid, v) -}.toMap +// map of vid -> vector where for each vid, the _position of vid in source_ is set to 1.0 +val sourcesInitMap = sources.zipWithIndex.toMap.mapValues { i => + 
Vectors.sparse(sources.size, Array(i), Array(1.0)).asBreeze +} val sc = graph.vertices.sparkContext val sourcesInitMapBC = sc.broadcast(sourcesInitMap) // Initialize the PageRank graph with each edge attribute having @@ -212,13 +210,7 @@ object PageRank extends Logging { .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) } // Set the weight on the edges based on the degree .mapTriplets(e => 1.0 / e.srcAttr, TripletFields.Src) - .mapVertices { (vid, attr) => -if (sourcesInitMapBC.value contains vid) { - sourcesInitMapBC.value(vid) -} else { - zero -} - } + .mapVertices((vid, _) => sourcesInitMapBC.value.getOrElse(vid, zero)) var i = 0 while (i < numIter) { http://git-wip-us.apache.org/repos/asf/spark/blob/72ecfd09/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala -- diff --git
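A minimal example of the case the removed `require` used to reject — a personalized PageRank source id above `Int.MaxValue` (the graph is made up):

```scala
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.graphx.lib.PageRank
import org.apache.spark.sql.SparkSession

object LargeVertexIdPppr {
  def main(args: Array[String]): Unit = {
    val sc = SparkSession.builder().master("local[2]").appName("pppr-demo")
      .getOrCreate().sparkContext
    val big = Int.MaxValue.toLong + 7L // previously rejected by the require(...)
    val edges = sc.parallelize(Seq(Edge(1L, big, 1.0), Edge(big, 2L, 1.0), Edge(2L, 1L, 1.0)))
    val graph = Graph.fromEdges(edges, defaultValue = 0)

    // The sparse rank vectors are indexed by a source's *position* in `sources`
    // (here: index 0), never by the vertex id itself, so large ids are safe.
    val ranks = PageRank.runParallelPersonalizedPageRank(
      graph, numIter = 10, resetProb = 0.15, sources = Array(big))
    ranks.vertices.collect().foreach(println)
    sc.stop()
  }
}
```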
svn commit: r28887 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_21_12_02-99d2e4e-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Aug 21 19:16:17 2018 New Revision: 28887 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_21_12_02-99d2e4e docs [This commit notification would consist of 1476 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-24296][CORE] Replicate large blocks as a stream.
Repository: spark Updated Branches: refs/heads/master 35f7f5ce8 -> 99d2e4e00 [SPARK-24296][CORE] Replicate large blocks as a stream. When replicating large cached RDD blocks, it can be helpful to replicate them as a stream, to avoid using large amounts of memory during the transfer. This also allows blocks larger than 2GB to be replicated. Added unit tests in DistributedSuite. Also ran tests on a cluster for blocks > 2gb. Closes #21451 from squito/clean_replication. Authored-by: Imran Rashid Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/99d2e4e0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/99d2e4e0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/99d2e4e0 Branch: refs/heads/master Commit: 99d2e4e00711cffbfaee8cb3da9b6b3feab8ff18 Parents: 35f7f5c Author: Imran Rashid Authored: Tue Aug 21 11:26:41 2018 -0700 Committer: Marcelo Vanzin Committed: Tue Aug 21 11:26:41 2018 -0700 -- .../network/server/TransportRequestHandler.java | 2 +- .../shuffle/protocol/BlockTransferMessage.java | 3 +- .../shuffle/protocol/UploadBlockStream.java | 89 .../org/apache/spark/executor/Executor.scala| 4 +- .../apache/spark/internal/config/package.scala | 7 ++ .../apache/spark/network/BlockDataManager.scala | 12 +++ .../network/netty/NettyBlockRpcServer.scala | 26 +- .../netty/NettyBlockTransferService.scala | 39 + .../org/apache/spark/storage/BlockManager.scala | 66 ++- .../storage/BlockManagerManagedBuffer.scala | 7 +- .../org/apache/spark/storage/DiskStore.scala| 5 +- .../spark/util/io/ChunkedByteBuffer.scala | 4 + .../org/apache/spark/DistributedSuite.scala | 25 +- .../spark/security/EncryptionFunSuite.scala | 12 ++- .../apache/spark/storage/DiskStoreSuite.scala | 3 +- project/MimaExcludes.scala | 3 +- 16 files changed, 270 insertions(+), 37 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/99d2e4e0/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java -- diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java index c6fd56b..9fac96d 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java @@ -234,7 +234,7 @@ public class TransportRequestHandler extends MessageHandler { callback.onSuccess(ByteBuffer.allocate(0)); } catch (Exception ex) { IOException ioExc = new IOException("Failure post-processing complete stream;" + - " failing this rpc and leaving channel active"); + " failing this rpc and leaving channel active", ex); callback.onFailure(ioExc); streamHandler.onFailure(streamId, ioExc); } http://git-wip-us.apache.org/repos/asf/spark/blob/99d2e4e0/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java -- diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java index 9af6759..a68a297 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java @@ -42,7 
+42,7 @@ public abstract class BlockTransferMessage implements Encodable { /** Preceding every serialized message is its type, which allows us to deserialize it. */ public enum Type { OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3), REGISTER_DRIVER(4), -HEARTBEAT(5); +HEARTBEAT(5), UPLOAD_BLOCK_STREAM(6); private final byte id; @@ -67,6 +67,7 @@ public abstract class BlockTransferMessage implements Encodable { case 3: return StreamHandle.decode(buf); case 4: return RegisterDriver.decode(buf); case 5: return ShuffleServiceHeartbeat.decode(buf); +case 6: return UploadBlockStream.decode(buf); default: throw new IllegalArgumentException("Unknown message type: " + type); } }
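For context, replication kicks in when an RDD is persisted with a replicated storage level; with this patch the peer-to-peer transfer can stream instead of materializing one large buffer. A sketch (assuming an existing `SparkContext` named `sc` on a multi-executor cluster):

```scala
import org.apache.spark.storage.StorageLevel

// MEMORY_ONLY_2 asks for one replica of every cached block on a second executor.
val cached = sc.parallelize(1 to (1 << 24), 8).persist(StorageLevel.MEMORY_ONLY_2)
cached.count() // per the BlockDataManager diff, large replicas now stream to the
               // peer rather than being buffered whole, so blocks > 2GB also work
```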
svn commit: r28885 - in /dev/spark/2.3.3-SNAPSHOT-2018_08_21_10_02-9cb9d72-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Aug 21 17:15:41 2018 New Revision: 28885 Log: Apache Spark 2.3.3-SNAPSHOT-2018_08_21_10_02-9cb9d72 docs [This commit notification would consist of 1443 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [DOCS][MINOR] Fix a few broken links and typos, and, nit, use HTTPS more consistently
Repository: spark Updated Branches: refs/heads/master d80063278 -> 35f7f5ce8 [DOCS][MINOR] Fix a few broken links and typos, and, nit, use HTTPS more consistently ## What changes were proposed in this pull request? Fix a few broken links and typos, and, nit, use HTTPS more consistently esp. on scripts and Apache links ## How was this patch tested? Doc build Closes #22172 from srowen/DocTypo. Authored-by: Sean Owen Signed-off-by: hyukjinkwon Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/35f7f5ce Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/35f7f5ce Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/35f7f5ce Branch: refs/heads/master Commit: 35f7f5ce83984d8afe0b7955942baa04f2bef74f Parents: d800632 Author: Sean Owen Authored: Wed Aug 22 01:02:17 2018 +0800 Committer: hyukjinkwon Committed: Wed Aug 22 01:02:17 2018 +0800 -- docs/README.md | 4 ++-- docs/_layouts/404.html | 2 +- docs/_layouts/global.html | 6 +++--- docs/building-spark.md | 8 docs/contributing-to-spark.md | 2 +- docs/index.md | 16 docs/ml-migration-guides.md| 2 +- docs/quick-start.md| 2 +- docs/rdd-programming-guide.md | 4 ++-- docs/running-on-mesos.md | 2 +- docs/running-on-yarn.md| 2 +- docs/security.md | 6 +++--- docs/sparkr.md | 2 +- docs/sql-programming-guide.md | 6 +++--- docs/streaming-kinesis-integration.md | 2 +- docs/streaming-programming-guide.md| 5 ++--- docs/structured-streaming-programming-guide.md | 2 +- 17 files changed, 36 insertions(+), 37 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/35f7f5ce/docs/README.md -- diff --git a/docs/README.md b/docs/README.md index dbea4d6..7da543d 100644 --- a/docs/README.md +++ b/docs/README.md @@ -2,7 +2,7 @@ Welcome to the Spark documentation! This readme will walk you through navigating and building the Spark documentation, which is included here with the Spark source code. You can also find documentation specific to release versions of -Spark at http://spark.apache.org/documentation.html. +Spark at https://spark.apache.org/documentation.html. Read on to learn more about viewing documentation in plain text (i.e., markdown) or building the documentation yourself. Why build it yourself? So that you have the docs that correspond to @@ -79,7 +79,7 @@ jekyll plugin to run `build/sbt unidoc` before building the site so if you haven may take some time as it generates all of the scaladoc and javadoc using [Unidoc](https://github.com/sbt/sbt-unidoc). The jekyll plugin also generates the PySpark docs using [Sphinx](http://sphinx-doc.org/), SparkR docs using [roxygen2](https://cran.r-project.org/web/packages/roxygen2/index.html) and SQL docs -using [MkDocs](http://www.mkdocs.org/). +using [MkDocs](https://www.mkdocs.org/). NOTE: To skip the step of building and copying over the Scala, Java, Python, R and SQL API docs, run `SKIP_API=1 jekyll build`. 
In addition, `SKIP_SCALADOC=1`, `SKIP_PYTHONDOC=1`, `SKIP_RDOC=1` and `SKIP_SQLDOC=1` can be used http://git-wip-us.apache.org/repos/asf/spark/blob/35f7f5ce/docs/_layouts/404.html -- diff --git a/docs/_layouts/404.html b/docs/_layouts/404.html index 0446544..78f98b9 100755 --- a/docs/_layouts/404.html +++ b/docs/_layouts/404.html @@ -151,7 +151,7 @@ var GOOG_FIXURL_LANG = (navigator.language || '').slice(0,2),GOOG_FIXURL_SITE = location.host; -http://linkhelp.clients.google.com/tbproxy/lh/wm/fixurl.js"> +https://linkhelp.clients.google.com/tbproxy/lh/wm/fixurl.js"> http://git-wip-us.apache.org/repos/asf/spark/blob/35f7f5ce/docs/_layouts/global.html -- diff --git a/docs/_layouts/global.html b/docs/_layouts/global.html index e5af5ae..88d549c 100755 --- a/docs/_layouts/global.html +++ b/docs/_layouts/global.html @@ -50,7 +50,7 @@ @@ -114,8 +114,8 @@ Hardware Provisioning Building Spark -http://spark.apache.org/contributing.html;>Contributing to Spark -
spark git commit: [MINOR] Add .crc files to .gitignore
Repository: spark Updated Branches: refs/heads/master 5059255d9 -> d80063278 [MINOR] Add .crc files to .gitignore ## What changes were proposed in this pull request? Add .crc files to .gitignore so that we don't add .crc files in state checkpoint to git repo which could be added in test resources. This is based on comments in #21733, https://github.com/apache/spark/pull/21733#issuecomment-414578244. ## How was this patch tested? Add `.1.delta.crc` and `.2.delta.crc` in `/sql/core/src/test/resources`, and confirm git doesn't suggest the files to add to stage. Closes #22170 from HeartSaVioR/add-crc-files-to-gitignore. Authored-by: Jungtaek Lim Signed-off-by: hyukjinkwon Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d8006327 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d8006327 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d8006327 Branch: refs/heads/master Commit: d80063278debc5529653d184841f50fe98cdad97 Parents: 5059255 Author: Jungtaek Lim Authored: Wed Aug 22 01:00:06 2018 +0800 Committer: hyukjinkwon Committed: Wed Aug 22 01:00:06 2018 +0800 -- .gitignore | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d8006327/.gitignore -- diff --git a/.gitignore b/.gitignore index e4c44d0..19db7ac 100644 --- a/.gitignore +++ b/.gitignore @@ -77,6 +77,7 @@ target/ unit-tests.log work/ docs/.jekyll-metadata +*.crc # For Hive TempStatsStore/
spark git commit: [SPARK-25114][2.3][CORE][FOLLOWUP] Fix RecordBinaryComparatorSuite build failure
Repository: spark Updated Branches: refs/heads/branch-2.3 8bde46781 -> 9cb9d7201 [SPARK-25114][2.3][CORE][FOLLOWUP] Fix RecordBinaryComparatorSuite build failure ## What changes were proposed in this pull request? Fix RecordBinaryComparatorSuite build failure ## How was this patch tested? Existing tests. Closes #22166 from jiangxb1987/SPARK-25114-2.3. Authored-by: Xingbo Jiang Signed-off-by: Xiao Li Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9cb9d720 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9cb9d720 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9cb9d720 Branch: refs/heads/branch-2.3 Commit: 9cb9d7201acbef6dfc1c9fe0cfd39fd9d89cac76 Parents: 8bde467 Author: Xingbo Jiang Authored: Tue Aug 21 09:45:19 2018 -0700 Committer: Xiao Li Committed: Tue Aug 21 09:45:19 2018 -0700 -- .../test/java/org/apache/spark/memory/TestMemoryConsumer.java | 7 +++ 1 file changed, 7 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9cb9d720/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java -- diff --git a/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java b/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java index db91329..1b7739c 100644 --- a/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java +++ b/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java @@ -19,6 +19,8 @@ package org.apache.spark.memory; import java.io.IOException; +import org.apache.spark.unsafe.memory.MemoryBlock; + public class TestMemoryConsumer extends MemoryConsumer { public TestMemoryConsumer(TaskMemoryManager memoryManager, MemoryMode mode) { super(memoryManager, 1024L, mode); @@ -43,6 +45,11 @@ public class TestMemoryConsumer extends MemoryConsumer { used -= size; taskMemoryManager.releaseExecutionMemory(size, this); } + + public void freePage(MemoryBlock page) { +used -= page.size(); +taskMemoryManager.freePage(page, this); + } }
spark git commit: [SPARK-25161][CORE] Fix several bugs in failure handling of barrier execution mode
Repository: spark Updated Branches: refs/heads/master b8788b3e7 -> 5059255d9 [SPARK-25161][CORE] Fix several bugs in failure handling of barrier execution mode ## What changes were proposed in this pull request? Fix several bugs in failure handling of barrier execution mode: * Mark TaskSet for a barrier stage as zombie when a task attempt fails; * Multiple barrier task failures from a single barrier stage should not trigger multiple stage retries; * Barrier task failure from a previous failed stage attempt should not trigger stage retry; * Fail the job when a task from a barrier ResultStage failed; * RDD.isBarrier() should not rely on `ShuffleDependency`s. ## How was this patch tested? Added corresponding test cases in `DAGSchedulerSuite` and `TaskSchedulerImplSuite`. Closes #22158 from jiangxb1987/failure. Authored-by: Xingbo Jiang Signed-off-by: Xiangrui Meng Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5059255d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5059255d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5059255d Branch: refs/heads/master Commit: 5059255d91fc7a9810e013eba39e12d30291dd08 Parents: b8788b3 Author: Xingbo Jiang Authored: Tue Aug 21 08:25:02 2018 -0700 Committer: Xiangrui Meng Committed: Tue Aug 21 08:25:02 2018 -0700 -- .../main/scala/org/apache/spark/rdd/RDD.scala | 3 +- .../apache/spark/scheduler/DAGScheduler.scala | 125 +++ .../apache/spark/scheduler/TaskSetManager.scala | 4 + .../spark/scheduler/DAGSchedulerSuite.scala | 106 .../scheduler/TaskSchedulerImplSuite.scala | 18 +++ 5 files changed, 200 insertions(+), 56 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5059255d/core/src/main/scala/org/apache/spark/rdd/RDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index cbc1143..374b846 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1863,7 +1863,8 @@ abstract class RDD[T: ClassTag]( // From performance concern, cache the value to avoid repeatedly compute `isBarrier()` on a long // RDD chain. - @transient protected lazy val isBarrier_ : Boolean = dependencies.exists(_.rdd.isBarrier()) + @transient protected lazy val isBarrier_ : Boolean = +dependencies.filter(!_.isInstanceOf[ShuffleDependency[_, _, _]]).exists(_.rdd.isBarrier()) } http://git-wip-us.apache.org/repos/asf/spark/blob/5059255d/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 2b0ca13..6787250 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1478,9 +1478,11 @@ private[spark] class DAGScheduler( mapOutputTracker.unregisterAllMapOutput(failedMapStage.shuffleDep.shuffleId) case failedResultStage: ResultStage => -// Mark all the partitions of the result stage to be not finished, to ensure retry -// all the tasks on resubmitted stage attempt. -failedResultStage.activeJob.map(_.resetAllPartitions()) +// Abort the failed result stage since we may have committed output for some +// partitions. +val reason = "Could not recover from a failed barrier ResultStage. 
Most recent " + + s"failure reason: $failureMessage" +abortStage(failedResultStage, reason, None) } } @@ -1553,62 +1555,75 @@ private[spark] class DAGScheduler( // Always fail the current stage and retry all the tasks when a barrier task fail. val failedStage = stageIdToStage(task.stageId) -logInfo(s"Marking $failedStage (${failedStage.name}) as failed due to a barrier task " + - "failed.") -val message = s"Stage failed because barrier task $task finished unsuccessfully.\n" + - failure.toErrorString -try { - // killAllTaskAttempts will fail if a SchedulerBackend does not implement killTask. - val reason = s"Task $task from barrier stage $failedStage (${failedStage.name}) failed." - taskScheduler.killAllTaskAttempts(stageId, interruptThread = false, reason) -} catch { - case e: UnsupportedOperationException => -// Cannot continue with barrier stage if failed
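The fixed semantics are easiest to see on a toy barrier stage (illustrative, assuming an existing `sc`):

```scala
import org.apache.spark.BarrierTaskContext

// Four barrier tasks launch together; one fails on the first stage attempt.
val rdd = sc.parallelize(1 to 4, 4).barrier().mapPartitions { iter =>
  val ctx = BarrierTaskContext.get()
  if (ctx.partitionId() == 0 && ctx.stageAttemptNumber() == 0) {
    throw new RuntimeException("boom")
  }
  ctx.barrier()
  iter
}
// rdd.count() makes this a barrier ResultStage: with this patch the single failure
// aborts the job (some partitions may already have committed output) instead of
// re-running only unfinished partitions. Were the stage a ShuffleMapStage, the whole
// TaskSet would be marked zombie and all four tasks retried as one new stage attempt.
```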
svn commit: r28882 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_21_08_02-b8788b3-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Aug 21 15:16:49 2018 New Revision: 28882 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_21_08_02-b8788b3 docs [This commit notification would consist of 1476 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [BUILD] Close stale PRs
Repository: spark Updated Branches: refs/heads/master 4fb96e510 -> b8788b3e7 [BUILD] Close stale PRs Closes #16411 Closes #21870 Closes #21794 Closes #21610 Closes #21961 Closes #21940 Closes #21870 Closes #22118 Closes #21624 Closes #19528 Closes #18424 Closes #22159 from srowen/Stale. Authored-by: Sean Owen Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b8788b3e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b8788b3e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b8788b3e Branch: refs/heads/master Commit: b8788b3e79d0d508e3a910fefd7e9cff4c6d6245 Parents: 4fb96e5 Author: Sean Owen Authored: Tue Aug 21 08:18:21 2018 -0500 Committer: Sean Owen Committed: Tue Aug 21 08:18:21 2018 -0500 -- --
svn commit: r28878 - in /dev/spark/2.3.3-SNAPSHOT-2018_08_21_02_02-8bde467-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Aug 21 09:15:43 2018 New Revision: 28878 Log: Apache Spark 2.3.3-SNAPSHOT-2018_08_21_02_02-8bde467 docs [This commit notification would consist of 1443 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
svn commit: r28876 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_21_00_02-4fb96e5-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Aug 21 07:17:04 2018 New Revision: 28876 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_21_00_02-4fb96e5 docs [This commit notification would consist of 1476 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.]
spark git commit: [SPARK-25114][CORE] Fix RecordBinaryComparator when subtraction between two words is divisible by Integer.MAX_VALUE.
Repository: spark Updated Branches: refs/heads/branch-2.3 9702bb637 -> 8bde46781

[SPARK-25114][CORE] Fix RecordBinaryComparator when subtraction between two words is divisible by Integer.MAX_VALUE.

https://github.com/apache/spark/pull/22079#discussion_r209705612 It is possible for two records to be unequal and yet compare as equal with this code, whenever the difference between the two long values is a nonzero multiple of Integer.MAX_VALUE. This PR fixes the issue. Add new test cases in `RecordBinaryComparatorSuite`.

Closes #22101 from jiangxb1987/fix-rbc.

Authored-by: Xingbo Jiang
Signed-off-by: Xiao Li
(cherry picked from commit 4fb96e5105cec4a3eb19a2b7997600b086bac32f)
Signed-off-by: Xiao Li

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8bde4678
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8bde4678
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8bde4678
Branch: refs/heads/branch-2.3
Commit: 8bde4678166f5f01837919d4f8d742b89f5e76b8
Parents: 9702bb6
Author: Xingbo Jiang
Authored: Mon Aug 20 23:13:31 2018 -0700
Committer: Xiao Li
Committed: Mon Aug 20 23:18:17 2018 -0700
--
 .../sql/execution/RecordBinaryComparator.java |  26 +-
 .../sort/RecordBinaryComparatorSuite.java     | 322 +++
 2 files changed, 337 insertions(+), 11 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/8bde4678/sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java
--
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java
index bb77b5b..40c2cc8 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java
@@ -22,12 +22,10 @@ import org.apache.spark.util.collection.unsafe.sort.RecordComparator;

 public final class RecordBinaryComparator extends RecordComparator {

-  // TODO(jiangxb) Add test suite for this.
   @Override
   public int compare(
       Object leftObj, long leftOff, int leftLen, Object rightObj, long rightOff, int rightLen) {
     int i = 0;
-    int res = 0;

     // If the arrays have different length, the longer one is larger.
     if (leftLen != rightLen) {
@@ -40,27 +38,33 @@ public final class RecordBinaryComparator extends RecordComparator {
     // check if stars align and we can get both offsets to be aligned
     if ((leftOff % 8) == (rightOff % 8)) {
       while ((leftOff + i) % 8 != 0 && i < leftLen) {
-        res = (Platform.getByte(leftObj, leftOff + i) & 0xff) -
-            (Platform.getByte(rightObj, rightOff + i) & 0xff);
-        if (res != 0) return res;
+        final int v1 = Platform.getByte(leftObj, leftOff + i) & 0xff;
+        final int v2 = Platform.getByte(rightObj, rightOff + i) & 0xff;
+        if (v1 != v2) {
+          return v1 > v2 ? 1 : -1;
+        }
         i += 1;
       }
     }
     // for architectures that support unaligned accesses, chew it up 8 bytes at a time
     if (Platform.unaligned() || (((leftOff + i) % 8 == 0) && ((rightOff + i) % 8 == 0))) {
       while (i <= leftLen - 8) {
-        res = (int) ((Platform.getLong(leftObj, leftOff + i) -
-            Platform.getLong(rightObj, rightOff + i)) % Integer.MAX_VALUE);
-        if (res != 0) return res;
+        final long v1 = Platform.getLong(leftObj, leftOff + i);
+        final long v2 = Platform.getLong(rightObj, rightOff + i);
+        if (v1 != v2) {
+          return v1 > v2 ? 1 : -1;
+        }
         i += 8;
       }
     }
     // this will finish off the unaligned comparisons, or do the entire aligned comparison
     // whichever is needed.
     while (i < leftLen) {
-      res = (Platform.getByte(leftObj, leftOff + i) & 0xff) -
-          (Platform.getByte(rightObj, rightOff + i) & 0xff);
-      if (res != 0) return res;
+      final int v1 = Platform.getByte(leftObj, leftOff + i) & 0xff;
+      final int v2 = Platform.getByte(rightObj, rightOff + i) & 0xff;
+      if (v1 != v2) {
+        return v1 > v2 ? 1 : -1;
+      }
       i += 1;
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/8bde4678/sql/core/src/test/java/test/org/apache/spark/sql/execution/sort/RecordBinaryComparatorSuite.java
--
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/execution/sort/RecordBinaryComparatorSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/execution/sort/RecordBinaryComparatorSuite.java
new file mode 100644
index 000..97f3dc5
--- /dev/null
+++
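The heart of the fix is the 8-byte loop in the hunk above. The old code collapsed the difference of two longs into an int via `% Integer.MAX_VALUE`, which yields 0 whenever that difference is a nonzero multiple of Integer.MAX_VALUE (and the raw subtraction can itself overflow). A minimal standalone sketch, separate from the patch and with illustrative names only, contrasting the broken arithmetic with the fixed comparison:

```java
// Standalone sketch (not part of this commit): why the old modulo-based
// comparison is wrong. All names here are illustrative.
public class ModCompareBug {

  // Pre-fix logic: collapse the long difference into an int via modulo.
  // Returns 0 (i.e. "equal") whenever v1 - v2 is a nonzero multiple of
  // Integer.MAX_VALUE; the subtraction itself can also overflow.
  static int oldCompare(long v1, long v2) {
    return (int) ((v1 - v2) % Integer.MAX_VALUE);
  }

  // Post-fix logic: branch on inequality and return a sign directly,
  // so no information is lost and nothing can overflow.
  static int newCompare(long v1, long v2) {
    if (v1 != v2) {
      return v1 > v2 ? 1 : -1;
    }
    return 0;
  }

  public static void main(String[] args) {
    long v1 = Integer.MAX_VALUE; // v1 - v2 == Integer.MAX_VALUE exactly
    long v2 = 0L;
    System.out.println(oldCompare(v1, v2)); // 0 -> records wrongly treated as equal
    System.out.println(newCompare(v1, v2)); // 1 -> correct ordering
  }
}
```

The patched comparator never subtracts: it branches on `v1 != v2` and returns the sign directly, so distinct words can never collapse to a "equal" result.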
spark git commit: [SPARK-25114][CORE] Fix RecordBinaryComparator when subtraction between two words is divisible by Integer.MAX_VALUE.
Repository: spark Updated Branches: refs/heads/master f984ec75e -> 4fb96e510

[SPARK-25114][CORE] Fix RecordBinaryComparator when subtraction between two words is divisible by Integer.MAX_VALUE.

## What changes were proposed in this pull request?

https://github.com/apache/spark/pull/22079#discussion_r209705612 It is possible for two records to be unequal and yet compare as equal with this code, whenever the difference between the two long values is a nonzero multiple of Integer.MAX_VALUE. This PR fixes the issue.

## How was this patch tested?

Add new test cases in `RecordBinaryComparatorSuite`.

Closes #22101 from jiangxb1987/fix-rbc.

Authored-by: Xingbo Jiang
Signed-off-by: Xiao Li

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4fb96e51
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4fb96e51
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4fb96e51
Branch: refs/heads/master
Commit: 4fb96e5105cec4a3eb19a2b7997600b086bac32f
Parents: f984ec7
Author: Xingbo Jiang
Authored: Mon Aug 20 23:13:31 2018 -0700
Committer: Xiao Li
Committed: Mon Aug 20 23:13:31 2018 -0700
--
 .../sql/execution/RecordBinaryComparator.java | 26
 .../sort/RecordBinaryComparatorSuite.java     | 66
 2 files changed, 81 insertions(+), 11 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/spark/blob/4fb96e51/sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java
--
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java
index bb77b5b..40c2cc8 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java
@@ -22,12 +22,10 @@ import org.apache.spark.util.collection.unsafe.sort.RecordComparator;

 public final class RecordBinaryComparator extends RecordComparator {

-  // TODO(jiangxb) Add test suite for this.
   @Override
   public int compare(
       Object leftObj, long leftOff, int leftLen, Object rightObj, long rightOff, int rightLen) {
     int i = 0;
-    int res = 0;

     // If the arrays have different length, the longer one is larger.
     if (leftLen != rightLen) {
@@ -40,27 +38,33 @@ public final class RecordBinaryComparator extends RecordComparator {
     // check if stars align and we can get both offsets to be aligned
     if ((leftOff % 8) == (rightOff % 8)) {
       while ((leftOff + i) % 8 != 0 && i < leftLen) {
-        res = (Platform.getByte(leftObj, leftOff + i) & 0xff) -
-            (Platform.getByte(rightObj, rightOff + i) & 0xff);
-        if (res != 0) return res;
+        final int v1 = Platform.getByte(leftObj, leftOff + i) & 0xff;
+        final int v2 = Platform.getByte(rightObj, rightOff + i) & 0xff;
+        if (v1 != v2) {
+          return v1 > v2 ? 1 : -1;
+        }
         i += 1;
       }
     }
     // for architectures that support unaligned accesses, chew it up 8 bytes at a time
     if (Platform.unaligned() || (((leftOff + i) % 8 == 0) && ((rightOff + i) % 8 == 0))) {
       while (i <= leftLen - 8) {
-        res = (int) ((Platform.getLong(leftObj, leftOff + i) -
-            Platform.getLong(rightObj, rightOff + i)) % Integer.MAX_VALUE);
-        if (res != 0) return res;
+        final long v1 = Platform.getLong(leftObj, leftOff + i);
+        final long v2 = Platform.getLong(rightObj, rightOff + i);
+        if (v1 != v2) {
+          return v1 > v2 ? 1 : -1;
+        }
         i += 8;
       }
     }
     // this will finish off the unaligned comparisons, or do the entire aligned comparison
     // whichever is needed.
     while (i < leftLen) {
-      res = (Platform.getByte(leftObj, leftOff + i) & 0xff) -
-          (Platform.getByte(rightObj, rightOff + i) & 0xff);
-      if (res != 0) return res;
+      final int v1 = Platform.getByte(leftObj, leftOff + i) & 0xff;
+      final int v2 = Platform.getByte(rightObj, rightOff + i) & 0xff;
+      if (v1 != v2) {
+        return v1 > v2 ? 1 : -1;
+      }
       i += 1;
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/4fb96e51/sql/core/src/test/java/test/org/apache/spark/sql/execution/sort/RecordBinaryComparatorSuite.java
--
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/execution/sort/RecordBinaryComparatorSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/execution/sort/RecordBinaryComparatorSuite.java
index a19ddbd..97f3dc5 100644
---
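The added `RecordBinaryComparatorSuite` exercises exactly this regression. As a hedged illustration only (not the suite's actual code), a bare-bones check against the patched comparator might look like the sketch below, assuming Spark's on-heap `Platform` helpers (`Platform.putLong`, `Platform.BYTE_ARRAY_OFFSET`) from org.apache.spark.unsafe:

```java
// Hypothetical regression check (not the suite's actual code).
import org.apache.spark.sql.execution.RecordBinaryComparator;
import org.apache.spark.unsafe.Platform;

public class RbcRegressionSketch {
  public static void main(String[] args) {
    // Two 8-byte records whose long values differ by exactly
    // Integer.MAX_VALUE: the pre-fix arithmetic reported them as equal.
    byte[] left = new byte[8];
    byte[] right = new byte[8];
    Platform.putLong(left, Platform.BYTE_ARRAY_OFFSET, (long) Integer.MAX_VALUE);
    Platform.putLong(right, Platform.BYTE_ARRAY_OFFSET, 0L);

    int res = new RecordBinaryComparator().compare(
        left, Platform.BYTE_ARRAY_OFFSET, 8,
        right, Platform.BYTE_ARRAY_OFFSET, 8);

    // Before the fix: res == 0, so the sorter saw the records as identical.
    // After the fix: res == 1, since the left word is larger.
    System.out.println(res);
  }
}
```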