[GitHub] packtpartner opened a new pull request #172: Update documentation.md
packtpartner opened a new pull request #172: Update documentation.md
URL: https://github.com/apache/spark-website/pull/172

*Make sure that you generate site HTML with `jekyll build`, and include the changes to the HTML in your pull request also. See README.md for more information. Please remove this message.*
[spark] Diff for: [GitHub] HyukjinKwon closed pull request #23506: [SPARK-26577][SQL] Add input optimizer when reading Hive table by SparkSQL
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 597eef129f63e..688376e634f9d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -128,6 +128,12 @@ private[spark] object HiveUtils extends Logging {
     .toSequence
     .createWithDefault(jdbcPrefixes)
 
+  val HIVE_INPUT_FORMAT_OPTIMIZER_ENABLED =
+    buildConf("spark.sql.hive.inputFormat.optimizer.enabled")
+      .doc("When true, enable the optimizer of `fileInputFormat` in Spark SQL.")
+      .booleanConf
+      .createWithDefault(false)
+
   private def jdbcPrefixes = Seq(
     "com.mysql.jdbc", "org.postgresql", "com.microsoft.sqlserver", "oracle.jdbc")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 536bc4a3f4ec4..ffc76bffa5bcd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -123,8 +123,7 @@ class HadoopTableReader(
     val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt)
     // logDebug("Table input: %s".format(tablePath))
-    val ifc = hiveTable.getInputFormatClass
-      .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
+    val ifc = getAndOptimizeInput(hiveTable.getInputFormatClass.getName)
     val hadoopRDD = createHadoopRdd(localTableDesc, inputPathStr, ifc)
 
     val attrsWithIndex = attributes.zipWithIndex
@@ -164,7 +163,7 @@ class HadoopTableReader(
   def verifyPartitionPath(
       partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]]):
       Map[HivePartition, Class[_ <: Deserializer]] = {
-    if (!sparkSession.sessionState.conf.verifyPartitionPath) {
+    if (!conf.verifyPartitionPath) {
       partitionToDeserializer
     } else {
       val existPathSet = collection.mutable.Set[String]()
@@ -202,8 +201,7 @@ class HadoopTableReader(
       val partDesc = Utilities.getPartitionDesc(partition)
       val partPath = partition.getDataLocation
       val inputPathStr = applyFilterIfNeeded(partPath, filterOpt)
-      val ifc = partDesc.getInputFileFormatClass
-        .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
+      val ifc = getAndOptimizeInput(partDesc.getInputFileFormatClassName)
       // Get partition field info
       val partSpec = partDesc.getPartSpec
       val partProps = partDesc.getProperties
@@ -311,6 +309,32 @@ class HadoopTableReader(
     // Only take the value (skip the key) because Hive works only with values.
     rdd.map(_._2)
   }
+
+  /**
+   * If `spark.sql.hive.inputFormat.optimizer.enabled` is true, this function will optimize the
+   * input method (including the format and the size of splits) while reading Hive tables.
+   */
+  private def getAndOptimizeInput(
+      inputClassName: String): Class[InputFormat[Writable, Writable]] = {
+
+    var ifc = Utils.classForName(inputClassName)
+      .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
+    if (conf.getConf(HiveUtils.HIVE_INPUT_FORMAT_OPTIMIZER_ENABLED)) {
+      if ("org.apache.hadoop.mapreduce.lib.input.TextInputFormat"
+          .equals(inputClassName)) {
+        ifc = Utils.classForName(
+          "org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat")
+          .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
+      }
+      if ("org.apache.hadoop.mapred.TextInputFormat"
+          .equals(inputClassName)) {
+        ifc = Utils.classForName(
+          "org.apache.hadoop.mapred.lib.CombineTextInputFormat")
+          .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
+      }
+    }
+    ifc
+  }
 }
 
 private[hive] object HiveTableUtil {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
index 3f9bb8de42e09..3371bce7087e5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
@@ -192,4 +192,47 @@ class HiveTableScanSuite extends HiveComparisonTest with SQLTestUtils with TestH
       case p: HiveTableScanExec => p
     }.get
   }
-}
+
+  test("Test the InputFormat optimizer") {
+    withTable("table_old", "table_pt_old", "table_new", "table_pt_new") {
+      sql("set spark.sql.hive.inputFormat.optimizer.enabled=true")
+      sql("set spark.sql.hive.fileInputFormat.split.maxsize=134217728")
+      sql("set spark.sql.hive.fileInputFormat.split.minsize=134217728")
+      sql(
+        s"""
+           |CREATE TABLE table_old (id int)
+           |STORED AS
+           |INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
+           |OUTPUTFORMAT
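For context, a minimal sketch of how the flag proposed above would be toggled from a Hive-enabled session. This is an illustration only: the PR was closed unmerged, so the config does not exist in released Spark, and the table name below is a placeholder.

// Hypothetical usage of the flag proposed in SPARK-26577 (never merged).
import org.apache.spark.sql.SparkSession

object CombineInputFormatSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hive-input-optimizer-sketch")
      .enableHiveSupport()
      .getOrCreate()

    // Opt in: scans of text-backed Hive tables would then route through
    // getAndOptimizeInput and substitute CombineTextInputFormat, grouping
    // many small files into fewer, larger splits.
    spark.sql("SET spark.sql.hive.inputFormat.optimizer.enabled=true")

    // "some_text_table" stands in for any TEXTFILE-stored Hive table.
    spark.sql("SELECT count(*) FROM some_text_table").show()
  }
}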
svn commit: r31981 - in /dev/spark/3.0.0-SNAPSHOT-2019_01_15_20_08-cf133e6-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Wed Jan 16 04:20:58 2019
New Revision: 31981

Log: Apache Spark 3.0.0-SNAPSHOT-2019_01_15_20_08-cf133e6 docs

[This commit notification would consist of 1777 parts, which exceeds the limit of 50, so it was shortened to the summary.]
[spark] Diff for: [GitHub] cloud-fan closed pull request #23521: [SPARK-26604][CORE] Clean up channel registration for StreamManager
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java b/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
index f08d8b0f984cf..43c3d23b6304d 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/ChunkFetchRequestHandler.java
@@ -90,7 +90,6 @@ protected void channelRead0(
     ManagedBuffer buf;
     try {
       streamManager.checkAuthorization(client, msg.streamChunkId.streamId);
-      streamManager.registerChannel(channel, msg.streamChunkId.streamId);
       buf = streamManager.getChunk(msg.streamChunkId.streamId, msg.streamChunkId.chunkIndex);
     } catch (Exception e) {
       logger.error(String.format("Error opening block %s for request from %s",
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
index 0f6a8824d95e5..6fafcc131fa24 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java
@@ -23,6 +23,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import io.netty.channel.Channel;
 import org.apache.commons.lang3.tuple.ImmutablePair;
@@ -49,7 +50,7 @@
     final Iterator<ManagedBuffer> buffers;
 
     // The channel associated to the stream
-    Channel associatedChannel = null;
+    final Channel associatedChannel;
 
     // Used to keep track of the index of the buffer that the user has retrieved, just to ensure
     // that the caller only requests each chunk one at a time, in order.
@@ -58,9 +59,10 @@
     // Used to keep track of the number of chunks being transferred and not finished yet.
     volatile long chunksBeingTransferred = 0L;
 
-    StreamState(String appId, Iterator<ManagedBuffer> buffers) {
+    StreamState(String appId, Iterator<ManagedBuffer> buffers, Channel channel) {
       this.appId = appId;
       this.buffers = Preconditions.checkNotNull(buffers);
+      this.associatedChannel = channel;
     }
   }
 
@@ -71,13 +73,6 @@ public OneForOneStreamManager() {
     streams = new ConcurrentHashMap<>();
   }
 
-  @Override
-  public void registerChannel(Channel channel, long streamId) {
-    if (streams.containsKey(streamId)) {
-      streams.get(streamId).associatedChannel = channel;
-    }
-  }
-
   @Override
   public ManagedBuffer getChunk(long streamId, int chunkIndex) {
     StreamState state = streams.get(streamId);
@@ -195,11 +190,19 @@ public long chunksBeingTransferred() {
    *
    * If an app ID is provided, only callers who've authenticated with the given app ID will be
    * allowed to fetch from this stream.
+   *
+   * This method also associates the stream with a single client connection, which is guaranteed
+   * to be the only reader of the stream. Once the connection is closed, the stream will never
+   * be used again, enabling cleanup by `connectionTerminated`.
    */
-  public long registerStream(String appId, Iterator<ManagedBuffer> buffers) {
+  public long registerStream(String appId, Iterator<ManagedBuffer> buffers, Channel channel) {
     long myStreamId = nextStreamId.getAndIncrement();
-    streams.put(myStreamId, new StreamState(appId, buffers));
+    streams.put(myStreamId, new StreamState(appId, buffers, channel));
     return myStreamId;
   }
 
+  @VisibleForTesting
+  public int numStreamStates() {
+    return streams.size();
+  }
 }
diff --git a/common/network-common/src/main/java/org/apache/spark/network/server/StreamManager.java b/common/network-common/src/main/java/org/apache/spark/network/server/StreamManager.java
index c535295831606..e48d27be1126a 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/server/StreamManager.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/server/StreamManager.java
@@ -60,16 +60,6 @@ public ManagedBuffer openStream(String streamId) {
     throw new UnsupportedOperationException();
   }
 
-  /**
-   * Associates a stream with a single client connection, which is guaranteed to be the only reader
-   * of the stream. The getChunk() method will be called serially on this connection and once the
-   * connection is closed, the stream will never be used again, enabling cleanup.
-   *
-   * This must be called before the first getChunk() on the stream, but it may be invoked multiple
-   * times with the same channel and stream id.
-   */
-  public void registerChannel(Channel channel, long streamId) { }
-
   /**
    * Indicates that the given channel has been terminated. After this occurs, we are guaranteed not
   * to read from
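To make the new contract concrete, here is a short Scala sketch of registering a stream after this change. The helper name `openBlocks` is illustrative; the pattern mirrors how NettyBlockRpcServer now passes the client channel at registration time instead of calling the removed registerChannel().

// Illustrative sketch only: the channel is bound once, at registration.
import scala.collection.JavaConverters._

import io.netty.channel.Channel
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.network.server.OneForOneStreamManager

object StreamRegistrationSketch {
  // Registers the blocks for one fetch request and returns the stream id.
  def openBlocks(
      streamManager: OneForOneStreamManager,
      appId: String,
      buffers: Seq[ManagedBuffer],
      channel: Channel): Long = {
    // With SPARK-26604 the channel is supplied here, so connectionTerminated()
    // can release every stream bound to a dead connection.
    streamManager.registerStream(appId, buffers.iterator.asJava, channel)
  }
}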
[spark] branch master updated (2ebb79b -> cf133e6)
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git.

from 2ebb79b  [SPARK-26350][FOLLOWUP] Add actual verification on new UT introduced on SPARK-26350
 add cf133e6  [SPARK-26604][CORE] Clean up channel registration for StreamManager

No new revisions were added by this update.

Summary of changes:
 .../network/server/ChunkFetchRequestHandler.java   |  1 -
 .../network/server/OneForOneStreamManager.java     | 25 --
 .../apache/spark/network/server/StreamManager.java | 10 -
 .../network/ChunkFetchRequestHandlerSuite.java     |  3 +--
 .../network/TransportRequestHandlerSuite.java      |  9 ++--
 .../server/OneForOneStreamManagerSuite.java        |  5 +++--
 .../shuffle/ExternalShuffleBlockHandler.java       |  2 +-
 .../shuffle/ExternalShuffleBlockHandlerSuite.java  |  3 ++-
 .../spark/network/netty/NettyBlockRpcServer.scala  |  3 ++-
 9 files changed, 30 insertions(+), 31 deletions(-)
[GitHub] srowen commented on issue #171: Remove old releases from download page. Remove unused 'stable' flag and old info
srowen commented on issue #171: Remove old releases from download page. Remove unused 'stable' flag and old info
URL: https://github.com/apache/spark-website/pull/171#issuecomment-454599623

I merged this to ensure that the web page matches what is currently mirrored and in the archives. I know it's a little bit of a change to only include the latest supported releases, but I think that is actually what the page should show. All releases remain available in the ASF archive, which is linked to from this page.
[GitHub] srowen closed pull request #171: Remove old releases from download page. Remove unused 'stable' flag and old info
srowen closed pull request #171: Remove old releases from download page. Remove unused 'stable' flag and old info
URL: https://github.com/apache/spark-website/pull/171

As this is a foreign pull request (from a fork), the diff has been sent to your commit mailing list, commits@spark.apache.org
[spark-website] branch asf-site updated: Remove old releases from download page. Remove unused 'stable' flag and old info
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/spark-website.git

The following commit(s) were added to refs/heads/asf-site by this push:
     new 31485a3  Remove old releases from download page. Remove unused 'stable' flag and old info
31485a3 is described below

commit 31485a30bc25e7333a8e698896b5bc05d22f994f
Author: Sean Owen
AuthorDate: Tue Jan 15 18:04:41 2019 -0600

    Remove old releases from download page. Remove unused 'stable' flag and old info

    Author: Sean Owen

    Closes #171 from srowen/OldReleases.
---
 downloads.md         | 17 -
 js/downloads.js      | 52 +++-
 site/downloads.html  | 17 -
 site/js/downloads.js | 52 +++-
 4 files changed, 14 insertions(+), 124 deletions(-)

diff --git a/downloads.md b/downloads.md
index 9cc5414..a4ae5f4 100644
--- a/downloads.md
+++ b/downloads.md
@@ -26,23 +26,6 @@ $(document).ready(function() {
 4. Verify this release using the and [project release KEYS](https://www.apache.org/dist/spark/KEYS).
 
-_Note: Starting version 2.0, Spark is built with Scala 2.11 by default.
-Scala 2.10 users should download the Spark source package and build
-[with Scala 2.10 support](https://spark.apache.org/docs/latest/building-spark.html#building-for-scala-210)._
-
-
 ### Link with Spark
 Spark artifacts are [hosted in Maven Central](https://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.spark%22). You can add a Maven dependency with the following coordinates:
diff --git a/js/downloads.js b/js/downloads.js
index 1d4c1e7..f101350 100644
--- a/js/downloads.js
+++ b/js/downloads.js
@@ -3,50 +3,28 @@ releases = {};
 
-function addRelease(version, releaseDate, packages, stable, mirrored) {
+function addRelease(version, releaseDate, packages, mirrored) {
   releases[version] = {
     released: releaseDate,
     packages: packages,
-    stable: stable,
     mirrored: mirrored
   };
 }
 
 var sources = {pretty: "Source Code", tag: "sources"};
 var hadoopFree = {pretty: "Pre-build with user-provided Apache Hadoop", tag: "without-hadoop"};
-var hadoop1 = {pretty: "Pre-built for Apache Hadoop 1.X", tag: "hadoop1"};
-var cdh4 = {pretty: "Pre-built for CDH 4", tag: "cdh4"};
-//var hadoop2 = {pretty: "Pre-built for Apache Hadoop 2.2", tag: "hadoop2"};
-var hadoop2p3 = {pretty: "Pre-built for Apache Hadoop 2.3", tag: "hadoop2.3"};
-var hadoop2p4 = {pretty: "Pre-built for Apache Hadoop 2.4", tag: "hadoop2.4"};
 var hadoop2p6 = {pretty: "Pre-built for Apache Hadoop 2.6", tag: "hadoop2.6"};
 var hadoop2p7 = {pretty: "Pre-built for Apache Hadoop 2.7 and later", tag: "hadoop2.7"};
 var scala2p12_hadoopFree = {pretty: "[Experimental] Pre-build with Scala 2.12 and user-provided Apache Hadoop", tag: "without-hadoop-scala-2.12"};
 
-// 1.4.0+
-var packagesV6 = [hadoop2p6, hadoop2p4, hadoop2p3, hadoopFree, hadoop1, cdh4, sources];
-// 2.0.0+
-var packagesV7 = [hadoop2p7, hadoop2p6, hadoop2p4, hadoop2p3, hadoopFree, sources];
 // 2.2.0+
 var packagesV8 = [hadoop2p7, hadoop2p6, hadoopFree, sources];
 // 2.4.0+
 var packagesV9 = [hadoop2p7, hadoop2p6, hadoopFree, scala2p12_hadoopFree, sources];
 
-addRelease("2.4.0", new Date("11/02/2018"), packagesV9, true, true);
-addRelease("2.3.2", new Date("09/24/2018"), packagesV8, true, true);
-addRelease("2.3.1", new Date("06/08/2018"), packagesV8, true, false);
-addRelease("2.3.0", new Date("02/28/2018"), packagesV8, true, false);
-addRelease("2.2.3", new Date("01/11/2019"), packagesV8, true, true);
-addRelease("2.2.2", new Date("07/02/2018"), packagesV8, true, false);
-addRelease("2.2.1", new Date("12/01/2017"), packagesV8, true, false);
-addRelease("2.2.0", new Date("07/11/2017"), packagesV8, true, false);
-addRelease("2.1.3", new Date("06/29/2018"), packagesV7, true, true);
-addRelease("2.1.2", new Date("10/09/2017"), packagesV7, true, false);
-addRelease("2.1.1", new Date("05/02/2017"), packagesV7, true, false);
-addRelease("2.1.0", new Date("12/28/2016"), packagesV7, true, false);
-addRelease("2.0.2", new Date("11/14/2016"), packagesV7, true, false);
-//addRelease("2.0.0-preview", new Date("05/24/2016"), sources.concat(packagesV7), true, false);
-addRelease("1.6.3", new Date("11/07/2016"), packagesV6, true, false);
+addRelease("2.4.0", new Date("11/02/2018"), packagesV9, true);
+addRelease("2.3.2", new Date("09/24/2018"), packagesV8, true);
+addRelease("2.2.3", new Date("01/11/2019"), packagesV8, true);
 
 function append(el, contents) {
   el.innerHTML += contents;
@@ -65,36 +43,20 @@ function versionShort(version) { return version.replace(/-incubating/, ""); }
 
 function initDownloads() {
   var versionSelect = document.getElementById("sparkVersionSelect");
 
-  // Populate stable versions
   append(versionSelect, "");
   for (var version in releases) {
-    if
[spark-website] Diff for: [GitHub] srowen closed pull request #171: Remove old releases from download page. Remove unused 'stable' flag and old info
diff --git a/downloads.md b/downloads.md
index 9cc5414a4..a4ae5f494 100644
--- a/downloads.md
+++ b/downloads.md
@@ -26,23 +26,6 @@ $(document).ready(function() {
 4. Verify this release using the and [project release KEYS](https://www.apache.org/dist/spark/KEYS).
 
-_Note: Starting version 2.0, Spark is built with Scala 2.11 by default.
-Scala 2.10 users should download the Spark source package and build
-[with Scala 2.10 support](https://spark.apache.org/docs/latest/building-spark.html#building-for-scala-210)._
-
-
 ### Link with Spark
 Spark artifacts are [hosted in Maven Central](https://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.spark%22). You can add a Maven dependency with the following coordinates:
diff --git a/js/downloads.js b/js/downloads.js
index 1d4c1e77f..f10135007 100644
--- a/js/downloads.js
+++ b/js/downloads.js
@@ -3,50 +3,28 @@ releases = {};
 
-function addRelease(version, releaseDate, packages, stable, mirrored) {
+function addRelease(version, releaseDate, packages, mirrored) {
   releases[version] = {
     released: releaseDate,
     packages: packages,
-    stable: stable,
     mirrored: mirrored
   };
 }
 
 var sources = {pretty: "Source Code", tag: "sources"};
 var hadoopFree = {pretty: "Pre-build with user-provided Apache Hadoop", tag: "without-hadoop"};
-var hadoop1 = {pretty: "Pre-built for Apache Hadoop 1.X", tag: "hadoop1"};
-var cdh4 = {pretty: "Pre-built for CDH 4", tag: "cdh4"};
-//var hadoop2 = {pretty: "Pre-built for Apache Hadoop 2.2", tag: "hadoop2"};
-var hadoop2p3 = {pretty: "Pre-built for Apache Hadoop 2.3", tag: "hadoop2.3"};
-var hadoop2p4 = {pretty: "Pre-built for Apache Hadoop 2.4", tag: "hadoop2.4"};
 var hadoop2p6 = {pretty: "Pre-built for Apache Hadoop 2.6", tag: "hadoop2.6"};
 var hadoop2p7 = {pretty: "Pre-built for Apache Hadoop 2.7 and later", tag: "hadoop2.7"};
 var scala2p12_hadoopFree = {pretty: "[Experimental] Pre-build with Scala 2.12 and user-provided Apache Hadoop", tag: "without-hadoop-scala-2.12"};
 
-// 1.4.0+
-var packagesV6 = [hadoop2p6, hadoop2p4, hadoop2p3, hadoopFree, hadoop1, cdh4, sources];
-// 2.0.0+
-var packagesV7 = [hadoop2p7, hadoop2p6, hadoop2p4, hadoop2p3, hadoopFree, sources];
 // 2.2.0+
 var packagesV8 = [hadoop2p7, hadoop2p6, hadoopFree, sources];
 // 2.4.0+
 var packagesV9 = [hadoop2p7, hadoop2p6, hadoopFree, scala2p12_hadoopFree, sources];
 
-addRelease("2.4.0", new Date("11/02/2018"), packagesV9, true, true);
-addRelease("2.3.2", new Date("09/24/2018"), packagesV8, true, true);
-addRelease("2.3.1", new Date("06/08/2018"), packagesV8, true, false);
-addRelease("2.3.0", new Date("02/28/2018"), packagesV8, true, false);
-addRelease("2.2.3", new Date("01/11/2019"), packagesV8, true, true);
-addRelease("2.2.2", new Date("07/02/2018"), packagesV8, true, false);
-addRelease("2.2.1", new Date("12/01/2017"), packagesV8, true, false);
-addRelease("2.2.0", new Date("07/11/2017"), packagesV8, true, false);
-addRelease("2.1.3", new Date("06/29/2018"), packagesV7, true, true);
-addRelease("2.1.2", new Date("10/09/2017"), packagesV7, true, false);
-addRelease("2.1.1", new Date("05/02/2017"), packagesV7, true, false);
-addRelease("2.1.0", new Date("12/28/2016"), packagesV7, true, false);
-addRelease("2.0.2", new Date("11/14/2016"), packagesV7, true, false);
-//addRelease("2.0.0-preview", new Date("05/24/2016"), sources.concat(packagesV7), true, false);
-addRelease("1.6.3", new Date("11/07/2016"), packagesV6, true, false);
+addRelease("2.4.0", new Date("11/02/2018"), packagesV9, true);
+addRelease("2.3.2", new Date("09/24/2018"), packagesV8, true);
+addRelease("2.2.3", new Date("01/11/2019"), packagesV8, true);
 
 function append(el, contents) {
   el.innerHTML += contents;
@@ -65,36 +43,20 @@ function versionShort(version) { return version.replace(/-incubating/, ""); }
 
 function initDownloads() {
   var versionSelect = document.getElementById("sparkVersionSelect");
 
-  // Populate stable versions
   append(versionSelect, "");
   for (var version in releases) {
-    if (releases[version].stable) {
-      var releaseDate = releases[version].released;
-      var title = versionShort(version) + " (" + releaseDate.toDateString().slice(4) + ")";
-      append(versionSelect, "" + title + "");
-    }
+    var releaseDate = releases[version].released;
+    var title = versionShort(version) + " (" + releaseDate.toDateString().slice(4) + ")";
+    append(versionSelect, "" + title + "");
   }
   append(versionSelect, "");
 
-  // Populate other versions
-  // append(versionSelect, "");
-  //for (var version in releases) {
-  //  if (!releases[version].stable) {
-  //    var releaseDate = releases[version].released;
-  //    var title = versionShort(version) + " (" + releaseDate.toDateString().slice(4) + ")";
-  //    append(versionSelect, "" + title + "");
-  //  }
-  //}
-  //append(versionSelect, "");
-
-  // Populate packages and (transitively) releases
   onVersionSelect();
 }
 
 function initReleaseNotes() {
svn commit: r31978 - in /dev/spark/3.0.0-SNAPSHOT-2019_01_15_15_35-2ebb79b-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell
Date: Tue Jan 15 23:47:29 2019
New Revision: 31978

Log: Apache Spark 3.0.0-SNAPSHOT-2019_01_15_15_35-2ebb79b docs

[This commit notification would consist of 1777 parts, which exceeds the limit of 50, so it was shortened to the summary.]
[spark] Diff for: [GitHub] asfgit closed pull request #23544: [SPARK-26350][FOLLOWUP] Add actual verification on new UT introduced on SPARK-26350
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 64020882b306e..cb453846134ed 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -28,6 +28,7 @@ import scala.collection.JavaConverters._
 import scala.io.Source
 import scala.util.Random
 
+import org.apache.kafka.clients.admin.{AdminClient, ConsumerGroupListing}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
 import org.apache.kafka.common.TopicPartition
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -638,10 +639,11 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     testUtils.sendMessages(topic, (11 to 20).map(_.toString).toArray, Some(1))
     testUtils.sendMessages(topic, (21 to 30).map(_.toString).toArray, Some(2))
 
+    val customGroupId = "id-" + Random.nextInt()
     val dsKafka = spark
       .readStream
       .format("kafka")
-      .option("kafka.group.id", "id-" + Random.nextInt())
+      .option("kafka.group.id", customGroupId)
       .option("kafka.bootstrap.servers", testUtils.brokerAddress)
       .option("subscribe", topic)
       .option("startingOffsets", "earliest")
@@ -652,7 +654,15 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
 
     testStream(dsKafka)(
       makeSureGetOffsetCalled,
-      CheckAnswer(1 to 30: _*)
+      CheckAnswer(1 to 30: _*),
+      Execute { _ =>
+        val consumerGroups = testUtils.listConsumerGroups()
+        val validGroups = consumerGroups.valid().get()
+        val validGroupsId = validGroups.asScala.map(_.groupId())
+        assert(validGroupsId.exists(_ === customGroupId), "Valid consumer groups don't " +
+          s"contain the expected group id - Valid consumer groups: $validGroupsId / " +
+          s"expected group id: $customGroupId")
+      }
     )
   }
 
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
index efe7385ed16bc..2cd13a994ee82 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
@@ -20,6 +20,9 @@ package org.apache.spark.sql.kafka010
 import java.util.Locale
 import java.util.concurrent.atomic.AtomicInteger
 
+import scala.collection.JavaConverters._
+import scala.util.Random
+
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
 
@@ -247,8 +250,16 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest
     testUtils.sendMessages(topic, (11 to 20).map(_.toString).toArray, Some(1))
     testUtils.sendMessages(topic, (21 to 30).map(_.toString).toArray, Some(2))
 
-    val df = createDF(topic, withOptions = Map("kafka.group.id" -> "custom"))
+    val customGroupId = "id-" + Random.nextInt()
+    val df = createDF(topic, withOptions = Map("kafka.group.id" -> customGroupId))
     checkAnswer(df, (1 to 30).map(_.toString).toDF())
+
+    val consumerGroups = testUtils.listConsumerGroups()
+    val validGroups = consumerGroups.valid().get()
+    val validGroupsId = validGroups.asScala.map(_.groupId())
+    assert(validGroupsId.exists(_ === customGroupId), "Valid consumer groups don't " +
+      s"contain the expected group id - Valid consumer groups: $validGroupsId / " +
+      s"expected group id: $customGroupId")
   }
 
   test("read Kafka transactional messages: read_committed") {
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index bf6934be52705..dacfffa867534 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -33,7 +33,7 @@ import kafka.server.{KafkaConfig, KafkaServer}
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils.ZkUtils
 import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, NewPartitions}
+import org.apache.kafka.clients.admin.{AdminClient, CreatePartitionsOptions, ListConsumerGroupsResult, NewPartitions}
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.TopicPartition
@@ -311,6 +311,10 @@ class KafkaTestUtils(withBrokerProps:
[spark] branch master updated: [SPARK-26350][FOLLOWUP] Add actual verification on new UT introduced on SPARK-26350
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 2ebb79b  [SPARK-26350][FOLLOWUP] Add actual verification on new UT introduced on SPARK-26350
2ebb79b is described below

commit 2ebb79b2a607aa25ea22826d9c5d6af18c97a7f2
Author: Jungtaek Lim (HeartSaVioR)
AuthorDate: Tue Jan 15 14:21:51 2019 -0800

    [SPARK-26350][FOLLOWUP] Add actual verification on new UT introduced on SPARK-26350

    ## What changes were proposed in this pull request?

    This patch adds the check to verify that the consumer group id is set correctly when a custom group id is provided as a Kafka parameter.

    ## How was this patch tested?

    Modified UT.

    Closes #23544 from HeartSaVioR/SPARK-26350-follow-up-actual-verification-on-UT.

    Authored-by: Jungtaek Lim (HeartSaVioR)
    Signed-off-by: Shixiong Zhu
---
 .../spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala    | 14 --
 .../org/apache/spark/sql/kafka010/KafkaRelationSuite.scala | 13 -
 .../org/apache/spark/sql/kafka010/KafkaTestUtils.scala     |  6 +-
 3 files changed, 29 insertions(+), 4 deletions(-)

diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 6402088..cb45384 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -28,6 +28,7 @@ import scala.collection.JavaConverters._
 import scala.io.Source
 import scala.util.Random
 
+import org.apache.kafka.clients.admin.{AdminClient, ConsumerGroupListing}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
 import org.apache.kafka.common.TopicPartition
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -638,10 +639,11 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     testUtils.sendMessages(topic, (11 to 20).map(_.toString).toArray, Some(1))
     testUtils.sendMessages(topic, (21 to 30).map(_.toString).toArray, Some(2))
 
+    val customGroupId = "id-" + Random.nextInt()
     val dsKafka = spark
       .readStream
       .format("kafka")
-      .option("kafka.group.id", "id-" + Random.nextInt())
+      .option("kafka.group.id", customGroupId)
       .option("kafka.bootstrap.servers", testUtils.brokerAddress)
       .option("subscribe", topic)
       .option("startingOffsets", "earliest")
@@ -652,7 +654,15 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
 
     testStream(dsKafka)(
       makeSureGetOffsetCalled,
-      CheckAnswer(1 to 30: _*)
+      CheckAnswer(1 to 30: _*),
+      Execute { _ =>
+        val consumerGroups = testUtils.listConsumerGroups()
+        val validGroups = consumerGroups.valid().get()
+        val validGroupsId = validGroups.asScala.map(_.groupId())
+        assert(validGroupsId.exists(_ === customGroupId), "Valid consumer groups don't " +
+          s"contain the expected group id - Valid consumer groups: $validGroupsId / " +
+          s"expected group id: $customGroupId")
+      }
     )
   }
 
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
index efe7385..2cd13a9 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
@@ -20,6 +20,9 @@ package org.apache.spark.sql.kafka010
 import java.util.Locale
 import java.util.concurrent.atomic.AtomicInteger
 
+import scala.collection.JavaConverters._
+import scala.util.Random
+
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
 
@@ -247,8 +250,16 @@ class KafkaRelationSuite extends QueryTest with SharedSQLContext with KafkaTest
     testUtils.sendMessages(topic, (11 to 20).map(_.toString).toArray, Some(1))
     testUtils.sendMessages(topic, (21 to 30).map(_.toString).toArray, Some(2))
 
-    val df = createDF(topic, withOptions = Map("kafka.group.id" -> "custom"))
+    val customGroupId = "id-" + Random.nextInt()
+    val df = createDF(topic, withOptions = Map("kafka.group.id" -> customGroupId))
     checkAnswer(df, (1 to 30).map(_.toString).toDF())
+
+    val consumerGroups = testUtils.listConsumerGroups()
+    val validGroups = consumerGroups.valid().get()
+    val validGroupsId = validGroups.asScala.map(_.groupId())
+    assert(validGroupsId.exists(_ === customGroupId), "Valid
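As a usage note, the option this follow-up verifies can be set directly on a batch read. A minimal sketch (the broker address and topic name are placeholders):

// Hedged sketch: reading from Kafka with the custom consumer group id
// introduced by SPARK-26350. Assumes a broker at localhost:9092 and a
// topic named "events".
import org.apache.spark.sql.SparkSession

object KafkaGroupIdSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka-group-id").getOrCreate()

    val df = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "events")
      .option("startingOffsets", "earliest")
      // Use a fixed group id instead of the randomly generated default.
      .option("kafka.group.id", "my-fixed-group")
      .load()

    df.selectExpr("CAST(value AS STRING)").show()
  }
}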
[spark] Diff for: [GitHub] asfgit closed pull request #23208: [SPARK-25530][SQL] data source v2 API refactor (batch write)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java
deleted file mode 100644
index df439e2c02fe3..0000000000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import java.util.Optional;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
- * provide data writing ability for batch processing.
- *
- * This interface is used to create {@link BatchWriteSupport} instances when end users run
- * {@code Dataset.write.format(...).option(...).save()}.
- */
-@Evolving
-public interface BatchWriteSupportProvider extends DataSourceV2 {
-
-  /**
-   * Creates an optional {@link BatchWriteSupport} instance to save the data to this data source,
-   * which is called by Spark at the beginning of each batch query.
-   *
-   * Data sources can return None if there is no writing needed to be done according to the save
-   * mode.
-   *
-   * @param queryId A unique string for the writing query. It's possible that there are many
-   *                writing queries running at the same time, and the returned
-   *                {@link BatchWriteSupport} can use this id to distinguish itself from others.
-   * @param schema the schema of the data to be written.
-   * @param mode the save mode which determines what to do when the data are already in this data
-   *             source, please refer to {@link SaveMode} for more details.
-   * @param options the options for the returned data source writer, which is an immutable
-   *                case-insensitive string-to-string map.
-   * @return a write support to write data to this data source.
-   */
-  Optional<BatchWriteSupport> createBatchWriteSupport(
-      String queryId,
-      StructType schema,
-      SaveMode mode,
-      DataSourceOptions options);
-}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java
index eae7a45d1d446..4aaa57dd4db9d 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceV2.java
@@ -20,15 +20,7 @@
 import org.apache.spark.annotation.Evolving;
 
 /**
- * The base interface for data source v2. Implementations must have a public, 0-arg constructor.
- *
- * Note that this is an empty interface. Data source implementations must mix in interfaces such as
- * {@link BatchReadSupportProvider} or {@link BatchWriteSupportProvider}, which can provide
- * batch or streaming read/write support instances. Otherwise it's just a dummy data source which
- * is un-readable/writable.
- *
- * If Spark fails to execute any methods in the implementations of this interface (by throwing an
- * exception), the read action will fail and no Spark job will be submitted.
+ * TODO: remove it when we finish the API refactor for streaming side.
  */
 @Evolving
 public interface DataSourceV2 {}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java
new file mode 100644
index 0000000000000..08caadd5308e6
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *
[spark] branch master updated: [SPARK-25530][SQL] data source v2 API refactor (batch write)
This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 954ef96  [SPARK-25530][SQL] data source v2 API refactor (batch write)
954ef96 is described below

commit 954ef96c495268e3a22b7b661ce45c558b532c65
Author: Wenchen Fan
AuthorDate: Tue Jan 15 13:53:48 2019 -0800

    [SPARK-25530][SQL] data source v2 API refactor (batch write)

    ## What changes were proposed in this pull request?

    Adjust the batch write API to match the read API refactor after https://github.com/apache/spark/pull/23086

    The doc with high-level ideas: https://docs.google.com/document/d/1vI26UEuDpVuOjWw4WPoH2T6y8WAekwtI7qoowhOFnI4/edit?usp=sharing

    Basically it renames `BatchWriteSupportProvider` to `SupportsBatchWrite` and makes it extend `Table`, and renames `WriteSupport` to `Write`. It also cleans up some code as the batch API is completed.

    This PR also removes the test from https://github.com/apache/spark/pull/22688 . Now a data source must return a table for read/write.

    A few notes about future changes:
    1. We will create `SupportsStreamingWrite` later for streaming APIs
    2. We will create `SupportsBatchReplaceWhere`, `SupportsBatchAppend`, etc. for the new end-user write APIs. I think streaming APIs would remain to use `OutputMode`, and new end-user write APIs will apply to batch only, at least in the near future.
    3. We will remove `SaveMode` from data source API: https://issues.apache.org/jira/browse/SPARK-26356

    ## How was this patch tested?

    existing tests

    Closes #23208 from cloud-fan/refactor-batch.

    Authored-by: Wenchen Fan
    Signed-off-by: gatorsmile
---
 .../sql/sources/v2/BatchWriteSupportProvider.java  | 59 --
 .../apache/spark/sql/sources/v2/DataSourceV2.java  | 10 +--
 .../{SupportsRead.java => SupportsBatchWrite.java} | 23 +++---
 .../apache/spark/sql/sources/v2/SupportsRead.java  |  5 +-
 .../v2/{SupportsRead.java => SupportsWrite.java}   | 18 ++---
 .../{BatchWriteSupport.java => BatchWrite.java}    |  2 +-
 .../spark/sql/sources/v2/writer/DataWriter.java    | 12 +--
 .../sql/sources/v2/writer/DataWriterFactory.java   |  2 +-
 .../SupportsSaveMode.java}                         | 21 ++---
 .../spark/sql/sources/v2/writer/WriteBuilder.java  | 69
 .../sql/sources/v2/writer/WriterCommitMessage.java |  2 +-
 .../org/apache/spark/sql/DataFrameReader.scala     |  8 +-
 .../org/apache/spark/sql/DataFrameWriter.scala     | 50 ++--
 .../datasources/v2/DataSourceV2Relation.scala      | 89 ++---
 .../datasources/v2/DataSourceV2Strategy.scala      | 14 +++-
 .../datasources/v2/WriteToDataSourceV2Exec.scala   | 26 +++
 .../execution/streaming/MicroBatchExecution.scala  |  4 +-
 ...atchWritSupport.scala => MicroBatchWrite.scala} |  7 +-
 .../streaming/sources/PackedRowWriterFactory.scala |  4 +-
 .../spark/sql/sources/v2/DataSourceV2Suite.scala   | 28 +++
 .../sql/sources/v2/SimpleWritableDataSource.scala  | 91 +-
 21 files changed, 262 insertions(+), 282 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java
deleted file mode 100644
index df439e2..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/BatchWriteSupportProvider.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.sources.v2;
-
-import java.util.Optional;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.sources.v2.writer.BatchWriteSupport;
-import org.apache.spark.sql.types.StructType;
-
-/**
- * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
- * provide data writing ability for batch processing.
- *
- * This interface is used to create {@link BatchWriteSupport} instances when end users run
- * {@code
[spark] Diff for: [GitHub] srowen closed pull request #17726: [SPARK-17928] [Mesos] No driver.memoryOverhead setting for mesos cluster mode
diff --git a/docs/configuration.md b/docs/configuration.md
index ff9b802617f08..b961e5b2f32a6 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -169,7 +169,7 @@ of the most common options to set are:
   The amount of off-heap memory to be allocated per driver in cluster mode, in MiB unless
   otherwise specified. This is memory that accounts for things like VM overheads, interned strings,
   other native overheads, etc. This tends to grow with the container size (typically 6-10%).
-  This option is currently supported on YARN and Kubernetes.
+  This option is currently supported on YARN, Mesos and Kubernetes.
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
index a4aba3e9c0d05..3ff68348be7b1 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
@@ -68,6 +68,10 @@ private[mesos] class MesosSubmitRequestServlet(
   private def newDriverId(submitDate: Date): String =
     f"driver-${createDateFormat.format(submitDate)}-${nextDriverNumber.incrementAndGet()}%04d"
 
+  // These defaults copied from YARN
+  private val MEMORY_OVERHEAD_FACTOR = 0.10
+  private val MEMORY_OVERHEAD_MIN = 384
+
   /**
    * Build a driver description from the fields specified in the submit request.
    *
@@ -98,6 +102,7 @@ private[mesos] class MesosSubmitRequestServlet(
     val driverExtraLibraryPath = sparkProperties.get(config.DRIVER_LIBRARY_PATH.key)
     val superviseDriver = sparkProperties.get(config.DRIVER_SUPERVISE.key)
     val driverMemory = sparkProperties.get(config.DRIVER_MEMORY.key)
+    val driverMemoryOverhead = sparkProperties.get(config.DRIVER_MEMORY_OVERHEAD.key)
     val driverCores = sparkProperties.get(config.DRIVER_CORES.key)
     val name = request.sparkProperties.getOrElse("spark.app.name", mainClass)
 
@@ -112,13 +117,15 @@ private[mesos] class MesosSubmitRequestServlet(
       mainClass, appArgs, environmentVariables, extraClassPath, extraLibraryPath, javaOpts)
     val actualSuperviseDriver = superviseDriver.map(_.toBoolean).getOrElse(DEFAULT_SUPERVISE)
     val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY)
+    val actualDriverMemoryOverhead = driverMemoryOverhead.map(_.toInt).getOrElse(
+      math.max((MEMORY_OVERHEAD_FACTOR * actualDriverMemory).toInt, MEMORY_OVERHEAD_MIN))
     val actualDriverCores = driverCores.map(_.toDouble).getOrElse(DEFAULT_CORES)
     val submitDate = new Date()
     val submissionId = newDriverId(submitDate)
 
     new MesosDriverDescription(
-      name, appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver,
-      command, request.sparkProperties, submissionId, submitDate)
+      name, appResource, actualDriverMemory + actualDriverMemoryOverhead, actualDriverCores,
+      actualSuperviseDriver, command, request.sparkProperties, submissionId, submitDate)
   }
 
   protected override def handleSubmit(
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/deploy/rest/mesos/MesosRestServerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/deploy/rest/mesos/MesosRestServerSuite.scala
new file mode 100644
index 0000000000000..1f83149a05652
--- /dev/null
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/deploy/rest/mesos/MesosRestServerSuite.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.rest.mesos
+
+import javax.servlet.http.HttpServletResponse
+
+import org.scalatest.mockito.MockitoSugar
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.TestPrematureExit
+import org.apache.spark.deploy.mesos.MesosDriverDescription
+import org.apache.spark.deploy.rest.{CreateSubmissionRequest, CreateSubmissionResponse, SubmitRestProtocolMessage, SubmitRestProtocolResponse}
+import org.apache.spark.internal.config
+import
[spark] branch master updated: [SPARK-17928][MESOS] No driver.memoryOverhead setting for mesos cluster mode
This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 1b75f3b  [SPARK-17928][MESOS] No driver.memoryOverhead setting for mesos cluster mode
1b75f3b is described below

commit 1b75f3bcfff9cb82995344a0b2aef9d184437613
Author: Devaraj K
AuthorDate: Tue Jan 15 15:45:20 2019 -0600

    [SPARK-17928][MESOS] No driver.memoryOverhead setting for mesos cluster mode

    ## What changes were proposed in this pull request?

    Added a new configuration 'spark.mesos.driver.memoryOverhead' for providing the driver memory overhead in mesos cluster mode.

    ## How was this patch tested?

    Verified it manually; the resource scheduler allocates (driver memory + driver memoryOverhead) for the driver in mesos cluster mode.

    Closes #17726 from devaraj-kavali/SPARK-17928.

    Authored-by: Devaraj K
    Signed-off-by: Sean Owen
---
 docs/configuration.md                              |  2 +-
 .../spark/deploy/rest/mesos/MesosRestServer.scala  | 11 ++-
 .../deploy/rest/mesos/MesosRestServerSuite.scala   | 85 ++
 3 files changed, 95 insertions(+), 3 deletions(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index 3c383ee..7d3bbf9 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -169,7 +169,7 @@ of the most common options to set are:
   The amount of off-heap memory to be allocated per driver in cluster mode, in MiB unless
   otherwise specified. This is memory that accounts for things like VM overheads, interned strings,
   other native overheads, etc. This tends to grow with the container size (typically 6-10%).
-  This option is currently supported on YARN and Kubernetes.
+  This option is currently supported on YARN, Mesos and Kubernetes.
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
index a4aba3e..3ff6834 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/rest/mesos/MesosRestServer.scala
@@ -68,6 +68,10 @@ private[mesos] class MesosSubmitRequestServlet(
   private def newDriverId(submitDate: Date): String =
     f"driver-${createDateFormat.format(submitDate)}-${nextDriverNumber.incrementAndGet()}%04d"
 
+  // These defaults copied from YARN
+  private val MEMORY_OVERHEAD_FACTOR = 0.10
+  private val MEMORY_OVERHEAD_MIN = 384
+
   /**
    * Build a driver description from the fields specified in the submit request.
    *
@@ -98,6 +102,7 @@ private[mesos] class MesosSubmitRequestServlet(
     val driverExtraLibraryPath = sparkProperties.get(config.DRIVER_LIBRARY_PATH.key)
     val superviseDriver = sparkProperties.get(config.DRIVER_SUPERVISE.key)
     val driverMemory = sparkProperties.get(config.DRIVER_MEMORY.key)
+    val driverMemoryOverhead = sparkProperties.get(config.DRIVER_MEMORY_OVERHEAD.key)
     val driverCores = sparkProperties.get(config.DRIVER_CORES.key)
     val name = request.sparkProperties.getOrElse("spark.app.name", mainClass)
 
@@ -112,13 +117,15 @@ private[mesos] class MesosSubmitRequestServlet(
       mainClass, appArgs, environmentVariables, extraClassPath, extraLibraryPath, javaOpts)
     val actualSuperviseDriver = superviseDriver.map(_.toBoolean).getOrElse(DEFAULT_SUPERVISE)
     val actualDriverMemory = driverMemory.map(Utils.memoryStringToMb).getOrElse(DEFAULT_MEMORY)
+    val actualDriverMemoryOverhead = driverMemoryOverhead.map(_.toInt).getOrElse(
+      math.max((MEMORY_OVERHEAD_FACTOR * actualDriverMemory).toInt, MEMORY_OVERHEAD_MIN))
     val actualDriverCores = driverCores.map(_.toDouble).getOrElse(DEFAULT_CORES)
     val submitDate = new Date()
     val submissionId = newDriverId(submitDate)
 
     new MesosDriverDescription(
-      name, appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver,
-      command, request.sparkProperties, submissionId, submitDate)
+      name, appResource, actualDriverMemory + actualDriverMemoryOverhead, actualDriverCores,
+      actualSuperviseDriver, command, request.sparkProperties, submissionId, submitDate)
   }
 
   protected override def handleSubmit(
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/deploy/rest/mesos/MesosRestServerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/deploy/rest/mesos/MesosRestServerSuite.scala
new file mode 100644
index 000..1f83149
--- /dev/null
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/deploy/rest/mesos/MesosRestServerSuite.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license
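The overhead arithmetic the patch introduces is small enough to show in isolation. A self-contained Scala sketch, with the constants copied from MesosSubmitRequestServlet above (`driverMemoryMb` values are stand-ins):

// Sketch of the driver memory sizing introduced by SPARK-17928.
object DriverMemoryOverheadSketch {
  val MEMORY_OVERHEAD_FACTOR = 0.10
  val MEMORY_OVERHEAD_MIN = 384 // MiB

  // Overhead defaults to max(10% of driver memory, 384 MiB) when
  // spark.driver.memoryOverhead is not set explicitly.
  def totalDriverMemoryMb(driverMemoryMb: Int, overheadMb: Option[Int]): Int = {
    val overhead = overheadMb.getOrElse(
      math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMb).toInt, MEMORY_OVERHEAD_MIN))
    driverMemoryMb + overhead
  }

  def main(args: Array[String]): Unit = {
    println(totalDriverMemoryMb(4096, None))      // 4096 + 409 = 4505
    println(totalDriverMemoryMb(1024, None))      // 1024 + 384 = 1408 (floor applies)
    println(totalDriverMemoryMb(4096, Some(512))) // 4096 + 512 = 4608
  }
}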
[spark] Diff for: [GitHub] asfgit closed pull request #23348: [SPARK-25857][core] Add developer documentation regarding delegation tokens.
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/README.md b/core/src/main/scala/org/apache/spark/deploy/security/README.md new file mode 100644 index 0..c3ef60a231f0e --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/security/README.md @@ -0,0 +1,249 @@ +# Delegation Token Handling In Spark + +This document aims to explain and demystify delegation tokens as they are used by Spark, since +this topic is generally a huge source of confusion. + + +## What are delegation tokens and why use them? + +Delegation tokens (DTs from now on) are authentication tokens used by some services to replace +Kerberos service tokens. Many services in the Hadoop ecosystem have support for DTs, since they +have some very desirable advantages over Kerberos tokens: + +* No need to distribute Kerberos credentials + +In a distributed application, distributing Kerberos credentials is tricky. Not all users have +keytabs, and when they do, it's generally frowned upon to distribute them over the network as +part of application data. + +DTs allow for a single place (e.g. the Spark driver) to require Kerberos credentials. That entity +can then distribute the DTs to other parts of the distributed application (e.g. Spark executors), +so they can authenticate to services. + +* A single token per service is used for authentication + +If Kerberos authentication were used, each client connection to a server would require a trip +to the KDC and generation of a service ticket. In a distributed system, the number of service +tickets can balloon pretty quickly when you think about the number of client processes (e.g. Spark +executors) vs. the number of service processes (e.g. HDFS DataNodes). That generates unnecessary +extra load on the KDC, and may even run into usage limits set up by the KDC admin. + +* DTs are only used for authentication + +DTs, unlike TGTs, can only be used to authenticate to the specific service for which they were +issued. You cannot use an existing DT to create new DTs or to create DTs for a different service. + +So in short, DTs are *not* Kerberos tokens. They are used by many services to replace Kerberos +authentication, or even other forms of authentication, although there is nothing (aside from +maybe implementation details) that ties them to Kerberos or any other authentication mechanism. + + +## Lifecycle of DTs + +DTs, unlike Kerberos tokens, are service-specific. There is no centralized location you contact +to create a DT for a service. So, the first step needed to get a DT is being able to authenticate +to the service in question. In the Hadoop ecosystem, that is generally done using Kerberos. + +This requires Kerberos credentials to be available somewhere for the application to use. The user +is generally responsible for providing those credentials, which is most commonly done by logging +in to the KDC (e.g. using "kinit"). That generates a (Kerberos) "token cache" containing a TGT +(ticket granting ticket), which can then be used to request service tickets. + +There are other ways of obtaining TGTs, but, ultimately, you need a TGT to bootstrap the process. + +Once a TGT is available, the target service's client library can then be used to authenticate +to the service using the Kerberos credentials, and request the creation of a delegation token. +This token can now be sent to other processes and used to authenticate to different daemons +belonging to that service. 
+ +And thus the first drawback of DTs becomes apparent: you need service-specific logic to create and +use them. + +Spark implements a (somewhat) pluggable, internal DT creation API. Support for new services can be +added by implementing a `HadoopDelegationTokenProvider` that is then called by Spark when generating +delegation tokens for an application. Spark makes the DTs available to code by stashing them in the +`UserGroupInformation` credentials, and it's up to the DT provider and the respective client library +to agree on how to use those tokens. + +Once they are created, the semantics of how DTs operate are also service-specific. But, in general, +they try to follow the semantics of Kerberos tokens: + +* A "renewable period" (equivalent to TGT's "lifetime") which is for how long the DT is valid + before it requires renewal. +* A "max lifetime" (equivalent to TGT's "renewable life") which is for how long the DT can be + renewed. + +Once the token reaches its "max lifetime", a new one needs to be created by contacting the +appropriate service, restarting the above process. + + +## DT Renewal, Renewers, and YARN + +This is the most confusing part of DT handling, largely because much of the system was +designed with MapReduce, and later YARN, in mind. + +As seen above, DTs need to be renewed periodically until they finally expire for good. An example of +this is the default configuration of HDFS services: delegation tokens are valid for up to 7 days, +and
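The renew-period/max-lifetime bookkeeping described above is easy to model. The following toy sketch is an illustration only, not Spark's actual renewal code; in particular, the 0.75 early-renewal margin is an assumption.

```scala
object TokenRenewalModel {
  // Toy model of the "renewable period" / "max lifetime" semantics above;
  // not Spark's actual renewal code. The 0.75 early-renewal margin is an
  // assumption chosen to renew comfortably before expiry.
  case class TokenState(issueTimeMs: Long, renewIntervalMs: Long, maxLifetimeMs: Long)

  sealed trait Action
  case object DoNothing extends Action
  case object Renew extends Action    // within max lifetime: renew in place
  case object Recreate extends Action // past max lifetime: obtain a brand-new DT

  def nextAction(t: TokenState, nowMs: Long): Action = {
    val age = nowMs - t.issueTimeMs
    if (age >= t.maxLifetimeMs) Recreate
    else if (age >= (t.renewIntervalMs * 0.75).toLong) Renew
    else DoNothing
  }
}
```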
[spark] branch master updated: [SPARK-25857][CORE] Add developer documentation regarding delegation tokens.
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 8a54492 [SPARK-25857][CORE] Add developer documentation regarding delegation tokens. 8a54492 is described below commit 8a54492149180b57b042e3406fe4b1e53df97291 Author: Marcelo Vanzin AuthorDate: Tue Jan 15 11:23:38 2019 -0800 [SPARK-25857][CORE] Add developer documentation regarding delegation tokens. Closes #23348 from vanzin/SPARK-25857. Authored-by: Marcelo Vanzin Signed-off-by: Marcelo Vanzin --- .../org/apache/spark/deploy/security/README.md | 249 + 1 file changed, 249 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/security/README.md b/core/src/main/scala/org/apache/spark/deploy/security/README.md new file mode 100644 index 000..c3ef60a --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/security/README.md @@ -0,0 +1,249 @@ +# Delegation Token Handling In Spark + +This document aims to explain and demystify delegation tokens as they are used by Spark, since +this topic is generally a huge source of confusion. + + +## What are delegation tokens and why use them? + +Delegation tokens (DTs from now on) are authentication tokens used by some services to replace +Kerberos service tokens. Many services in the Hadoop ecosystem have support for DTs, since they +have some very desirable advantages over Kerberos tokens: + +* No need to distribute Kerberos credentials + +In a distributed application, distributing Kerberos credentials is tricky. Not all users have +keytabs, and when they do, it's generally frowned upon to distribute them over the network as +part of application data. + +DTs allow for a single place (e.g. the Spark driver) to require Kerberos credentials. That entity +can then distribute the DTs to other parts of the distributed application (e.g. Spark executors), +so they can authenticate to services. + +* A single token per service is used for authentication + +If Kerberos authentication were used, each client connection to a server would require a trip +to the KDC and generation of a service ticket. In a distributed system, the number of service +tickets can balloon pretty quickly when you think about the number of client processes (e.g. Spark +executors) vs. the number of service processes (e.g. HDFS DataNodes). That generates unnecessary +extra load on the KDC, and may even run into usage limits set up by the KDC admin. + +* DTs are only used for authentication + +DTs, unlike TGTs, can only be used to authenticate to the specific service for which they were +issued. You cannot use an existing DT to create new DTs or to create DTs for a different service. + +So in short, DTs are *not* Kerberos tokens. They are used by many services to replace Kerberos +authentication, or even other forms of authentication, although there is nothing (aside from +maybe implementation details) that ties them to Kerberos or any other authentication mechanism. + + +## Lifecycle of DTs + +DTs, unlike Kerberos tokens, are service-specific. There is no centralized location you contact +to create a DT for a service. So, the first step needed to get a DT is being able to authenticate +to the service in question. In the Hadoop ecosystem, that is generally done using Kerberos. + +This requires Kerberos credentials to be available somewhere for the application to use. 
The user +is generally responsible for providing those credentials, which is most commonly done by logging +in to the KDC (e.g. using "kinit"). That generates a (Kerberos) "token cache" containing a TGT +(ticket granting ticket), which can then be used to request service tickets. + +There are other ways of obtaining TGTs, but, ultimately, you need a TGT to bootstrap the process. + +Once a TGT is available, the target service's client library can then be used to authenticate +to the service using the Kerberos credentials, and request the creation of a delegation token. +This token can now be sent to other processes and used to authenticate to different daemons +belonging to that service. + +And thus the first drawback of DTs becomes apparent: you need service-specific logic to create and +use them. + +Spark implements a (somewhat) pluggable, internal DT creation API. Support for new services can be +added by implementing a `HadoopDelegationTokenProvider` that is then called by Spark when generating +delegation tokens for an application. Spark makes the DTs available to code by stashing them in the +`UserGroupInformation` credentials, and it's up to the DT provider and the respective client library +to agree on how to use those tokens. + +Once they are created, the semantics of how DTs operate are also service-specific. But, in general, +they try to follow the semantics of Kerberos
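To make the provider hook above concrete, here is a hypothetical `HadoopDelegationTokenProvider` for a made-up service. The trait shape is reconstructed from the description above and should be treated as an assumption: the real trait is `private[spark]` and its exact signature varies across Spark versions, and `FooClient` stands in for a real service client library.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.spark.SparkConf

// Shape of the internal provider API, reconstructed from the description
// above as an assumption; the real private[spark] trait differs in detail
// across Spark versions.
trait HadoopDelegationTokenProvider {
  def serviceName: String
  def delegationTokensRequired(sparkConf: SparkConf, hadoopConf: Configuration): Boolean
  def obtainDelegationTokens(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long]
}

// Stand-in for a real service client library (hypothetical).
object FooClient {
  def createDelegationToken(): (Token[_ <: TokenIdentifier], Long) = ???
}

// Hypothetical provider for a made-up "foo" service.
class FooDelegationTokenProvider extends HadoopDelegationTokenProvider {
  override def serviceName: String = "foo"

  // Only fetch tokens when the (invented) service is actually configured.
  override def delegationTokensRequired(
      sparkConf: SparkConf,
      hadoopConf: Configuration): Boolean = sparkConf.contains("spark.foo.uri")

  // Create a DT with the current Kerberos login, stash it in `creds` (which
  // Spark merges into the UserGroupInformation shipped to executors), and
  // return the time at which the token should next be renewed.
  override def obtainDelegationTokens(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    val (token, nextRenewal) = FooClient.createDelegationToken()
    creds.addToken(token.getService, token)
    Some(nextRenewal)
  }
}
```

In recent Spark versions such providers are discovered through Java's ServiceLoader mechanism, though the registration details have changed across releases.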
svn commit: r31976 - in /dev/spark/3.0.0-SNAPSHOT-2019_01_15_10_51-7296999-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Jan 15 19:03:56 2019 New Revision: 31976 Log: Apache Spark 3.0.0-SNAPSHOT-2019_01_15_10_51-7296999 docs [This commit notification would consist of 1775 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] Diff for: [GitHub] srowen closed pull request #23447: [SPARK-26462][CORE] Use ConfigEntry for hardcoded configs for execution categories
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index ab0ae55ed357d..67f2c279dca5a 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -23,7 +23,7 @@ import scala.collection.mutable import scala.concurrent.Future import org.apache.spark.executor.ExecutorMetrics -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{config, Logging} import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} import org.apache.spark.scheduler._ import org.apache.spark.storage.BlockManagerId @@ -83,7 +83,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) // "spark.network.timeoutInterval" uses "seconds", while // "spark.storage.blockManagerTimeoutIntervalMs" uses "milliseconds" private val timeoutIntervalMs = -sc.conf.getTimeAsMs("spark.storage.blockManagerTimeoutIntervalMs", "60s") +sc.conf.get(config.STORAGE_BLOCKMANAGER_TIMEOUTINTERVAL) private val checkTimeoutIntervalMs = sc.conf.getTimeAsSeconds("spark.network.timeoutInterval", s"${timeoutIntervalMs}ms") * 1000 diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 1100868ac1f4e..22bcb8113ce0c 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -532,7 +532,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria } // Validate memory fractions -for (key <- Seq("spark.memory.fraction", "spark.memory.storageFraction")) { +for (key <- Seq(MEMORY_FRACTION.key, MEMORY_STORAGE_FRACTION.key)) { val value = getDouble(key, 0.5) if (value > 1 || value < 0) { throw new IllegalArgumentException(s"$key should be between 0 and 1 (was '$value').") @@ -664,7 +664,7 @@ private[spark] object SparkConf extends Logging { AlternateConfig("spark.yarn.applicationMaster.waitTries", "1.3", // Translate old value to a duration, with 10s wait time per try. 
translation = s => s"${s.toLong * 10}s")), -"spark.reducer.maxSizeInFlight" -> Seq( +REDUCER_MAX_SIZE_IN_FLIGHT.key -> Seq( AlternateConfig("spark.reducer.maxMbInFlight", "1.4")), "spark.kryoserializer.buffer" -> Seq( AlternateConfig("spark.kryoserializer.buffer.mb", "1.4", @@ -675,9 +675,9 @@ private[spark] object SparkConf extends Logging { AlternateConfig("spark.shuffle.file.buffer.kb", "1.4")), EXECUTOR_LOGS_ROLLING_MAX_SIZE.key -> Seq( AlternateConfig("spark.executor.logs.rolling.size.maxBytes", "1.4")), -"spark.io.compression.snappy.blockSize" -> Seq( +IO_COMPRESSION_SNAPPY_BLOCKSIZE.key -> Seq( AlternateConfig("spark.io.compression.snappy.block.size", "1.4")), -"spark.io.compression.lz4.blockSize" -> Seq( +IO_COMPRESSION_LZ4_BLOCKSIZE.key -> Seq( AlternateConfig("spark.io.compression.lz4.block.size", "1.4")), "spark.rpc.numRetries" -> Seq( AlternateConfig("spark.akka.num.retries", "1.4")), diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 5ed5070558af7..14ea289e5f908 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -39,6 +39,7 @@ import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext} import org.apache.spark.broadcast.Broadcast import org.apache.spark.input.PortableDataStream import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.BUFFER_SIZE import org.apache.spark.network.util.JavaUtils import org.apache.spark.rdd.RDD import org.apache.spark.security.SocketAuthHelper @@ -604,7 +605,7 @@ private[spark] class PythonAccumulatorV2( Utils.checkHost(serverHost) - val bufferSize = SparkEnv.get.conf.getInt("spark.buffer.size", 65536) + val bufferSize = SparkEnv.get.conf.get(BUFFER_SIZE) /** * We try to reuse a single Socket to transfer accumulator updates, as they are all added diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index 5168e9330965d..b7f14e062b437 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -27,7 +27,7 @@ import scala.collection.JavaConverters._ import org.apache.spark._ import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.EXECUTOR_CORES +import org.apache.spark.internal.config.{BUFFER_SIZE, EXECUTOR_CORES} import org.apache.spark.internal.config.Python._ import org.apache.spark.security.SocketAuthHelper import
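The pattern in the diff above: each hardcoded string key and inline default becomes a typed `ConfigEntry`, defined once via Spark's internal `ConfigBuilder` and read through `conf.get(ENTRY)`. A minimal sketch of that pattern follows; `ConfigBuilder` is `private[spark]`, and the holder object and doc text here are invented, so treat it as illustrative.

```scala
import org.apache.spark.internal.config.ConfigBuilder

// Hypothetical holder object; Spark keeps the real entries in
// org.apache.spark.internal.config. The doc text is invented.
object MyConfigs {
  val BUFFER_SIZE = ConfigBuilder("spark.buffer.size")
    .doc("Size of buffered I/O, in bytes.")
    .intConf
    .createWithDefault(65536)
}

// Call sites then read the typed entry instead of repeating the string key
// and default everywhere, e.g.:
//   val bufferSize = SparkEnv.get.conf.get(MyConfigs.BUFFER_SIZE)
```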
[spark] branch master updated: [SPARK-26462][CORE] Use ConfigEntry for hardcoded configs for execution categories
This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 7296999 [SPARK-26462][CORE] Use ConfigEntry for hardcoded configs for execution categories 7296999 is described below commit 7296999c4751cfddcca5b77e3348354cff65d069 Author: Pralabh Kumar AuthorDate: Tue Jan 15 12:50:07 2019 -0600 [SPARK-26462][CORE] Use ConfigEntry for hardcoded configs for execution categories ## What changes were proposed in this pull request? Make the following hardcoded configs to use ConfigEntry. spark.memory spark.storage spark.io spark.buffer spark.rdd spark.locality spark.broadcast spark.reducer ## How was this patch tested? Existing tests. Closes #23447 from pralabhkumar/execution_categories. Authored-by: Pralabh Kumar Signed-off-by: Sean Owen --- .../scala/org/apache/spark/HeartbeatReceiver.scala | 4 +- .../main/scala/org/apache/spark/SparkConf.scala| 8 +- .../org/apache/spark/api/python/PythonRDD.scala| 3 +- .../org/apache/spark/api/python/PythonRunner.scala | 4 +- .../scala/org/apache/spark/api/r/RRunner.scala | 4 +- .../apache/spark/broadcast/TorrentBroadcast.scala | 8 +- .../org/apache/spark/deploy/SparkHadoopUtil.scala | 3 +- .../scala/org/apache/spark/executor/Executor.scala | 2 +- .../org/apache/spark/internal/config/package.scala | 191 - .../org/apache/spark/io/CompressionCodec.scala | 13 +- .../org/apache/spark/memory/MemoryManager.scala| 2 +- .../apache/spark/memory/UnifiedMemoryManager.scala | 4 +- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 3 +- .../apache/spark/rdd/ReliableCheckpointRDD.scala | 10 +- .../main/scala/org/apache/spark/rdd/UnionRDD.scala | 3 +- .../apache/spark/scheduler/TaskSetManager.scala| 13 +- .../spark/serializer/SerializerManager.scala | 5 +- .../spark/shuffle/BlockStoreShuffleReader.scala| 4 +- .../org/apache/spark/storage/BlockManager.scala| 7 +- .../spark/storage/BlockManagerMasterEndpoint.scala | 6 +- .../scala/org/apache/spark/storage/DiskStore.scala | 2 +- .../org/apache/spark/storage/TopologyMapper.scala | 4 +- .../apache/spark/storage/memory/MemoryStore.scala | 4 +- .../scala/org/apache/spark/DistributedSuite.scala | 8 +- .../scala/org/apache/spark/SparkConfSuite.scala| 4 +- .../apache/spark/broadcast/BroadcastSuite.scala| 7 +- .../spark/memory/UnifiedMemoryManagerSuite.scala | 20 ++- .../test/scala/org/apache/spark/rdd/RDDSuite.scala | 5 +- .../scheduler/BlacklistIntegrationSuite.scala | 2 +- .../scheduler/EventLoggingListenerSuite.scala | 2 +- .../spark/scheduler/TaskSchedulerImplSuite.scala | 4 +- .../spark/scheduler/TaskSetManagerSuite.scala | 4 +- .../shuffle/sort/ShuffleExternalSorterSuite.scala | 4 +- .../storage/BlockManagerReplicationSuite.scala | 17 +- .../apache/spark/storage/BlockManagerSuite.scala | 18 +- .../org/apache/spark/storage/DiskStoreSuite.scala | 2 +- .../apache/spark/storage/MemoryStoreSuite.scala| 3 +- .../apache/spark/storage/TopologyMapperSuite.scala | 3 +- .../collection/ExternalAppendOnlyMapSuite.scala| 2 +- 39 files changed, 309 insertions(+), 103 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index ab0ae55..67f2c27 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -23,7 +23,7 @@ import scala.collection.mutable import scala.concurrent.Future import 
org.apache.spark.executor.ExecutorMetrics -import org.apache.spark.internal.Logging +import org.apache.spark.internal.{config, Logging} import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} import org.apache.spark.scheduler._ import org.apache.spark.storage.BlockManagerId @@ -83,7 +83,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) // "spark.network.timeoutInterval" uses "seconds", while // "spark.storage.blockManagerTimeoutIntervalMs" uses "milliseconds" private val timeoutIntervalMs = -sc.conf.getTimeAsMs("spark.storage.blockManagerTimeoutIntervalMs", "60s") +sc.conf.get(config.STORAGE_BLOCKMANAGER_TIMEOUTINTERVAL) private val checkTimeoutIntervalMs = sc.conf.getTimeAsSeconds("spark.network.timeoutInterval", s"${timeoutIntervalMs}ms") * 1000 diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 1100868..22bcb81 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@
[spark] Diff for: [GitHub] asfgit closed pull request #23511: [SPARK-26592][SS] Throw exception when kafka delegation token tried to obtain with proxy user
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala b/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala index aec0f72feb3c1..f3638533e1b7d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala @@ -17,12 +17,13 @@ package org.apache.spark.deploy.security -import java.{ util => ju } +import java.{util => ju} import java.text.SimpleDateFormat import scala.util.control.NonFatal import org.apache.hadoop.io.Text +import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.security.token.{Token, TokenIdentifier} import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier import org.apache.kafka.clients.CommonClientConfigs @@ -33,6 +34,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, S import org.apache.kafka.common.security.token.delegation.DelegationToken import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ @@ -45,6 +47,8 @@ private[spark] object KafkaTokenUtil extends Logging { } private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = { +checkProxyUser() + val adminClient = AdminClient.create(createAdminClientProperties(sparkConf)) val createDelegationTokenOptions = new CreateDelegationTokenOptions() val createResult = adminClient.createDelegationToken(createDelegationTokenOptions) @@ -59,6 +63,14 @@ private[spark] object KafkaTokenUtil extends Logging { ), token.tokenInfo.expiryTimestamp) } + private[security] def checkProxyUser(): Unit = { +val currentUser = UserGroupInformation.getCurrentUser() +// Obtaining delegation token for proxy user is planned but not yet implemented +// See https://issues.apache.org/jira/browse/KAFKA-6945 +require(!SparkHadoopUtil.get.isProxyUser(currentUser), "Obtaining delegation token for proxy " + + "user is not yet supported.") + } + private[security] def createAdminClientProperties(sparkConf: SparkConf): ju.Properties = { val adminClientProperties = new ju.Properties diff --git a/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala b/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala index 18aa537b3a51d..daa7e544cc9c6 100644 --- a/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala @@ -17,9 +17,11 @@ package org.apache.spark.deploy.security -import java.{ util => ju } +import java.{util => ju} +import java.security.PrivilegedExceptionAction import javax.security.auth.login.{AppConfigurationEntry, Configuration} +import org.apache.hadoop.security.UserGroupInformation import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.common.config.SaslConfigs import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL} @@ -78,6 +80,21 @@ class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach { Configuration.setConfiguration(null) } + test("checkProxyUser with proxy current user should throw exception") { +val realUser = UserGroupInformation.createUserForTesting("realUser", Array()) +UserGroupInformation.createProxyUserForTesting("proxyUser", realUser, Array()).doAs( + new PrivilegedExceptionAction[Unit]() { +override def run(): Unit = { + val 
thrown = intercept[IllegalArgumentException] { +KafkaTokenUtil.checkProxyUser() + } + assert(thrown.getMessage contains +"Obtaining delegation token for proxy user is not yet supported.") +} + } +) + } + test("createAdminClientProperties without bootstrap servers should throw exception") { val thrown = intercept[IllegalArgumentException] { KafkaTokenUtil.createAdminClientProperties(sparkConf) With regards, Apache Git Services - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] branch master updated: [SPARK-26592][SS] Throw exception when kafka delegation token tried to obtain with proxy user
This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 5ca45e8 [SPARK-26592][SS] Throw exception when kafka delegation token tried to obtain with proxy user 5ca45e8 is described below commit 5ca45e8a3db75907e8699e7dff63accbff5bfae5 Author: Gabor Somogyi AuthorDate: Tue Jan 15 10:00:01 2019 -0800 [SPARK-26592][SS] Throw exception when kafka delegation token tried to obtain with proxy user ## What changes were proposed in this pull request? Kafka does not yet support obtaining delegation tokens with a proxy user. It has to be turned off until https://issues.apache.org/jira/browse/KAFKA-6945 is implemented. In this PR an exception will be thrown when this situation happens. ## How was this patch tested? Additional unit test. Closes #23511 from gaborgsomogyi/SPARK-26592. Authored-by: Gabor Somogyi Signed-off-by: Marcelo Vanzin --- .../apache/spark/deploy/security/KafkaTokenUtil.scala | 14 +- .../spark/deploy/security/KafkaTokenUtilSuite.scala | 19 ++- 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala b/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala index aec0f72..f363853 100644 --- a/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/security/KafkaTokenUtil.scala @@ -17,12 +17,13 @@ package org.apache.spark.deploy.security -import java.{ util => ju } +import java.{util => ju} import java.text.SimpleDateFormat import scala.util.control.NonFatal import org.apache.hadoop.io.Text +import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.security.token.{Token, TokenIdentifier} import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier import org.apache.kafka.clients.CommonClientConfigs @@ -33,6 +34,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, S import org.apache.kafka.common.security.token.delegation.DelegationToken import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ @@ -45,6 +47,8 @@ private[spark] object KafkaTokenUtil extends Logging { } private[security] def obtainToken(sparkConf: SparkConf): (Token[_ <: TokenIdentifier], Long) = { +checkProxyUser() + val adminClient = AdminClient.create(createAdminClientProperties(sparkConf)) val createDelegationTokenOptions = new CreateDelegationTokenOptions() val createResult = adminClient.createDelegationToken(createDelegationTokenOptions) @@ -59,6 +63,14 @@ private[spark] object KafkaTokenUtil extends Logging { ), token.tokenInfo.expiryTimestamp) } + private[security] def checkProxyUser(): Unit = { +val currentUser = UserGroupInformation.getCurrentUser() +// Obtaining delegation token for proxy user is planned but not yet implemented +// See https://issues.apache.org/jira/browse/KAFKA-6945 +require(!SparkHadoopUtil.get.isProxyUser(currentUser), "Obtaining delegation token for proxy " + + "user is not yet supported.") + } + private[security] def createAdminClientProperties(sparkConf: SparkConf): ju.Properties = { val adminClientProperties = new ju.Properties diff --git a/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala
b/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala index 18aa537..daa7e54 100644 --- a/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/security/KafkaTokenUtilSuite.scala @@ -17,9 +17,11 @@ package org.apache.spark.deploy.security -import java.{ util => ju } +import java.{util => ju} +import java.security.PrivilegedExceptionAction import javax.security.auth.login.{AppConfigurationEntry, Configuration} +import org.apache.hadoop.security.UserGroupInformation import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.common.config.SaslConfigs import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL} @@ -78,6 +80,21 @@ class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach { Configuration.setConfiguration(null) } + test("checkProxyUser with proxy current user should throw exception") { +val realUser = UserGroupInformation.createUserForTesting("realUser", Array()) +UserGroupInformation.createProxyUserForTesting("proxyUser", realUser, Array()).doAs( + new PrivilegedExceptionAction[Unit]() { +override def run(): Unit = { +
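`SparkHadoopUtil.get.isProxyUser`, referenced in the diff but not shown there, plausibly just inspects how the current `UserGroupInformation` was authenticated. The sketch below is an assumption, not the verified Spark source:

```scala
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod

object ProxyUserCheck {
  // A UGI created via createProxyUser (or createProxyUserForTesting, as in
  // the test above) carries the PROXY authentication method.
  def isProxyUser(ugi: UserGroupInformation): Boolean =
    ugi.getAuthenticationMethod == AuthenticationMethod.PROXY
}
```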
[GitHub] dongjoon-hyun commented on issue #171: Remove old releases from download page. Remove unused 'stable' flag and old info
dongjoon-hyun commented on issue #171: Remove old releases from download page. Remove unused 'stable' flag and old info URL: https://github.com/apache/spark-website/pull/171#issuecomment-454429410 Oh, I didn't see this PR. Could you rebase please? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[GitHub] dongjoon-hyun merged pull request #170: Update 2.3.1 to use archive
dongjoon-hyun merged pull request #170: Update 2.3.1 to use archive URL: https://github.com/apache/spark-website/pull/170 As this is a foreign pull request (from a fork), the diff has been sent to your commit mailing list, commits@spark.apache.org This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark-website] Diff for: [GitHub] dongjoon-hyun merged pull request #170: Update 2.3.1 to use archive
diff --git a/js/downloads.js b/js/downloads.js index 560896909..1d4c1e77f 100644 --- a/js/downloads.js +++ b/js/downloads.js @@ -34,7 +34,7 @@ var packagesV9 = [hadoop2p7, hadoop2p6, hadoopFree, scala2p12_hadoopFree, source addRelease("2.4.0", new Date("11/02/2018"), packagesV9, true, true); addRelease("2.3.2", new Date("09/24/2018"), packagesV8, true, true); -addRelease("2.3.1", new Date("06/08/2018"), packagesV8, true, true); +addRelease("2.3.1", new Date("06/08/2018"), packagesV8, true, false); addRelease("2.3.0", new Date("02/28/2018"), packagesV8, true, false); addRelease("2.2.3", new Date("01/11/2019"), packagesV8, true, true); addRelease("2.2.2", new Date("07/02/2018"), packagesV8, true, false); diff --git a/site/js/downloads.js b/site/js/downloads.js index 560896909..1d4c1e77f 100644 --- a/site/js/downloads.js +++ b/site/js/downloads.js @@ -34,7 +34,7 @@ var packagesV9 = [hadoop2p7, hadoop2p6, hadoopFree, scala2p12_hadoopFree, source addRelease("2.4.0", new Date("11/02/2018"), packagesV9, true, true); addRelease("2.3.2", new Date("09/24/2018"), packagesV8, true, true); -addRelease("2.3.1", new Date("06/08/2018"), packagesV8, true, true); +addRelease("2.3.1", new Date("06/08/2018"), packagesV8, true, false); addRelease("2.3.0", new Date("02/28/2018"), packagesV8, true, false); addRelease("2.2.3", new Date("01/11/2019"), packagesV8, true, true); addRelease("2.2.2", new Date("07/02/2018"), packagesV8, true, false); With regards, Apache Git Services - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark-website] branch asf-site updated: Make 2.3.1 to use archive (#170)
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/spark-website.git The following commit(s) were added to refs/heads/asf-site by this push: new 88472e8 Make 2.3.1 to use archive (#170) 88472e8 is described below commit 88472e8cc285fbc8dac39bb5357bf3679177cd60 Author: Dongjoon Hyun AuthorDate: Tue Jan 15 08:18:13 2019 -0700 Make 2.3.1 to use archive (#170) --- js/downloads.js | 2 +- site/js/downloads.js | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/js/downloads.js b/js/downloads.js index 5608969..1d4c1e7 100644 --- a/js/downloads.js +++ b/js/downloads.js @@ -34,7 +34,7 @@ var packagesV9 = [hadoop2p7, hadoop2p6, hadoopFree, scala2p12_hadoopFree, source addRelease("2.4.0", new Date("11/02/2018"), packagesV9, true, true); addRelease("2.3.2", new Date("09/24/2018"), packagesV8, true, true); -addRelease("2.3.1", new Date("06/08/2018"), packagesV8, true, true); +addRelease("2.3.1", new Date("06/08/2018"), packagesV8, true, false); addRelease("2.3.0", new Date("02/28/2018"), packagesV8, true, false); addRelease("2.2.3", new Date("01/11/2019"), packagesV8, true, true); addRelease("2.2.2", new Date("07/02/2018"), packagesV8, true, false); diff --git a/site/js/downloads.js b/site/js/downloads.js index 5608969..1d4c1e7 100644 --- a/site/js/downloads.js +++ b/site/js/downloads.js @@ -34,7 +34,7 @@ var packagesV9 = [hadoop2p7, hadoop2p6, hadoopFree, scala2p12_hadoopFree, source addRelease("2.4.0", new Date("11/02/2018"), packagesV9, true, true); addRelease("2.3.2", new Date("09/24/2018"), packagesV8, true, true); -addRelease("2.3.1", new Date("06/08/2018"), packagesV8, true, true); +addRelease("2.3.1", new Date("06/08/2018"), packagesV8, true, false); addRelease("2.3.0", new Date("02/28/2018"), packagesV8, true, false); addRelease("2.2.3", new Date("01/11/2019"), packagesV8, true, true); addRelease("2.2.2", new Date("07/02/2018"), packagesV8, true, false); - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[GitHub] dongjoon-hyun commented on issue #170: Update 2.3.1 to use archive
dongjoon-hyun commented on issue #170: Update 2.3.1 to use archive URL: https://github.com/apache/spark-website/pull/170#issuecomment-454428645 Merged. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[GitHub] dongjoon-hyun commented on issue #170: Update 2.3.1 to use archive
dongjoon-hyun commented on issue #170: Update 2.3.1 to use archive URL: https://github.com/apache/spark-website/pull/170#issuecomment-454428520 Thank you, @HyukjinKwon . This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r31973 - in /release/spark: spark-2.1.3/ spark-2.2.2/ spark-2.3.1/
Author: srowen Date: Tue Jan 15 14:44:15 2019 New Revision: 31973 Log: Remove old Spark releases from dist Removed: release/spark/spark-2.1.3/ release/spark/spark-2.2.2/ release/spark/spark-2.3.1/ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r31971 - in /dev/spark: 2.4.0-2018_09_15_15_07-1220ab8-bin/ 2.4.0-2018_09_16_00_38-1220ab8-docs/ 2.4.1-SNAPSHOT-2019_01_14_18_37-743dedb-docs/ 3.0.0-SNAPSHOT-2019_01_13_13_55-c01152d-docs/
Author: srowen Date: Tue Jan 15 14:42:56 2019 New Revision: 31971 Log: Remove more old Spark doc snapshots from dev Removed: dev/spark/2.4.0-2018_09_15_15_07-1220ab8-bin/ dev/spark/2.4.0-2018_09_16_00_38-1220ab8-docs/ dev/spark/2.4.1-SNAPSHOT-2019_01_14_18_37-743dedb-docs/ dev/spark/3.0.0-SNAPSHOT-2019_01_13_13_55-c01152d-docs/ dev/spark/3.0.0-SNAPSHOT-2019_01_13_17_59-3f80071-docs/ dev/spark/3.0.0-SNAPSHOT-2019_01_14_06_38-115fecf-docs/ dev/spark/3.0.0-SNAPSHOT-2019_01_14_16_23-bafc7ac-docs/ dev/spark/3.0.0-SNAPSHOT-2019_01_14_20_51-abc937b-docs/ dev/spark/3.0.0-SNAPSHOT-2019_01_15_01_24-a77505d-docs/ dev/spark/spark-2.0.0-preview/ dev/spark/spark-2.0.1/ - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[GitHub] srowen opened a new pull request #171: Remove old releases from download page. Remove unused 'stable' flag and old info
srowen opened a new pull request #171: Remove old releases from download page. Remove unused 'stable' flag and old info URL: https://github.com/apache/spark-website/pull/171 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] Diff for: [GitHub] dongjoon-hyun closed pull request #23291: [SPARK-26203][SQL][TEST] Benchmark performance of In and InSet expressions
diff --git a/sql/core/benchmarks/InExpressionBenchmark-results.txt b/sql/core/benchmarks/InExpressionBenchmark-results.txt new file mode 100644 index 0..d2adbded66149 --- /dev/null +++ b/sql/core/benchmarks/InExpressionBenchmark-results.txt @@ -0,0 +1,551 @@ + +In Expression Benchmark + + +OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +5 bytes: Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative + +In expression 101 / 138 98.7 10.1 1.0X +InSet expression 125 / 136 79.7 12.5 0.8X + +OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +10 bytes:Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative + +In expression 101 / 111 99.3 10.1 1.0X +InSet expression 126 / 133 79.6 12.6 0.8X + +OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +25 bytes:Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative + +In expression 176 / 183 56.9 17.6 1.0X +InSet expression 174 / 184 57.4 17.4 1.0X + +OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +50 bytes:Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative + +In expression 299 / 312 33.5 29.9 1.0X +InSet expression 243 / 246 41.2 24.3 1.2X + +OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +100 bytes: Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative + +In expression 512 / 518 19.5 51.2 1.0X +InSet expression 388 / 400 25.8 38.8 1.3X + +OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +200 bytes: Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative + +In expression 854 / 866 11.7 85.4 1.0X +InSet expression 686 / 694 14.6 68.6 1.2X + +OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +5 shorts:Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative + +In expression 59 / 62169.6 5.9 1.0X +InSet expression 163 / 168 61.3 16.3 0.4X + +OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +10 shorts: Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative + +In expression 76 / 78132.0 7.6 1.0X +InSet expression 182 / 186 54.9 18.2 0.4X + +OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +25 shorts: Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative + +In expression 126 / 128 79.4
[spark] branch master updated: [SPARK-26203][SQL][TEST] Benchmark performance of In and InSet expressions
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new b45ff02 [SPARK-26203][SQL][TEST] Benchmark performance of In and InSet expressions b45ff02 is described below commit b45ff02e77da013c878574e70d14faf09e1aed39 Author: Anton Okolnychyi AuthorDate: Tue Jan 15 07:25:50 2019 -0700 [SPARK-26203][SQL][TEST] Benchmark performance of In and InSet expressions ## What changes were proposed in this pull request? This PR contains benchmarks for `In` and `InSet` expressions. They cover literals of different data types and will help us to decide where to integrate the switch-based logic for bytes/shorts/ints. As discussed in [PR-23171](https://github.com/apache/spark/pull/23171), one potential approach is to convert `In` to `InSet` whenever all elements are literals, regardless of data type and the number of elements. According to the results of this PR, we might want to keep the threshold for the number of elements. The if-else approach might be faster for some data types on a small number of elements (structs? arrays? small decimals?). ### byte / short / int / long Unless the number of items is really big, `InSet` is slower than `In` because of autoboxing. Interestingly, `In` scales worse on bytes/shorts than on ints/longs. For example, `InSet` starts to match the performance at around 50 bytes/shorts while this does not happen on the same number of ints/longs. This is a bit strange as shorts/bytes (e.g., `(byte) 1`, `(short) 2`) are represented as ints in the bytecode. ### float / double Use cases on floats/doubles also suffer from autoboxing. Therefore, `In` outperforms `InSet` on 10 elements. Similarly to shorts/bytes, `In` scales worse on floats/doubles than on ints/longs because the equality condition is more complicated (e.g., `(java.lang.Float.isNaN(filter_valueArg_0) && java.lang.Float.isNaN(9.0F)) || filter_valueArg_0 == 9.0F`). ### decimal The reason why we have separate benchmarks for small and large decimals is that Spark might use longs to represent decimals in some cases. If this optimization happens, then `equals` will be nothing more than comparing longs. If it does not, Spark will create an instance of `scala.BigDecimal` and use it for comparisons. The latter is more expensive. `Decimal$hashCode` will always use `scala.BigDecimal$hashCode` even if the number is small enough to fit into a long variable. As a consequence, we see that use cases on small decimals are faster with `In` as they are using long comparisons under the hood. Large decimal values are always faster with `InSet`. ### string `UTF8String$equals` is not cheap. Therefore, `In` does not really outperform `InSet` as in previous use cases. ### timestamp / date Under the hood, timestamp/date values will be represented as long/int values. So, `In` allows us to avoid autoboxing. ### array Arrays are working as expected. `In` is faster on 5 elements while `InSet` is faster on 15 elements. The benchmarks are using `UnsafeArrayData`. ### struct `InSet` is always faster than `In` for structs. These benchmarks use `GenericInternalRow`. Closes #23291 from aokolnychyi/spark-26203.
Lead-authored-by: Anton Okolnychyi Co-authored-by: Dongjoon Hyun Signed-off-by: Dongjoon Hyun --- .../benchmarks/InExpressionBenchmark-results.txt | 551 + .../benchmark/InExpressionBenchmark.scala | 214 2 files changed, 765 insertions(+) diff --git a/sql/core/benchmarks/InExpressionBenchmark-results.txt b/sql/core/benchmarks/InExpressionBenchmark-results.txt new file mode 100644 index 000..d2adbde --- /dev/null +++ b/sql/core/benchmarks/InExpressionBenchmark-results.txt @@ -0,0 +1,551 @@ + +In Expression Benchmark + + +OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +5 bytes: Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative + +In expression 101 / 138 98.7 10.1 1.0X +InSet expression 125 / 136 79.7 12.5 0.8X + +OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64 +Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz +10 bytes:
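The threshold discussion above is easier to follow next to the rewrite itself: past a configurable list size, a literal-only `In` can be replaced by an `InSet` backed by a hash set. The sketch below is a simplified stand-in for Spark's actual optimizer rule, and the default threshold of 10 is an assumed value.

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, In, InSet, Literal}

object InToInSet {
  // Simplified stand-in for the optimizer rewrite discussed above: once a
  // literal-only IN list grows past `threshold`, replace the chain of
  // equality checks (In) with a hash-set lookup (InSet). The hash set pays
  // an autoboxing cost for primitives, which is why small lists stay as In.
  def convert(expr: Expression, threshold: Int = 10): Expression = expr match {
    case In(value, list)
        if list.forall(_.isInstanceOf[Literal]) && list.size > threshold =>
      InSet(value, list.map(_.asInstanceOf[Literal].value).toSet)
    case other => other
  }
}
```

The corresponding knob in Spark SQL is the `spark.sql.optimizer.inSetConversionThreshold` configuration.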
[GitHub] srowen commented on a change in pull request #169: Add Apache Spark 2.2.3 release news and update links
srowen commented on a change in pull request #169: Add Apache Spark 2.2.3 release news and update links URL: https://github.com/apache/spark-website/pull/169#discussion_r247895304 ## File path: js/downloads.js ## @@ -36,7 +36,8 @@ addRelease("2.4.0", new Date("11/02/2018"), packagesV9, true, true); addRelease("2.3.2", new Date("09/24/2018"), packagesV8, true, true); addRelease("2.3.1", new Date("06/08/2018"), packagesV8, true, true); Review comment: That's right, and I was going to update that and remove the older releases from the mirrors just now -- I'll take care of it This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[spark] Diff for: [GitHub] liupc closed pull request #23537: [SPARK-26614]Fix speculation kill might cause job failure
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index a30a501e5d4a1..f92ea7873f9ce 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -606,17 +606,25 @@ private[spark] class Executor( if (!ShutdownHookManager.inShutdown()) { val (accums, accUpdates) = collectAccumulatorsAndResetStatusOnFailure(taskStartTime) -val serializedTaskEndReason = { +val (state, serializedTaskEndReason) = { try { -ser.serialize(new ExceptionFailure(t, accUpdates).withAccums(accums)) +if (task.reasonIfKilled.isDefined) { + val killReason = task.reasonIfKilled.getOrElse("unknown reason") + val serializedTK = ser.serialize(TaskKilled(killReason), accUpdates, accums) + (TaskState.KILLED, serializedTK) +} else { + (TaskState.FAILED, +ser.serialize(new ExceptionFailure(t, accUpdates).withAccums(accums))) +} } catch { case _: NotSerializableException => // t is not serializable so just send the stacktrace - ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums)) + (TaskState.FAILED, +ser.serialize(new ExceptionFailure(t, accUpdates, false).withAccums(accums))) } } setTaskFinishedAndClearInterruptStatus() -execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason) +execBackend.statusUpdate(taskId, state, serializedTaskEndReason) } else { logInfo("Not reporting error to driver during JVM shutdown.") } With regards, Apache Git Services - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[GitHub] HyukjinKwon commented on issue #170: Update 2.3.1 to use archive
HyukjinKwon commented on issue #170: Update 2.3.1 to use archive URL: https://github.com/apache/spark-website/pull/170#issuecomment-454337315 +1 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
svn commit: r31966 - in /dev/spark/3.0.0-SNAPSHOT-2019_01_15_01_24-a77505d-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s
Author: pwendell Date: Tue Jan 15 09:36:22 2019 New Revision: 31966 Log: Apache Spark 3.0.0-SNAPSHOT-2019_01_15_01_24-a77505d docs [This commit notification would consist of 1775 parts, which exceeds the limit of 50 ones, so it was shortened to the summary.] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[GitHub] dongjoon-hyun commented on issue #170: Update 2.3.1 to use archive
dongjoon-hyun commented on issue #170: Update 2.3.1 to use archive URL: https://github.com/apache/spark-website/pull/170#issuecomment-454301569 cc @felixcheung This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[GitHub] dongjoon-hyun opened a new pull request #170: Update 2.3.1 to use archive
dongjoon-hyun opened a new pull request #170: Update 2.3.1 to use archive URL: https://github.com/apache/spark-website/pull/170 We need to update the 2.3.1 download to use the archive and remove it from the mirrors. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
[GitHub] felixcheung commented on a change in pull request #169: Add Apache Spark 2.2.3 release news and update links
felixcheung commented on a change in pull request #169: Add Apache Spark 2.2.3 release news and update links URL: https://github.com/apache/spark-website/pull/169#discussion_r247791974 ## File path: js/downloads.js ## @@ -36,7 +36,8 @@ addRelease("2.4.0", new Date("11/02/2018"), packagesV9, true, true); addRelease("2.3.2", new Date("09/24/2018"), packagesV8, true, true); addRelease("2.3.1", new Date("06/08/2018"), packagesV8, true, true); Review comment: I think it's safer to put `false` on all "not-the-last" release of the minor release tree, ie. 2.3.1, 2.1.2, 2.2.2 etc. it doesn't hurt either way, for now, since 2.3.1 is still on the mirror, but it kinda should be in archive instead This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org