spark git commit: [SPARK-25235][SHELL] Merge the REPL code in Scala 2.11 and 2.12 branches
Repository: spark Updated Branches: refs/heads/master 38391c9aa -> ff8dcc1d4 [SPARK-25235][SHELL] Merge the REPL code in Scala 2.11 and 2.12 branches ## What changes were proposed in this pull request? Using some reflection tricks to merge Scala 2.11 and 2.12 codebase. ## How was this patch tested? Existing tests. Closes #22246 from dbtsai/repl. Lead-authored-by: DB Tsai Co-authored-by: Liang-Chi Hsieh Signed-off-by: DB Tsai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ff8dcc1d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ff8dcc1d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ff8dcc1d Branch: refs/heads/master Commit: ff8dcc1d4c684e1b68e63d61b3f20284b9979cca Parents: 38391c9 Author: DB Tsai Authored: Wed Aug 29 04:30:31 2018 + Committer: DB Tsai Committed: Wed Aug 29 04:30:31 2018 + -- repl/pom.xml| 10 - .../org/apache/spark/repl/SparkILoop.scala | 278 .../org/apache/spark/repl/SparkILoop.scala | 143 - .../org/apache/spark/repl/SparkILoop.scala | 319 +++ 4 files changed, 319 insertions(+), 431 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ff8dcc1d/repl/pom.xml -- diff --git a/repl/pom.xml b/repl/pom.xml index 861bbd7..553d5eb 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -167,14 +167,4 @@ - - - scala-2.12 - -scala-2.12/src/main/scala -scala-2.12/src/test/scala - - - - http://git-wip-us.apache.org/repos/asf/spark/blob/ff8dcc1d/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala -- diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala deleted file mode 100644 index 9426526..000 --- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/SparkILoop.scala +++ /dev/null @@ -1,278 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.repl - -import java.io.BufferedReader - -// scalastyle:off println -import scala.Predef.{println => _, _} -// scalastyle:on println -import scala.concurrent.Future -import scala.reflect.classTag -import scala.reflect.internal.util.ScalaClassLoader.savingContextLoader -import scala.reflect.io.File -import scala.tools.nsc.{GenericRunnerSettings, Properties} -import scala.tools.nsc.Settings -import scala.tools.nsc.interpreter.{isReplDebug, isReplPower, replProps} -import scala.tools.nsc.interpreter.{AbstractOrMissingHandler, ILoop, IMain, JPrintWriter} -import scala.tools.nsc.interpreter.{NamedParam, SimpleReader, SplashLoop, SplashReader} -import scala.tools.nsc.interpreter.StdReplTags.tagOfIMain -import scala.tools.nsc.util.stringFromStream -import scala.util.Properties.{javaVersion, javaVmName, versionString} - -/** - * A Spark-specific interactive shell. 
- */ -class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter) -extends ILoop(in0, out) { - def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out) - def this() = this(None, new JPrintWriter(Console.out, true)) - - override def createInterpreter(): Unit = { -intp = new SparkILoopInterpreter(settings, out) - } - - val initializationCommands: Seq[String] = Seq( -""" -@transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) { -org.apache.spark.repl.Main.sparkSession - } else { -org.apache.spark.repl.Main.createSparkSession() - } -@transient val sc = { - val _sc = spark.sparkContext - if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) { -val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null) -if (proxyUrl != null) { - println( -s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}
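Editorial note on the [SPARK-25235] change above: the commit message says the 2.11 and 2.12 REPL sources were merged "using some reflection tricks". The sketch below only shows the general shape of such a trick — looking a method up by name at runtime so one source file can call APIs that exist in only one of the two Scala versions. The helper and its name are hypothetical, not code from SparkILoop.

```scala
// Illustrative sketch only: dispatch to a method reflectively if it exists on
// the target, so the same source compiles against both interpreter APIs.
def invokeIfPresent(target: AnyRef, method: String, args: AnyRef*): Option[AnyRef] = {
  target.getClass.getMethods
    .find(m => m.getName == method && m.getParameterCount == args.length)
    .map(_.invoke(target, args: _*))
}
```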
svn commit: r29001 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_28_20_02-38391c9-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Wed Aug 29 03:16:01 2018 New Revision: 29001 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_28_20_02-38391c9 docs [This commit notification would consist of 1478 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-25253][PYSPARK] Refactor local connection & auth code
Repository: spark Updated Branches: refs/heads/master 68ec207a3 -> 38391c9aa [SPARK-25253][PYSPARK] Refactor local connection & auth code This eliminates some duplication in the code to connect to a server on localhost to talk directly to the jvm. Also it gives consistent ipv6 and error handling. Two other incidental changes, that shouldn't matter: 1) python barrier tasks perform authentication immediately (rather than waiting for the BARRIER_FUNCTION indicator) 2) for `rdd._load_from_socket`, the timeout is only increased after authentication. Closes #22247 from squito/py_connection_refactor. Authored-by: Imran Rashid Signed-off-by: hyukjinkwon Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/38391c9a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/38391c9a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/38391c9a Branch: refs/heads/master Commit: 38391c9aa8a88fcebb337934f30298a32d91596b Parents: 68ec207 Author: Imran Rashid Authored: Wed Aug 29 09:47:38 2018 +0800 Committer: hyukjinkwon Committed: Wed Aug 29 09:47:38 2018 +0800 -- .../apache/spark/api/python/PythonRunner.scala | 3 +- python/pyspark/java_gateway.py | 32 +++- python/pyspark/rdd.py | 27 ++--- python/pyspark/taskcontext.py | 32 +++- python/pyspark/worker.py| 7 ++--- 5 files changed, 40 insertions(+), 61 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/38391c9a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index da6475c..6c7e863 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -216,6 +216,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( sock = serverSocket.get.accept() // Wait for function call from python side. sock.setSoTimeout(1) + authHelper.authClient(sock) val input = new DataInputStream(sock.getInputStream()) input.readInt() match { case BarrierTaskContextMessageProtocol.BARRIER_FUNCTION => @@ -334,8 +335,6 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( def barrierAndServe(sock: Socket): Unit = { require(serverSocket.isDefined, "No available ServerSocket to redirect the barrier() call.") - authHelper.authClient(sock) - val out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream)) try { context.asInstanceOf[BarrierTaskContext].barrier() http://git-wip-us.apache.org/repos/asf/spark/blob/38391c9a/python/pyspark/java_gateway.py -- diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index fa2d5e8..b06503b 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -134,7 +134,7 @@ def launch_gateway(conf=None): return gateway -def do_server_auth(conn, auth_secret): +def _do_server_auth(conn, auth_secret): """ Performs the authentication protocol defined by the SocketAuthHelper class on the given file-like object 'conn'. @@ -147,6 +147,36 @@ def do_server_auth(conn, auth_secret): raise Exception("Unexpected reply from iterator server.") +def local_connect_and_auth(port, auth_secret): +""" +Connect to local host, authenticate with it, and return a (sockfile,sock) for that connection. +Handles IPV4 & IPV6, does some error handling. 
+:param port +:param auth_secret +:return: a tuple with (sockfile, sock) +""" +sock = None +errors = [] +# Support for both IPv4 and IPv6. +# On most of IPv6-ready systems, IPv6 will take precedence. +for res in socket.getaddrinfo("127.0.0.1", port, socket.AF_UNSPEC, socket.SOCK_STREAM): +af, socktype, proto, _, sa = res +try: +sock = socket.socket(af, socktype, proto) +sock.settimeout(15) +sock.connect(sa) +sockfile = sock.makefile("rwb", 65536) +_do_server_auth(sockfile, auth_secret) +return (sockfile, sock) +except socket.error as e: +emsg = _exception_message(e) +errors.append("tried to connect to %s, but an error occured: %s" % (sa, emsg)) +sock.close() +sock = None +else: +raise Exception("could not o
spark git commit: [SPARK-25260][SQL] Fix namespace handling in SchemaConverters.toAvroType
Repository: spark Updated Branches: refs/heads/master 32c8a3d7b -> 68ec207a3 [SPARK-25260][SQL] Fix namespace handling in SchemaConverters.toAvroType ## What changes were proposed in this pull request? `toAvroType` converts spark data type to avro schema. It always appends the record name to namespace so its impossible to have an Avro namespace independent of the record name. When invoked with a spark data type like, ```java val sparkSchema = StructType(Seq( StructField("name", StringType, nullable = false), StructField("address", StructType(Seq( StructField("city", StringType, nullable = false), StructField("state", StringType, nullable = false))), nullable = false))) // map it to an avro schema with record name "employee" and top level namespace "foo.bar", val avroSchema = SchemaConverters.toAvroType(sparkSchema, false, "employee", "foo.bar") // result is // avroSchema.getName = employee // avroSchema.getNamespace = foo.bar.employee // avroSchema.getFullname = foo.bar.employee.employee ``` The patch proposes to fix this so that the result is ``` avroSchema.getName = employee avroSchema.getNamespace = foo.bar avroSchema.getFullname = foo.bar.employee ``` ## How was this patch tested? New and existing unit tests. Please review http://spark.apache.org/contributing.html before opening a pull request. Closes #22251 from arunmahadevan/avro-fix. Authored-by: Arun Mahadevan Signed-off-by: hyukjinkwon Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/68ec207a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/68ec207a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/68ec207a Branch: refs/heads/master Commit: 68ec207a320bd50ca61e820c9ff559f799c2ab0a Parents: 32c8a3d Author: Arun Mahadevan Authored: Wed Aug 29 09:25:49 2018 +0800 Committer: hyukjinkwon Committed: Wed Aug 29 09:25:49 2018 +0800 -- .../spark/sql/avro/SchemaConverters.scala | 18 - .../org/apache/spark/sql/avro/AvroSuite.scala | 42 +++- 2 files changed, 48 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/68ec207a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala -- diff --git a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala index 3a15e8d..bd15765 100644 --- a/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala +++ b/external/avro/src/main/scala/org/apache/spark/sql/avro/SchemaConverters.scala @@ -123,7 +123,7 @@ object SchemaConverters { catalystType: DataType, nullable: Boolean = false, recordName: String = "topLevelRecord", - prevNameSpace: String = "") + nameSpace: String = "") : Schema = { val builder = SchemaBuilder.builder() @@ -143,29 +143,25 @@ object SchemaConverters { val avroType = LogicalTypes.decimal(d.precision, d.scale) val fixedSize = minBytesForPrecision(d.precision) // Need to avoid naming conflict for the fixed fields -val name = prevNameSpace match { +val name = nameSpace match { case "" => s"$recordName.fixed" - case _ => s"$prevNameSpace.$recordName.fixed" + case _ => s"$nameSpace.$recordName.fixed" } avroType.addToSchema(SchemaBuilder.fixed(name).size(fixedSize)) case BinaryType => builder.bytesType() case ArrayType(et, containsNull) => builder.array() - .items(toAvroType(et, containsNull, recordName, prevNameSpace)) + .items(toAvroType(et, containsNull, recordName, nameSpace)) case MapType(StringType, vt, valueContainsNull) => 
builder.map() - .values(toAvroType(vt, valueContainsNull, recordName, prevNameSpace)) + .values(toAvroType(vt, valueContainsNull, recordName, nameSpace)) case st: StructType => -val nameSpace = prevNameSpace match { - case "" => recordName - case _ => s"$prevNameSpace.$recordName" -} - +val childNameSpace = if (nameSpace != "") s"$nameSpace.$recordName" else recordName val fieldsAssembler = builder.record(recordName).namespace(nameSpace).fields() st.foreach { f => val fieldAvroType = -toAvroType(f.dataType, f.nullable, f.name, nameSpace) +toAvroType(f.dataType, f.nullable, f.name, childNameSpace) fieldsAssembler.name(f.name).`type`(fieldAvroType).noDefault() } fieldsAssembler.endRecord() http://git-wip-us.apache.org/repos/asf/spark/blob/68ec207a/external/avro/src/test/scala/org/apache/spark/sql/av
spark git commit: [MINOR] Avoid code duplication for nullable in Higher Order function
Repository: spark Updated Branches: refs/heads/master bbbf81469 -> 32c8a3d7b [MINOR] Avoid code duplication for nullable in Higher Order function ## What changes were proposed in this pull request? Most of `HigherOrderFunction`s have the same `nullable` definition, ie. they are nullable when one of their arguments is nullable. The PR refactors it in order to avoid code duplication. ## How was this patch tested? NA Closes #22243 from mgaido91/MINOR_nullable_hof. Authored-by: Marco Gaido Signed-off-by: hyukjinkwon Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/32c8a3d7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/32c8a3d7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/32c8a3d7 Branch: refs/heads/master Commit: 32c8a3d7beac4b47a75f5ec3c69b13ebc57de0c7 Parents: bbbf814 Author: Marco Gaido Authored: Wed Aug 29 09:20:32 2018 +0800 Committer: hyukjinkwon Committed: Wed Aug 29 09:20:32 2018 +0800 -- .../expressions/higherOrderFunctions.scala| 18 ++ 1 file changed, 2 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/32c8a3d7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala index 9f2e84a..2bb6b20 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala @@ -90,6 +90,8 @@ object LambdaFunction { */ trait HigherOrderFunction extends Expression with ExpectsInputTypes { + override def nullable: Boolean = arguments.exists(_.nullable) + override def children: Seq[Expression] = arguments ++ functions /** @@ -217,8 +219,6 @@ case class ArrayTransform( function: Expression) extends ArrayBasedSimpleHigherOrderFunction with CodegenFallback { - override def nullable: Boolean = argument.nullable - override def dataType: ArrayType = ArrayType(function.dataType, function.nullable) override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): ArrayTransform = { @@ -287,8 +287,6 @@ case class MapFilter( copy(function = f(function, (keyType, false) :: (valueType, valueContainsNull) :: Nil)) } - override def nullable: Boolean = argument.nullable - override def nullSafeEval(inputRow: InternalRow, argumentValue: Any): Any = { val m = argumentValue.asInstanceOf[MapData] val f = functionForEval @@ -328,8 +326,6 @@ case class ArrayFilter( function: Expression) extends ArrayBasedSimpleHigherOrderFunction with CodegenFallback { - override def nullable: Boolean = argument.nullable - override def dataType: DataType = argument.dataType override def functionType: AbstractDataType = BooleanType @@ -375,8 +371,6 @@ case class ArrayExists( function: Expression) extends ArrayBasedSimpleHigherOrderFunction with CodegenFallback { - override def nullable: Boolean = argument.nullable - override def dataType: DataType = BooleanType override def functionType: AbstractDataType = BooleanType @@ -516,8 +510,6 @@ case class TransformKeys( function: Expression) extends MapBasedSimpleHigherOrderFunction with CodegenFallback { - override def nullable: Boolean = argument.nullable - @transient lazy val MapType(keyType, valueType, valueContainsNull) = argument.dataType override def dataType: DataType 
= MapType(function.dataType, valueType, valueContainsNull) @@ -568,8 +560,6 @@ case class TransformValues( function: Expression) extends MapBasedSimpleHigherOrderFunction with CodegenFallback { - override def nullable: Boolean = argument.nullable - @transient lazy val MapType(keyType, valueType, valueContainsNull) = argument.dataType override def dataType: DataType = MapType(keyType, function.dataType, function.nullable) @@ -638,8 +628,6 @@ case class MapZipWith(left: Expression, right: Expression, function: Expression) override def functionTypes: Seq[AbstractDataType] = AnyDataType :: Nil - override def nullable: Boolean = left.nullable || right.nullable - override def dataType: DataType = MapType(keyType, function.dataType, function.nullable) override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapZipWith = { @@ -810,8 +798,6 @@ case class ZipWith(left: Expression, right: Expression, function: Expression) override def functionTypes: Seq[AbstractDataType] = AnyDataType :: Nil
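The [MINOR] refactoring above follows a common pattern: hoist an override that every subclass repeats into the base trait and derive it from shared state. A standalone sketch of that pattern, with simplified names rather than Spark's actual expression hierarchy:

```scala
trait Expr { def nullable: Boolean }

// The base trait computes nullability once from its arguments, mirroring the PR:
// a higher-order function is nullable when any of its arguments is nullable.
trait HigherOrderFn extends Expr {
  def arguments: Seq[Expr]
  override def nullable: Boolean = arguments.exists(_.nullable)
}

// Concrete functions no longer need their own `nullable` override.
case class ArrayTransformLike(arguments: Seq[Expr]) extends HigherOrderFn
```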
spark git commit: [SPARK-22357][CORE] SparkContext.binaryFiles ignore minPartitions parameter
Repository: spark Updated Branches: refs/heads/master 103854028 -> bbbf81469 [SPARK-22357][CORE] SparkContext.binaryFiles ignore minPartitions parameter ## What changes were proposed in this pull request? Fix the issue that minPartitions was not used in the method. This is a simple fix and I am not trying to make it complicated. The purpose is to still allow the user to control the default parallelism through the value of minPartitions, in addition to sc.defaultParallelism. ## How was this patch tested? I have not provided the additional test since the fix is very straightforward. Closes #21638 from bomeng/22357. Lead-authored-by: Bo Meng Co-authored-by: Bo Meng Signed-off-by: Sean Owen Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bbbf8146 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bbbf8146 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bbbf8146 Branch: refs/heads/master Commit: bbbf8146916aa70d9774543643776eed9d9d9373 Parents: 1038540 Author: Bo Meng Authored: Tue Aug 28 19:39:13 2018 -0500 Committer: Sean Owen Committed: Tue Aug 28 19:39:13 2018 -0500 -- .../src/main/scala/org/apache/spark/input/PortableDataStream.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bbbf8146/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala -- diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala index 17cdba4..ab020aa 100644 --- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala +++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala @@ -47,7 +47,7 @@ private[spark] abstract class StreamFileInputFormat[T] def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) { val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES) val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES) -val defaultParallelism = sc.defaultParallelism +val defaultParallelism = Math.max(sc.defaultParallelism, minPartitions) val files = listStatus(context).asScala val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + openCostInBytes).sum val bytesPerCore = totalBytes / defaultParallelism - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
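A hedged usage sketch for the [SPARK-22357] fix above (the path and partition count are placeholders, and `sc` is assumed to be an existing SparkContext, e.g. in spark-shell): before the patch the second argument was silently ignored; after it, a larger minPartitions feeds into the split-size computation and raises the partition count.

```scala
// minPartitions now participates in sizing the splits instead of being ignored.
val streams = sc.binaryFiles("hdfs:///data/blobs", minPartitions = 64)
println(s"partitions = ${streams.getNumPartitions}")
```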
svn commit: r29000 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_28_16_01-1038540-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Aug 28 23:15:56 2018 New Revision: 29000 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_28_16_01-1038540 docs [This commit notification would consist of 1478 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-25212][SQL] Support Filter in ConvertToLocalRelation
Repository: spark Updated Branches: refs/heads/master 7ad18ee9f -> 103854028 [SPARK-25212][SQL] Support Filter in ConvertToLocalRelation ## What changes were proposed in this pull request? Support Filter in ConvertToLocalRelation, similar to how Project works. Additionally, in Optimizer, run ConvertToLocalRelation earlier to simplify the plan. This is good for very short queries which often are queries on local relations. ## How was this patch tested? New test. Manual benchmark. Author: Bogdan Raducanu Author: Shixiong Zhu Author: Yinan Li Author: Li Jin Author: s71955 Author: DB Tsai Author: jaroslav chládek Author: Huangweizhe Author: Xiangrui Meng Author: hyukjinkwon Author: Kent Yao Author: caoxuewen Author: liuxian Author: Adam Bradbury Author: Jose Torres Author: Yuming Wang Author: Liang-Chi Hsieh Closes #22205 from bogdanrdc/local-relation-filter. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/10385402 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/10385402 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/10385402 Branch: refs/heads/master Commit: 103854028e99846aabeb6f27eb6fd255ecc96381 Parents: 7ad18ee Author: Bogdan Raducanu Authored: Tue Aug 28 15:50:25 2018 -0700 Committer: Xiao Li Committed: Tue Aug 28 15:50:25 2018 -0700 -- .../spark/sql/catalyst/optimizer/Optimizer.scala | 14 ++ .../optimizer/ConvertToLocalRelationSuite.scala | 18 ++ .../org/apache/spark/sql/DataFrameJoinSuite.scala | 8 3 files changed, 36 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/10385402/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 63a62cd..e4b4f1e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -130,6 +130,14 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) // since the other rules might make two separate Unions operators adjacent. Batch("Union", Once, CombineUnions) :: +// run this once earlier. this might simplify the plan and reduce cost of optimizer. +// for example, a query such as Filter(LocalRelation) would go through all the heavy +// optimizer rules that are triggered when there is a filter +// (e.g. InferFiltersFromConstraints). 
if we run this batch earlier, the query becomes just +// LocalRelation and does not trigger many rules +Batch("LocalRelation early", fixedPoint, + ConvertToLocalRelation, + PropagateEmptyRelation) :: Batch("Pullup Correlated Expressions", Once, PullupCorrelatedPredicates) :: Batch("Subquery", Once, @@ -1349,6 +1357,12 @@ object ConvertToLocalRelation extends Rule[LogicalPlan] { case Limit(IntegerLiteral(limit), LocalRelation(output, data, isStreaming)) => LocalRelation(output, data.take(limit), isStreaming) + +case Filter(condition, LocalRelation(output, data, isStreaming)) +if !hasUnevaluableExpr(condition) => + val predicate = InterpretedPredicate.create(condition, output) + predicate.initialize(0) + LocalRelation(output, data.filter(row => predicate.eval(row)), isStreaming) } private def hasUnevaluableExpr(expr: Expression): Boolean = { http://git-wip-us.apache.org/repos/asf/spark/blob/10385402/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala -- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala index 049a19b..0c015f8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala @@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.{LessThan, Literal} import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.
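To illustrate the [SPARK-25212] rule above — a sketch assuming a spark-shell session with `spark` and its implicits in scope — a filter over a purely local relation can now be folded away by the optimizer, so a very short query never reaches the heavier rule batches:

```scala
import spark.implicits._

val df = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "name")
// With the new rule the optimized logical plan for this query collapses to a
// LocalRelation holding only the surviving rows.
df.filter($"id" > 1).explain(true)
```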
svn commit: r28998 - in /dev/spark/2.3.3-SNAPSHOT-2018_08_28_14_01-306e881-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Aug 28 21:15:27 2018 New Revision: 28998 Log: Apache Spark 2.3.3-SNAPSHOT-2018_08_28_14_01-306e881 docs [This commit notification would consist of 1443 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-25004][CORE] Add spark.executor.pyspark.memory limit.
Repository: spark Updated Branches: refs/heads/master aff8f15c1 -> 7ad18ee9f [SPARK-25004][CORE] Add spark.executor.pyspark.memory limit. ## What changes were proposed in this pull request? This adds `spark.executor.pyspark.memory` to configure Python's address space limit, [`resource.RLIMIT_AS`](https://docs.python.org/3/library/resource.html#resource.RLIMIT_AS). Limiting Python's address space allows Python to participate in memory management. In practice, we see fewer cases of Python taking too much memory because it doesn't know to run garbage collection. This results in YARN killing fewer containers. This also improves error messages so users know that Python is consuming too much memory: ``` File "build/bdist.linux-x86_64/egg/package/library.py", line 265, in fe_engineer fe_eval_rec.update(f(src_rec_prep, mat_rec_prep)) File "build/bdist.linux-x86_64/egg/package/library.py", line 163, in fe_comp comparisons = EvaluationUtils.leven_list_compare(src_rec_prep.get(item, []), mat_rec_prep.get(item, [])) File "build/bdist.linux-x86_64/egg/package/evaluationutils.py", line 25, in leven_list_compare permutations = sorted(permutations, reverse=True) MemoryError ``` The new pyspark memory setting is used to increase requested YARN container memory, instead of sharing overhead memory between python and off-heap JVM activity. ## How was this patch tested? Tested memory limits in our YARN cluster and verified that MemoryError is thrown. Author: Ryan Blue Closes #21977 from rdblue/SPARK-25004-add-python-memory-limit. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7ad18ee9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7ad18ee9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7ad18ee9 Branch: refs/heads/master Commit: 7ad18ee9f26e75dbe038c6034700f9cd4c0e2baa Parents: aff8f15 Author: Ryan Blue Authored: Tue Aug 28 12:31:33 2018 -0700 Committer: Marcelo Vanzin Committed: Tue Aug 28 12:31:33 2018 -0700 -- .../org/apache/spark/api/python/PythonRDD.scala | 5 +--- .../apache/spark/api/python/PythonRunner.scala | 27 .../apache/spark/internal/config/package.scala | 4 +++ docs/configuration.md | 12 + python/pyspark/worker.py| 23 + .../org/apache/spark/deploy/yarn/Client.scala | 17 .../spark/deploy/yarn/YarnAllocator.scala | 9 ++- .../deploy/yarn/BaseYarnClusterSuite.scala | 27 ++-- .../spark/deploy/yarn/YarnClusterSuite.scala| 6 +++-- .../python/AggregateInPandasExec.scala | 4 --- .../execution/python/ArrowEvalPythonExec.scala | 4 --- .../execution/python/ArrowPythonRunner.scala| 4 +-- .../execution/python/BatchEvalPythonExec.scala | 5 +--- .../sql/execution/python/EvalPythonExec.scala | 6 + .../python/FlatMapGroupsInPandasExec.scala | 4 --- .../execution/python/PythonForeachWriter.scala | 5 +--- .../sql/execution/python/PythonUDFRunner.scala | 4 +-- .../execution/python/WindowInPandasExec.scala | 4 --- 18 files changed, 105 insertions(+), 65 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7ad18ee9/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index c3db60a..197f464 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -49,9 +49,6 @@ private[spark] class PythonRDD( isFromBarrier: Boolean = false) extends RDD[Array[Byte]](parent) { - val bufferSize 
= conf.getInt("spark.buffer.size", 65536) - val reuseWorker = conf.getBoolean("spark.python.worker.reuse", true) - override def getPartitions: Array[Partition] = firstParent.partitions override val partitioner: Option[Partitioner] = { @@ -61,7 +58,7 @@ private[spark] class PythonRDD( val asJavaRDD: JavaRDD[Array[Byte]] = JavaRDD.fromRDD(this) override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = { -val runner = PythonRunner(func, bufferSize, reuseWorker) +val runner = PythonRunner(func) runner.compute(firstParent.iterator(split, context), split.index, context) } http://git-wip-us.apache.org/repos/asf/spark/blob/7ad18ee9/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/
svn commit: r28997 - in /dev/spark/2.4.0-SNAPSHOT-2018_08_28_12_01-aff8f15-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Aug 28 19:16:55 2018 New Revision: 28997 Log: Apache Spark 2.4.0-SNAPSHOT-2018_08_28_12_01-aff8f15 docs [This commit notification would consist of 1478 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-25240][SQL] Fix for a deadlock in RECOVER PARTITIONS
Repository: spark Updated Branches: refs/heads/master 4e3f3cebe -> aff8f15c1 [SPARK-25240][SQL] Fix for a deadlock in RECOVER PARTITIONS ## What changes were proposed in this pull request? In the PR, I propose to not perform recursive parallel listening of files in the `scanPartitions` method because it can cause a deadlock. Instead of that I propose to do `scanPartitions` in parallel for top level partitions only. ## How was this patch tested? I extended an existing test to trigger the deadlock. Author: Maxim Gekk Closes #22233 from MaxGekk/fix-recover-partitions. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aff8f15c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aff8f15c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aff8f15c Branch: refs/heads/master Commit: aff8f15c153f8031ceaffa237c60e040c6f8115f Parents: 4e3f3ce Author: Maxim Gekk Authored: Tue Aug 28 11:29:05 2018 -0700 Committer: Xiao Li Committed: Tue Aug 28 11:29:05 2018 -0700 -- .../spark/sql/execution/command/ddl.scala | 34 +-- .../spark/sql/execution/command/DDLSuite.scala | 59 .../spark/sql/hive/execution/HiveDDLSuite.scala | 15 ++--- 3 files changed, 61 insertions(+), 47 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/aff8f15c/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 7a6f574..e1faece 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command import java.util.Locale import scala.collection.{GenMap, GenSeq} -import scala.concurrent.ExecutionContext +import scala.collection.parallel.ForkJoinTaskSupport import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration @@ -29,7 +29,7 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf} import org.apache.spark.sql.{AnalysisException, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.Resolver +import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} @@ -40,7 +40,6 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter import org.apache.spark.sql.internal.HiveSerDe import org.apache.spark.sql.types._ import org.apache.spark.util.{SerializableConfiguration, ThreadUtils} -import org.apache.spark.util.ThreadUtils.parmap // Note: The definition of these commands are based on the ones described in // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL @@ -622,9 +621,8 @@ case class AlterTableRecoverPartitionsCommand( val evalPool = ThreadUtils.newForkJoinPool("AlterTableRecoverPartitionsCommand", 8) val partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)] = try { -implicit val ec = ExecutionContext.fromExecutor(evalPool) scanPartitions(spark, fs, pathFilter, root, Map(), table.partitionColumnNames, threshold, - spark.sessionState.conf.resolver) + spark.sessionState.conf.resolver, new ForkJoinTaskSupport(evalPool)).seq } finally { evalPool.shutdown() } @@ 
-656,13 +654,23 @@ case class AlterTableRecoverPartitionsCommand( spec: TablePartitionSpec, partitionNames: Seq[String], threshold: Int, - resolver: Resolver)(implicit ec: ExecutionContext): Seq[(TablePartitionSpec, Path)] = { + resolver: Resolver, + evalTaskSupport: ForkJoinTaskSupport): GenSeq[(TablePartitionSpec, Path)] = { if (partitionNames.isEmpty) { return Seq(spec -> path) } -val statuses = fs.listStatus(path, filter).toSeq -def handleStatus(st: FileStatus): Seq[(TablePartitionSpec, Path)] = { +val statuses = fs.listStatus(path, filter) +val statusPar: GenSeq[FileStatus] = + if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) { +// parallelize the list of partitions here, then we can have better parallelism later. +val parArray = statuses.par +parArray.tasksupport = evalTaskSupport +parArray + } else { +statuses + } +statusPar.flatMap { st => val name = st.getPath.getName
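For reference, the command whose listing logic the [SPARK-25240] patch above changes can be exercised as follows; the table name is a placeholder and `spark` is an assumed SparkSession. The patch parallelizes only the top-level directory scan instead of recursing in parallel, which is what could deadlock.

```scala
// Scans the table's root location and registers partition directories found
// on the filesystem.
spark.sql("ALTER TABLE logs RECOVER PARTITIONS")

// The Catalog API exposes the same operation.
spark.catalog.recoverPartitions("logs")
```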
spark git commit: [SPARK-24704][WEBUI] Fix the order of stages in the DAG graph
Repository: spark Updated Branches: refs/heads/branch-2.3 8db935f97 -> 306e881b6 [SPARK-24704][WEBUI] Fix the order of stages in the DAG graph ## What changes were proposed in this pull request? Before: ![wx20180630-155537](https://user-images.githubusercontent.com/1438757/42123357-2c2e2d84-7c83-11e8-8abd-1c2860f38783.png) After: ![wx20180630-155604](https://user-images.githubusercontent.com/1438757/42123359-32fae990-7c83-11e8-8a7b-cdcee94f9123.png) ## How was this patch tested? Manual tests. Author: Stan Zhai Closes #21680 from stanzhai/fix-dag-graph. (cherry picked from commit 772060d0940a97d89807befd682a70ae82e83ef4) Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/306e881b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/306e881b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/306e881b Branch: refs/heads/branch-2.3 Commit: 306e881b62eb112f8014219098eb97f7cbe75e98 Parents: 8db935f Author: Stan Zhai Authored: Wed Jul 4 10:12:36 2018 +0200 Committer: Marcelo Vanzin Committed: Tue Aug 28 10:38:03 2018 -0700 -- core/src/main/scala/org/apache/spark/status/AppStatusStore.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/306e881b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala -- diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index 688f25a..e237281 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -471,7 +471,7 @@ private[spark] class AppStatusStore( def operationGraphForJob(jobId: Int): Seq[RDDOperationGraph] = { val job = store.read(classOf[JobDataWrapper], jobId) -val stages = job.info.stageIds +val stages = job.info.stageIds.sorted stages.map { id => val g = store.read(classOf[RDDOperationGraphWrapper], id).toRDDOperationGraph() - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-23679][YARN] Setting RM_HA_URLS for AmIpFilter to avoid redirect failure in YARN mode
Repository: spark Updated Branches: refs/heads/master de46df549 -> 4e3f3cebe [SPARK-23679][YARN] Setting RM_HA_URLS for AmIpFilter to avoid redirect failure in YARN mode ## What changes were proposed in this pull request? YARN `AmIpFilter` adds a new parameter "RM_HA_URLS" to support RM HA, but Spark on YARN doesn't provide a such parameter, so it will be failed to redirect when running on RM HA. The detailed exception can be checked from JIRA. So here fixing this issue by adding "RM_HA_URLS" parameter. ## How was this patch tested? Local verification. Closes #22164 from jerryshao/SPARK-23679. Authored-by: jerryshao Signed-off-by: Marcelo Vanzin Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4e3f3ceb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4e3f3ceb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4e3f3ceb Branch: refs/heads/master Commit: 4e3f3cebe4cc6f47c264821a5ea92c32a4f1daa5 Parents: de46df5 Author: jerryshao Authored: Tue Aug 28 10:33:39 2018 -0700 Committer: Marcelo Vanzin Committed: Tue Aug 28 10:33:39 2018 -0700 -- .../apache/spark/deploy/yarn/YarnRMClient.scala | 29 +++- 1 file changed, 28 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4e3f3ceb/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala -- diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala index b59dcf1..05a7b1e 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.yarn import scala.collection.JavaConverters._ +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest @@ -112,7 +113,16 @@ private[spark] class YarnRMClient extends Logging { val proxies = WebAppUtils.getProxyHostsAndPortsForAmFilter(conf) val hosts = proxies.asScala.map(_.split(":").head) val uriBases = proxies.asScala.map { proxy => prefix + proxy + proxyBase } -Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(",")) +val params = + Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(",")) + +// Handles RM HA urls +val rmIds = conf.getStringCollection(YarnConfiguration.RM_HA_IDS).asScala +if (rmIds != null && rmIds.nonEmpty) { + params + ("RM_HA_URLS" -> rmIds.map(getUrlByRmId(conf, _)).mkString(",")) +} else { + params +} } /** Returns the maximum number of attempts to register the AM. */ @@ -126,4 +136,21 @@ private[spark] class YarnRMClient extends Logging { } } + private def getUrlByRmId(conf: Configuration, rmId: String): String = { +val addressPropertyPrefix = if (YarnConfiguration.useHttps(conf)) { + YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS +} else { + YarnConfiguration.RM_WEBAPP_ADDRESS +} + +val addressWithRmId = if (rmId == null || rmId.isEmpty) { + addressPropertyPrefix +} else if (rmId.startsWith(".")) { + throw new IllegalStateException(s"rmId $rmId should not already have '.' 
prepended.") +} else { + s"$addressPropertyPrefix.$rmId" +} + +conf.get(addressWithRmId) + } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-23997][SQL] Configurable maximum number of buckets
Repository: spark Updated Branches: refs/heads/master 1149c4efb -> de46df549 [SPARK-23997][SQL] Configurable maximum number of buckets ## What changes were proposed in this pull request? This PR implements the possibility of the user to override the maximum number of buckets when saving to a table. Currently the limit is a hard-coded 100k, which might be insufficient for large workloads. A new configuration entry is proposed: `spark.sql.bucketing.maxBuckets`, which defaults to the previous 100k. ## How was this patch tested? Added unit tests in the following spark.sql test suites: - CreateTableAsSelectSuite - BucketedWriteSuite Author: Fernando Pereira Closes #21087 from ferdonline/enh/configurable_bucket_limit. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/de46df54 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/de46df54 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/de46df54 Branch: refs/heads/master Commit: de46df549acee7fda56bb0871f444d2f3b49e582 Parents: 1149c4e Author: Fernando Pereira Authored: Tue Aug 28 10:31:47 2018 -0700 Committer: Xiao Li Committed: Tue Aug 28 10:31:47 2018 -0700 -- .../spark/sql/catalyst/catalog/interface.scala | 8 +++-- .../org/apache/spark/sql/internal/SQLConf.scala | 8 + .../spark/sql/sources/BucketedWriteSuite.scala | 33 +++--- .../sql/sources/CreateTableAsSelectSuite.scala | 35 ++-- 4 files changed, 76 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/de46df54/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index a4ead53..3842d79 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} import org.apache.spark.sql.catalyst.util.quoteIdentifier +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -173,9 +174,12 @@ case class BucketSpec( numBuckets: Int, bucketColumnNames: Seq[String], sortColumnNames: Seq[String]) { - if (numBuckets <= 0 || numBuckets >= 10) { + def conf: SQLConf = SQLConf.get + + if (numBuckets <= 0 || numBuckets > conf.bucketingMaxBuckets) { throw new AnalysisException( - s"Number of buckets should be greater than 0 but less than 10. Got `$numBuckets`") + s"Number of buckets should be greater than 0 but less than bucketing.maxBuckets " + +s"(`${conf.bucketingMaxBuckets}`). 
Got `$numBuckets`") } override def toString: String = { http://git-wip-us.apache.org/repos/asf/spark/blob/de46df54/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala -- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 6336e89..738d8fe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -674,6 +674,12 @@ object SQLConf { .booleanConf .createWithDefault(true) + val BUCKETING_MAX_BUCKETS = buildConf("spark.sql.sources.bucketing.maxBuckets") +.doc("The maximum number of buckets allowed. Defaults to 10") +.intConf +.checkValue(_ > 0, "the value of spark.sql.sources.bucketing.maxBuckets must be larger than 0") +.createWithDefault(10) + val CROSS_JOINS_ENABLED = buildConf("spark.sql.crossJoin.enabled") .doc("When false, we will throw an error if a query contains a cartesian product without " + "explicit CROSS JOIN syntax.") @@ -1803,6 +1809,8 @@ class SQLConf extends Serializable with Logging { def bucketingEnabled: Boolean = getConf(SQLConf.BUCKETING_ENABLED) + def bucketingMaxBuckets: Int = getConf(SQLConf.BUCKETING_MAX_BUCKETS) + def dataFrameSelfJoinAutoResolveAmbiguity: Boolean = getConf(DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY) http://git-wip-us.apache.org/repos/asf/spark/blob/de46df54/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala -- diff --git a/s
spark git commit: [SPARK-25005][SS] Support non-consecutive offsets for Kafka
Repository: spark Updated Branches: refs/heads/master 592e3a42c -> 1149c4efb [SPARK-25005][SS] Support non-consecutive offsets for Kafka ## What changes were proposed in this pull request? As the user uses Kafka transactions to write data, the offsets in Kafka will be non-consecutive. It will contains some transaction (commit or abort) markers. In addition, if the consumer's `isolation.level` is `read_committed`, `poll` will not return aborted messages either. Hence, we will see non-consecutive offsets in the date returned by `poll`. However, as `seekToEnd` may move the offset point to these missing offsets, there are 4 possible corner cases we need to support: - The whole batch contains no data messages - The first offset in a batch is not a committed data message - The last offset in a batch is not a committed data message - There is a gap in the middle of a batch They are all covered by the new unit tests. ## How was this patch tested? The new unit tests. Closes #22042 from zsxwing/kafka-transaction-read. Authored-by: Shixiong Zhu Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1149c4ef Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1149c4ef Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1149c4ef Branch: refs/heads/master Commit: 1149c4efbc5ebe5b412d8f9c61558fef59179a9e Parents: 592e3a4 Author: Shixiong Zhu Authored: Tue Aug 28 08:38:07 2018 -0700 Committer: Shixiong Zhu Committed: Tue Aug 28 08:38:07 2018 -0700 -- .../kafka010/KafkaContinuousReadSupport.scala | 2 +- .../spark/sql/kafka010/KafkaDataConsumer.scala | 273 ++- .../kafka010/KafkaContinuousSourceSuite.scala | 149 +- .../kafka010/KafkaMicroBatchSourceSuite.scala | 255 - .../spark/sql/kafka010/KafkaRelationSuite.scala | 93 +++ .../spark/sql/kafka010/KafkaTestUtils.scala | 22 +- 6 files changed, 720 insertions(+), 74 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1149c4ef/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala -- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala index 4a18839..1753a28 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala @@ -227,7 +227,7 @@ class KafkaContinuousPartitionReader( // This is a failOnDataLoss exception. Retry if nextKafkaOffset is within the data range, // or if it's the endpoint of the data range (i.e. the "true" next offset). 
-case e: IllegalStateException if e.getCause.isInstanceOf[OffsetOutOfRangeException] => +case e: IllegalStateException if e.getCause.isInstanceOf[OffsetOutOfRangeException] => val range = consumer.getAvailableOffsetRange() if (range.latest >= nextKafkaOffset && range.earliest <= nextKafkaOffset) { // retry http://git-wip-us.apache.org/repos/asf/spark/blob/1149c4ef/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala -- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala index 65046c1..ceb9e31 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala @@ -33,9 +33,19 @@ import org.apache.spark.util.UninterruptibleThread private[kafka010] sealed trait KafkaDataConsumer { /** - * Get the record for the given offset if available. Otherwise it will either throw error - * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset), - * or null. + * Get the record for the given offset if available. + * + * If the record is invisible (either a + * transaction message, or an aborted message when the consumer's `isolation.level` is + * `read_committed`), it will be skipped and this method will try to fetch next available record + * within [offset, untilOffset). + * + * This method also will try its best to detect data loss. If `failOnDataLoss` is `true`, it will + * throw an exception when we detect an unavailable offset. If `failOnDa
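A hedged sketch of the read path the [SPARK-25005] change above affects (broker and topic names are placeholders): when the producer uses Kafka transactions, the offsets seen by the source contain gaps for commit/abort markers, which this patch teaches the source to tolerate. Options prefixed with `kafka.` are passed through to the underlying consumer.

```scala
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  // Only read committed transactional data; aborted records are skipped.
  .option("kafka.isolation.level", "read_committed")
  .option("subscribe", "transactional-topic")
  .load()
```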
spark git commit: [SPARK-25218][CORE] Fix potential resource leaks in TransportServer and SocketAuthHelper
Repository: spark Updated Branches: refs/heads/master 8198ea501 -> 592e3a42c [SPARK-25218][CORE] Fix potential resource leaks in TransportServer and SocketAuthHelper ## What changes were proposed in this pull request? Make sure TransportServer and SocketAuthHelper close the resources for all types of errors. ## How was this patch tested? Jenkins Closes #22210 from zsxwing/SPARK-25218. Authored-by: Shixiong Zhu Signed-off-by: Shixiong Zhu Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/592e3a42 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/592e3a42 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/592e3a42 Branch: refs/heads/master Commit: 592e3a42c20b72edd6e8b9dd07da367596f43da5 Parents: 8198ea5 Author: Shixiong Zhu Authored: Tue Aug 28 08:36:06 2018 -0700 Committer: Shixiong Zhu Committed: Tue Aug 28 08:36:06 2018 -0700 -- .../buffer/FileSegmentManagedBuffer.java| 32 ++--- .../spark/network/server/TransportServer.java | 9 ++-- .../spark/security/SocketAuthHelper.scala | 50 +--- 3 files changed, 54 insertions(+), 37 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/592e3a42/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java -- diff --git a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java index 8b8f989..45fee54 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java +++ b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java @@ -77,16 +77,16 @@ public final class FileSegmentManagedBuffer extends ManagedBuffer { return channel.map(FileChannel.MapMode.READ_ONLY, offset, length); } } catch (IOException e) { + String errorMessage = "Error in reading " + this; try { if (channel != null) { long size = channel.size(); - throw new IOException("Error in reading " + this + " (actual file length " + size + ")", -e); + errorMessage = "Error in reading " + this + " (actual file length " + size + ")"; } } catch (IOException ignored) { // ignore } - throw new IOException("Error in opening " + this, e); + throw new IOException(errorMessage, e); } finally { JavaUtils.closeQuietly(channel); } @@ -95,26 +95,24 @@ public final class FileSegmentManagedBuffer extends ManagedBuffer { @Override public InputStream createInputStream() throws IOException { FileInputStream is = null; +boolean shouldClose = true; try { is = new FileInputStream(file); ByteStreams.skipFully(is, offset); - return new LimitedInputStream(is, length); + InputStream r = new LimitedInputStream(is, length); + shouldClose = false; + return r; } catch (IOException e) { - try { -if (is != null) { - long size = file.length(); - throw new IOException("Error in reading " + this + " (actual file length " + size + ")", - e); -} - } catch (IOException ignored) { -// ignore - } finally { + String errorMessage = "Error in reading " + this; + if (is != null) { +long size = file.length(); +errorMessage = "Error in reading " + this + " (actual file length " + size + ")"; + } + throw new IOException(errorMessage, e); +} finally { + if (shouldClose) { JavaUtils.closeQuietly(is); } - throw new IOException("Error in opening " + this, e); -} catch (RuntimeException e) { - JavaUtils.closeQuietly(is); - throw e; } } 
http://git-wip-us.apache.org/repos/asf/spark/blob/592e3a42/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java -- diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java b/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java index d95ed22..9c85ab2 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java +++ b/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java @@ -70,11 +70,14 @@ public class TransportServer implements Closeable { this.appRpcHandler = appRpcHandler; this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps)); +boolean shouldClose
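The Java changes in [SPARK-25218] above follow a close-on-failure pattern; below is a small generic sketch of that pattern (not Spark's code, names are illustrative): the resource is closed on every failure path, and ownership passes to the caller only once construction fully succeeds.

```scala
import java.io.{FileInputStream, InputStream}

def openAt(path: String, offset: Long): InputStream = {
  val in = new FileInputStream(path)
  var handedOff = false
  try {
    in.skip(offset)          // any exception here still closes the stream below
    handedOff = true
    in                       // caller now owns the stream
  } finally {
    if (!handedOff) in.close()
  }
}
```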