[GitHub] spark pull request #20183: [SPARK-22986][Core] Use a cache to avoid instanti...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/20183#discussion_r161149468 --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala --- @@ -206,36 +206,50 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) private def readBroadcastBlock(): T = Utils.tryOrIOException { TorrentBroadcast.synchronized { - setConf(SparkEnv.get.conf) - val blockManager = SparkEnv.get.blockManager - blockManager.getLocalValues(broadcastId) match { -case Some(blockResult) => - if (blockResult.data.hasNext) { -val x = blockResult.data.next().asInstanceOf[T] -releaseLock(broadcastId) -x - } else { -throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId") - } -case None => - logInfo("Started reading broadcast variable " + id) - val startTimeMs = System.currentTimeMillis() - val blocks = readBlocks() - logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs)) - - try { -val obj = TorrentBroadcast.unBlockifyObject[T]( - blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec) -// Store the merged copy in BlockManager so other tasks on this executor don't -// need to re-fetch it. -val storageLevel = StorageLevel.MEMORY_AND_DISK -if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) { - throw new SparkException(s"Failed to store $broadcastId in BlockManager") + val broadcastCache = SparkEnv.get.broadcastManager.cachedValues + + Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse { --- End diff -- I see, thanks for pointing out. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20183: [SPARK-22986][Core] Use a cache to avoid instanti...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/20183#discussion_r161147892 --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala --- @@ -206,36 +206,50 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) private def readBroadcastBlock(): T = Utils.tryOrIOException { TorrentBroadcast.synchronized { - setConf(SparkEnv.get.conf) - val blockManager = SparkEnv.get.blockManager - blockManager.getLocalValues(broadcastId) match { -case Some(blockResult) => - if (blockResult.data.hasNext) { -val x = blockResult.data.next().asInstanceOf[T] -releaseLock(broadcastId) -x - } else { -throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId") - } -case None => - logInfo("Started reading broadcast variable " + id) - val startTimeMs = System.currentTimeMillis() - val blocks = readBlocks() - logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs)) - - try { -val obj = TorrentBroadcast.unBlockifyObject[T]( - blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec) -// Store the merged copy in BlockManager so other tasks on this executor don't -// need to re-fetch it. -val storageLevel = StorageLevel.MEMORY_AND_DISK -if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) { - throw new SparkException(s"Failed to store $broadcastId in BlockManager") + val broadcastCache = SparkEnv.get.broadcastManager.cachedValues + + Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse { --- End diff -- Is this `ReferenceMap` thread safe? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
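For context on the question above: Apache Commons Collections' `ReferenceMap` is documented as not synchronized, so concurrent access needs an external guard. A minimal sketch of one way to guard such a cache; the reference strengths and the cast are illustrative assumptions, not the PR's actual fix:

```scala
import java.util.Collections
import org.apache.commons.collections.map.{AbstractReferenceMap, ReferenceMap}

// ReferenceMap itself is not thread-safe, so wrap it in a synchronized view before sharing it.
// HARD keys / WEAK values are assumed here purely for illustration.
val cachedValues: java.util.Map[Any, Any] = Collections.synchronizedMap(
  new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
    .asInstanceOf[java.util.Map[Any, Any]])
```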
[GitHub] spark issue #20242: [MINOR][BUILD] Fix Java linter errors
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/20242 LGTM. @dongjoon-hyun, do the current changes include all the lint issues, or do you still have further changes? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20184: [SPARK-22987][Core] UnsafeExternalSorter causes OOM when ...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/20184 @liutang123 , could you please tell us how to reproduce your issue easily? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20236: [SPARK-23044] Error handling for jira assignment
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/20236 @squito thanks for the fix. I also don't have PRs to verify the changes, but I think catching the exception should be enough. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19885: [SPARK-22587] Spark job fails if fs.defaultFS and applic...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19885 Let me merge to master and branch 2.3. Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19885: [SPARK-22587] Spark job fails if fs.defaultFS and applic...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19885 LGTM. @merlintang please fix the PR title, thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19885: [SPARK-22587] Spark job fails if fs.defaultFS and applic...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19885 @steveloughran @vanzin please help to review again. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19885: [SPARK-22587] Spark job fails if fs.defaultFS and...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19885#discussion_r160617532 --- Diff: resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala --- @@ -357,6 +357,41 @@ class ClientSuite extends SparkFunSuite with Matchers { sparkConf.get(SECONDARY_JARS) should be (Some(Seq(new File(jar2.toURI).getName))) } + private val matching = Seq( +("files URI match test1", "file:///file1", "file:///file2"), +("files URI match test2", "file:///c:file1", "file://c:file2"), +("files URI match test3", "file://host/file1", "file://host/file2"), +("wasb URI match test", "wasb://bucket1@user", "wasb://bucket1@user/"), +("hdfs URI match test", "hdfs:/path1", "hdfs:/path1") + ) + + matching.foreach { --- End diff -- nit: ``` matching.foreach { t => ... ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
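A sketch of the layout the nit above suggests, with the lambda parameter bound on the same line as the opening brace; destructuring the tuple into named parts is an extra readability tweak, and the `java.net.URI` import plus `Client.compareUri` are assumed from the surrounding ClientSuite code:

```scala
matching.foreach { case (name, uri1, uri2) =>
  test(name) {
    assert(Client.compareUri(new URI(uri1), new URI(uri2)), s"No match between $uri1 and $uri2")
  }
}
```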
[GitHub] spark pull request #19885: [SPARK-22587] Spark job fails if fs.defaultFS and...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19885#discussion_r160617569 --- Diff: resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala --- @@ -357,6 +357,41 @@ class ClientSuite extends SparkFunSuite with Matchers { sparkConf.get(SECONDARY_JARS) should be (Some(Seq(new File(jar2.toURI).getName))) } + private val matching = Seq( +("files URI match test1", "file:///file1", "file:///file2"), +("files URI match test2", "file:///c:file1", "file://c:file2"), +("files URI match test3", "file://host/file1", "file://host/file2"), +("wasb URI match test", "wasb://bucket1@user", "wasb://bucket1@user/"), +("hdfs URI match test", "hdfs:/path1", "hdfs:/path1") + ) + + matching.foreach { +t => + test(t._1) { +assert(Client.compareUri(new URI(t._2), new URI(t._3)), + s"No match between ${t._2} and ${t._3}") + } + } + + private val unmatching = Seq( +("files URI unmatch test1", "file:///file1", "file://host/file2"), +("files URI unmatch test2", "file://host/file1", "file:///file2"), +("files URI unmatch test3", "file://host/file1", "file://host2/file2"), +("wasb URI unmatch test1", "wasb://bucket1@user", "wasb://bucket2@user/"), +("wasb URI unmatch test2", "wasb://bucket1@user", "wasb://bucket1@user2/"), +("s3 URI unmatch test", "s3a://user@pass:bucket1/", "s3a://user2@pass2:bucket1/"), +("hdfs URI unmatch test1", "hdfs://namenode1/path1", "hdfs://namenode1:8080/path2"), +("hdfs URI unmatch test2", "hdfs://namenode1:8020/path1", "hdfs://namenode1:8080/path2") + ) + + unmatching.foreach { +t => --- End diff -- ditto. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20179: [SPARK-22982] Remove unsafe asynchronous close() ...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/20179#discussion_r160612163 --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala --- @@ -196,11 +196,24 @@ private[spark] class IndexShuffleBlockResolver( // find out the consolidated file, then the offset within that from our index val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId) -val in = new DataInputStream(new FileInputStream(indexFile)) +// SPARK-22982: if this FileInputStream's position is seeked forward by another piece of code +// which is incorrectly using our file descriptor then this code will fetch the wrong offsets +// (which may cause a reducer to be sent a different reducer's data). The explicit position +// checks added here were a useful debugging aid during SPARK-22982 and may help prevent this +// class of issue from re-occurring in the future which is why they are left here even though +// SPARK-22982 is fixed. +val channel = Files.newByteChannel(indexFile.toPath) +channel.position(blockId.reduceId * 8) +val in = new DataInputStream(Channels.newInputStream(channel)) try { - ByteStreams.skipFully(in, blockId.reduceId * 8) val offset = in.readLong() val nextOffset = in.readLong() + val actualPosition = channel.position() + val expectedPosition = blockId.reduceId * 8 + 16 + if (actualPosition != expectedPosition) { +throw new Exception(s"SPARK-22982: Incorrect channel position after index file reads: " + --- End diff -- I see. Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20179: [SPARK-22982] Remove unsafe asynchronous close() ...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/20179#discussion_r160351383 --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala --- @@ -196,11 +196,24 @@ private[spark] class IndexShuffleBlockResolver( // find out the consolidated file, then the offset within that from our index val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId) -val in = new DataInputStream(new FileInputStream(indexFile)) +// SPARK-22982: if this FileInputStream's position is seeked forward by another piece of code +// which is incorrectly using our file descriptor then this code will fetch the wrong offsets +// (which may cause a reducer to be sent a different reducer's data). The explicit position +// checks added here were a useful debugging aid during SPARK-22982 and may help prevent this +// class of issue from re-occurring in the future which is why they are left here even though +// SPARK-22982 is fixed. +val channel = Files.newByteChannel(indexFile.toPath) +channel.position(blockId.reduceId * 8) --- End diff -- I see. Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20179: [SPARK-22982] Remove unsafe asynchronous close() ...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/20179#discussion_r160347716 --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala --- @@ -196,11 +196,24 @@ private[spark] class IndexShuffleBlockResolver( // find out the consolidated file, then the offset within that from our index val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId) -val in = new DataInputStream(new FileInputStream(indexFile)) +// SPARK-22982: if this FileInputStream's position is seeked forward by another piece of code +// which is incorrectly using our file descriptor then this code will fetch the wrong offsets +// (which may cause a reducer to be sent a different reducer's data). The explicit position +// checks added here were a useful debugging aid during SPARK-22982 and may help prevent this +// class of issue from re-occurring in the future which is why they are left here even though +// SPARK-22982 is fixed. +val channel = Files.newByteChannel(indexFile.toPath) +channel.position(blockId.reduceId * 8) --- End diff -- Sorry I'm not clear whether the change here is related to "asynchronous close()" issue? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20179: [SPARK-22982] Remove unsafe asynchronous close() ...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/20179#discussion_r160347387 --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala --- @@ -196,11 +196,24 @@ private[spark] class IndexShuffleBlockResolver( // find out the consolidated file, then the offset within that from our index val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId) -val in = new DataInputStream(new FileInputStream(indexFile)) +// SPARK-22982: if this FileInputStream's position is seeked forward by another piece of code +// which is incorrectly using our file descriptor then this code will fetch the wrong offsets +// (which may cause a reducer to be sent a different reducer's data). The explicit position +// checks added here were a useful debugging aid during SPARK-22982 and may help prevent this +// class of issue from re-occurring in the future which is why they are left here even though +// SPARK-22982 is fixed. +val channel = Files.newByteChannel(indexFile.toPath) +channel.position(blockId.reduceId * 8) +val in = new DataInputStream(Channels.newInputStream(channel)) try { - ByteStreams.skipFully(in, blockId.reduceId * 8) val offset = in.readLong() val nextOffset = in.readLong() + val actualPosition = channel.position() + val expectedPosition = blockId.reduceId * 8 + 16 + if (actualPosition != expectedPosition) { +throw new Exception(s"SPARK-22982: Incorrect channel position after index file reads: " + --- End diff -- Maybe we'd better change to some specific `Exception` type here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
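A sketch of what the comment above asks for: throw a narrower exception type instead of a bare `java.lang.Exception`. Using `SparkException` here is only an assumption; the PR may settle on a different type:

```scala
// assumes: import org.apache.spark.SparkException
if (actualPosition != expectedPosition) {
  throw new SparkException(s"SPARK-22982: Incorrect channel position after index file reads: " +
    s"expected $expectedPosition but actual position was $actualPosition.")
}
```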
[GitHub] spark issue #11994: [SPARK-14151] Expose metrics Source and Sink interface
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/11994 Yes, I think so. Based on the current MetricsSystem, it is hard to avoid `MetricsRegistry`, whether explicitly or implicitly (unless we refactor/abstract this part a lot). The same is true if users want to use different versions of Codahale. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #11994: [SPARK-14151] Expose metrics Source and Sink interface
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/11994 Hi @CodingCat , thanks a lot for your explanation. IIUC, from the code you mentioned above, we still need to pass the `MetricRegistry` to the `Reporter`; otherwise how would a reporter report the registered metrics? In your example, you're using a new `MetricRegistry`, which is empty. I don't think this `Reporter` will report anything in `CodeHaleCsvReporter`, unless we pass the `MetricRegistry` from `MetricsSystem` to this reporter. Please correct me if I'm wrong. :) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
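To illustrate the point about the empty registry: a Codahale reporter only ever reports what is in the `MetricRegistry` it was built against, so building it on a fresh registry yields nothing. A minimal, self-contained sketch (ConsoleReporter is used purely for illustration):

```scala
import com.codahale.metrics.{ConsoleReporter, MetricRegistry}

val registry = new MetricRegistry()                            // empty: nothing was registered here
val reporter = ConsoleReporter.forRegistry(registry).build()
reporter.report()                                              // reports no metrics, since none exist in this registry
```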
[GitHub] spark issue #20078: [SPARK-22900] [Spark-Streaming] Remove unnecessary restr...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/20078 Originally in Spark dynamic allocation, "spark.executor.instances" and the dynamic allocation conf could not coexist: if "spark.executor.instances" was set, dynamic allocation would not be enabled. But this behavior changed after 2.0. I think for streaming dynamic allocation here, we'd better keep it consistent with Spark dynamic allocation. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
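A hedged sketch of the pre-2.0 behaviour described above, not the current Spark implementation: dynamic allocation is treated as disabled whenever the user pins a fixed executor count.

```scala
import org.apache.spark.SparkConf

// Illustrative only: if spark.executor.instances is set to a non-zero value,
// dynamic allocation is considered effectively disabled.
def dynamicAllocationEffectivelyEnabled(conf: SparkConf): Boolean = {
  val fixedExecutors = conf.getInt("spark.executor.instances", 0)
  conf.getBoolean("spark.dynamicAllocation.enabled", false) && fixedExecutors == 0
}
```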
[GitHub] spark issue #20144: [SPARK-21475][CORE][2nd attempt] Change to use NIO's Fil...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/20144 @zsxwing , would you please help review this, thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20144: [SPARK-21475][CORE][2nd attempt] Change to use NI...
GitHub user jerryshao opened a pull request: https://github.com/apache/spark/pull/20144 [SPARK-21475][CORE][2nd attempt] Change to use NIO's Files API for external shuffle service ## What changes were proposed in this pull request? This PR is the second attempt of #18684 . NIO's Files API doesn't override the `skip` method of the `InputStream` it returns, so it brings in a performance issue (mentioned in #20119). But using `FileInputStream`/`FileOutputStream` also brings in a memory issue (https://dzone.com/articles/fileinputstream-fileoutputstream-considered-harmful), which is severe for a long-running external shuffle service. So this proposal only fixes the code related to the external shuffle service. ## How was this patch tested? Existing tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jerryshao/apache-spark SPARK-21475-v2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20144.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20144 commit 277b801b3ccee95d8f30c02b1ba8693ab39f302a Author: jerryshao Date: 2018-01-04T01:48:55Z Change to NIO's file API for external shuffle service --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
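To make the `skip` issue above concrete: on the JDKs in use at the time, the stream returned by `Files.newInputStream` did not override `InputStream.skip`, so skipping degenerated into reading and discarding bytes, while `FileInputStream.skip` just moves the file offset. A rough sketch; the file path is hypothetical:

```scala
import java.io.FileInputStream
import java.nio.file.{Files, Paths}

val nioIn = Files.newInputStream(Paths.get("/tmp/shuffle.index"))  // hypothetical file
nioIn.skip(1024L * 1024)    // may read and throw away ~1 MB just to advance the position
nioIn.close()

val fileIn = new FileInputStream("/tmp/shuffle.index")
fileIn.skip(1024L * 1024)   // repositions the descriptor without reading the bytes
fileIn.close()
```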
[GitHub] spark issue #20119: [SPARK-21475][Core]Revert "[SPARK-21475][CORE] Use NIO's...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/20119 OK, I will do it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20078: [SPARK-22900] [Spark-Streaming] Remove unnecessary restr...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/20078 I'm not against the fix. My concern is that we've shifted to Structured Streaming; also, this feature (streaming dynamic allocation) is seldom used/tested, and this might not be the only issue regarding it (we updated dynamic allocation a lot), so do we still need to put effort into it? Just my concern. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20119: [SPARK-21475][Core]Revert "[SPARK-21475][CORE] Use NIO's...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/20119 @zsxwing maybe we only need to fix the above two points related to the external shuffle service; what do you think? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20078: [SPARK-22900] [Spark-Streaming] Remove unnecessary restr...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/20078 Sorry to chime in. This feature (streaming dynamic allocation) is obsolete and has bugs, and users seldom enable it; is it still worth fixing? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #11994: [SPARK-14151] Expose metrics Source and Sink interface
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/11994 @CodingCat , IIUC the way you mentioned will also expose the Codahale `Reporter` to users; can you please explain more? Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20119: [SPARK-21475][Core]Revert "[SPARK-21475][CORE] Us...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/20119#discussion_r159372233 --- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java --- @@ -39,7 +39,7 @@ public ShuffleIndexInformation(File indexFile) throws IOException { offsets = buffer.asLongBuffer(); DataInputStream dis = null; try { - dis = new DataInputStream(Files.newInputStream(indexFile.toPath())); + dis = new DataInputStream(new FileInputStream(indexFile)); --- End diff -- @zsxwing I think this will also affect the external shuffle service. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20119: [SPARK-21475][Core]Revert "[SPARK-21475][CORE] Us...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/20119#discussion_r159371080 --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala --- @@ -198,7 +196,7 @@ private[spark] class IndexShuffleBlockResolver( // find out the consolidated file, then the offset within that from our index val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId) -val in = new DataInputStream(Files.newInputStream(indexFile.toPath)) +val in = new DataInputStream(new FileInputStream(indexFile)) --- End diff -- I see. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20119: [SPARK-21475][Core]Revert "[SPARK-21475][CORE] Us...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/20119#discussion_r159370876 --- Diff: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java --- @@ -165,7 +165,7 @@ private void failRemainingBlocks(String[] failedBlockIds, Throwable e) { DownloadCallback(int chunkIndex) throws IOException { this.targetFile = tempFileManager.createTempFile(); - this.channel = Channels.newChannel(Files.newOutputStream(targetFile.toPath())); + this.channel = Channels.newChannel(new FileOutputStream(targetFile)); --- End diff -- Here I think we can use `FileChannel.open` instead. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
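A sketch of the `FileChannel.open` alternative mentioned above, keeping an NIO code path while avoiding `new FileOutputStream(...)`; `targetFile` comes from the surrounding DownloadCallback code, and the open options are an assumption:

```scala
import java.nio.channels.FileChannel
import java.nio.file.StandardOpenOption

// Open the temp file for writing via FileChannel instead of constructing a FileOutputStream.
val channel = FileChannel.open(
  targetFile.toPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE)
```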
[GitHub] spark pull request #20119: [SPARK-21475][Core]Revert "[SPARK-21475][CORE] Us...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/20119#discussion_r159364034 --- Diff: common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java --- @@ -94,9 +93,9 @@ public ByteBuffer nioByteBuffer() throws IOException { @Override public InputStream createInputStream() throws IOException { -InputStream is = null; +FileInputStream is = null; try { - is = Files.newInputStream(file.toPath()); + is = new FileInputStream(file); ByteStreams.skipFully(is, offset); return new LimitedInputStream(is, length); --- End diff -- I think these two lines might be the place that suffers from the `skip` issue; can we revert just this place? @zsxwing @cloud-fan @gatorsmile . --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
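What the narrower revert floated above would roughly look like: keep `FileInputStream` only where `skipFully` is involved (its `skip` is a cheap seek) and leave the rest of the NIO changes in place. A sketch only, mirroring the lines shown in the diff; `file`, `offset`, and `length` are the fields from the surrounding class:

```scala
import java.io.FileInputStream
import com.google.common.io.ByteStreams
import org.apache.spark.network.util.LimitedInputStream

val is = new FileInputStream(file)   // FileInputStream.skip seeks instead of reading
ByteStreams.skipFully(is, offset)
new LimitedInputStream(is, length)
```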
[GitHub] spark issue #11994: [SPARK-14151] Expose metrics Source and Sink interface
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/11994 Sorry for the late response, I was off the last two weeks. Currently I don't have a better solution for this; @CodingCat let me think about your suggestion, thanks a lot :). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20119: [SPARK-21475][Core]Revert "[SPARK-21475][CORE] Use NIO's...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/20119 Sorry I haven't checked the details, let me take a look at it. The changes I made were trying to fix a memory issue for shuffle (especially the external shuffle service); this issue occurred in our prod cluster. Let me think about whether there's a way to fix it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19885: [SPARK-22587] Spark job fails if fs.defaultFS and applic...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19885 I see. Thanks for the explanation @steveloughran . My concern is that the current changes will affect all filesystems, but we only saw this issue in wasb. So limiting the authority comparison to only wasb would be safer. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19885: [SPARK-22587] Spark job fails if fs.defaultFS and applic...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19885 >User info isn't picked up from the URL, it's taken off your Kerberos credentials. If you are running HDFS unkerberized, then UGI takes it from the environment variable HADOOP_USER_NAME. I understand that userInfo is not picked up from the URL. But this PR tries to use the authority to compare two filesystems, and the authority includes userInfo. So based on the code here, `hdfs://us...@nn1.com:8020` and `hdfs://us...@nn1.com:8020` are obviously two filesystems, but is it true that these two URIs belong to two actual HDFS clusters? Or are they just two local filesystem objects? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
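A quick, self-contained illustration of why userInfo matters here: `java.net.URI` treats userInfo as part of the authority, so two URIs that differ only in userInfo compare as different authorities. The user and host names below are made up:

```scala
import java.net.URI

val a = new URI("hdfs://alice@nn1.example.com:8020/path")
val b = new URI("hdfs://bob@nn1.example.com:8020/path")
a.getAuthority                     // "alice@nn1.example.com:8020"
b.getAuthority                     // "bob@nn1.example.com:8020"
a.getAuthority == b.getAuthority   // false, although both point at the same NameNode host
```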
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r155125699 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit + +import java.util.{Collections, UUID} + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.client.KubernetesClient + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkApplication +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory +import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +/** + * Encapsulates arguments to the submission client. + * + * @param mainAppResource the main application resource if any + * @param mainClass the main class of the application to run + * @param driverArgs arguments to the driver + */ +private[spark] case class ClientArguments( + mainAppResource: Option[MainAppResource], + mainClass: String, + driverArgs: Array[String]) + +private[spark] object ClientArguments { + + def fromCommandLineArgs(args: Array[String]): ClientArguments = { +var mainAppResource: Option[MainAppResource] = None +var mainClass: Option[String] = None +val driverArgs = mutable.ArrayBuffer.empty[String] + +args.sliding(2, 2).toList.foreach { + case Array("--primary-java-resource", primaryJavaResource: String) => +mainAppResource = Some(JavaMainAppResource(primaryJavaResource)) + case Array("--main-class", clazz: String) => +mainClass = Some(clazz) + case Array("--arg", arg: String) => +driverArgs += arg + case other => +val invalid = other.mkString(" ") +throw new RuntimeException(s"Unknown arguments: $invalid") +} + +require(mainClass.isDefined, "Main class must be specified via --main-class") + +ClientArguments( + mainAppResource, + mainClass.get, + driverArgs.toArray) + } +} + +/** + * Submits a Spark application to run on Kubernetes by creating the driver pod and starting a + * watcher that monitors and logs the application status. Waits for the application to terminate if + * spark.kubernetes.submission.waitAppCompletion is true. 
+ * + * @param submissionSteps steps that collectively configure the driver + * @param submissionSparkConf the submission client Spark configuration + * @param kubernetesClient the client to talk to the Kubernetes API server + * @param waitForAppCompletion a flag indicating whether the client should wait for the application + * to complete + * @param appName the application name + * @param loggingPodStatusWatcher a watcher that monitors and logs the application status + */ +private[spark] class Client( +submissionSteps: Seq[DriverConfigurationStep], +submissionSparkConf: SparkConf, +kubernetesClient: KubernetesClient, +waitForAppCompletion: Boolean, +appName: String, +loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging { + + private val driverJavaOptions = submissionSparkConf.get( +org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS) + + /** +* Run command that initializes a DriverSpec that will be updated after each +* DriverConfigurationStep in the sequence that is passed in. The final KubernetesDriverSpec +* will be used to build th
[GitHub] spark issue #19885: [SPARK-22587] Spark job fails if fs.defaultFS and applic...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19885 I still have a question about it: for HDFS URIs like `hdfs://us...@nn1.com:8020` and `hdfs://us...@nn1.com:8020`, do we honor userInfo for HDFS filesystems? Are they two HDFS clusters, or just two FS cache objects on the local client side? @merlintang did you try this in your local test? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154875878 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala --- @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit + +import java.util.UUID + +import com.google.common.primitives.Longs + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.ConfigurationUtils +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.steps._ +import org.apache.spark.launcher.SparkLauncher +import org.apache.spark.util.SystemClock + +/** + * Constructs the complete list of driver configuration steps to run to deploy the Spark driver. + */ +private[spark] class DriverConfigurationStepsOrchestrator( +namespace: String, +kubernetesAppId: String, +launchTime: Long, +mainAppResource: Option[MainAppResource], +appName: String, +mainClass: String, +appArgs: Array[String], +submissionSparkConf: SparkConf) { + + // The resource name prefix is derived from the application name, making it easy to connect the + // names of the Kubernetes resources from e.g. kubectl or the Kubernetes dashboard to the + // application the user submitted. However, we can't use the application name in the label, as + // label values are considerably restrictive, e.g. must be no longer than 63 characters in + // length. So we generate a separate identifier for the app ID itself, and bookkeeping that + // requires finding "all pods for this application" should use the kubernetesAppId. 
+ private val kubernetesResourceNamePrefix = { +val uuid = UUID.nameUUIDFromBytes(Longs.toByteArray(launchTime)) +s"$appName-$uuid".toLowerCase.replaceAll("\\.", "-") + } + + private val dockerImagePullPolicy = submissionSparkConf.get(DOCKER_IMAGE_PULL_POLICY) + private val jarsDownloadPath = submissionSparkConf.get(JARS_DOWNLOAD_LOCATION) + private val filesDownloadPath = submissionSparkConf.get(FILES_DOWNLOAD_LOCATION) + + def getAllConfigurationSteps(): Seq[DriverConfigurationStep] = { +val driverCustomLabels = ConfigurationUtils.parsePrefixedKeyValuePairs( + submissionSparkConf, + KUBERNETES_DRIVER_LABEL_PREFIX) +require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with key " + + s"$SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping " + + "operations.") +require(!driverCustomLabels.contains(SPARK_ROLE_LABEL), "Label with key " + + s"$SPARK_ROLE_LABEL is not allowed as it is reserved for Spark bookkeeping " + + "operations.") + +val allDriverLabels = driverCustomLabels ++ Map( + SPARK_APP_ID_LABEL -> kubernetesAppId, + SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE) + +val initialSubmissionStep = new BaseDriverConfigurationStep( + kubernetesAppId, + kubernetesResourceNamePrefix, + allDriverLabels, + dockerImagePullPolicy, + appName, + mainClass, + appArgs, + submissionSparkConf) + +val driverAddressStep = new DriverServiceBootstrapStep( + kubernetesResourceNamePrefix, + allDriverLabels, + submissionSparkConf, + new SystemClock) + +val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep( + submissionSparkConf, kubernetesResourceNamePrefix) + +val additionalMainAppJar = if (mainAppResource.nonEmpty) { + val mayBeResource = mainAppResource.get match { +case JavaMainA
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154870951 --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala --- @@ -2744,6 +2744,25 @@ private[spark] object Utils extends Logging { } } + /** + * Check the validity of the given Kubernetes master URL and return the resolved URL. + */ + def checkAndGetK8sMasterUrl(rawMasterURL: String): String = { +require(rawMasterURL.startsWith("k8s://"), + "Kubernetes master URL must start with k8s://.") +val masterWithoutK8sPrefix = rawMasterURL.substring("k8s://".length) --- End diff -- Can we change this String representation to a `URI` to make the checks below more robust? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154874378 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit + +import java.util.{Collections, UUID} + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.client.KubernetesClient + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkApplication +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory +import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +/** + * Encapsulates arguments to the submission client. + * + * @param mainAppResource the main application resource if any + * @param mainClass the main class of the application to run + * @param driverArgs arguments to the driver + */ +private[spark] case class ClientArguments( + mainAppResource: Option[MainAppResource], + mainClass: String, + driverArgs: Array[String]) + +private[spark] object ClientArguments { + + def fromCommandLineArgs(args: Array[String]): ClientArguments = { +var mainAppResource: Option[MainAppResource] = None +var mainClass: Option[String] = None +val driverArgs = mutable.ArrayBuffer.empty[String] + +args.sliding(2, 2).toList.foreach { + case Array("--primary-java-resource", primaryJavaResource: String) => +mainAppResource = Some(JavaMainAppResource(primaryJavaResource)) + case Array("--main-class", clazz: String) => +mainClass = Some(clazz) + case Array("--arg", arg: String) => +driverArgs += arg + case other => +val invalid = other.mkString(" ") +throw new RuntimeException(s"Unknown arguments: $invalid") +} + +require(mainClass.isDefined, "Main class must be specified via --main-class") + +ClientArguments( + mainAppResource, + mainClass.get, + driverArgs.toArray) + } +} + +/** + * Submits a Spark application to run on Kubernetes by creating the driver pod and starting a + * watcher that monitors and logs the application status. Waits for the application to terminate if + * spark.kubernetes.submission.waitAppCompletion is true. 
+ * + * @param submissionSteps steps that collectively configure the driver + * @param submissionSparkConf the submission client Spark configuration + * @param kubernetesClient the client to talk to the Kubernetes API server + * @param waitForAppCompletion a flag indicating whether the client should wait for the application + * to complete + * @param appName the application name + * @param loggingPodStatusWatcher a watcher that monitors and logs the application status + */ +private[spark] class Client( +submissionSteps: Seq[DriverConfigurationStep], +submissionSparkConf: SparkConf, +kubernetesClient: KubernetesClient, +waitForAppCompletion: Boolean, +appName: String, +loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging { + + private val driverJavaOptions = submissionSparkConf.get( +org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS) + + /** +* Run command that initializes a DriverSpec that will be updated after each +* DriverConfigurationStep in the sequence that is passed in. The final KubernetesDriverSpec +* will be used to build th
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154872371 --- Diff: docs/running-on-yarn.md --- @@ -234,18 +234,11 @@ To use a custom metrics.properties for the application master and executors, upd The amount of off-heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%). - - spark.yarn.driver.memoryOverhead --- End diff -- I think we should make this configuration backward compatible: users should still be able to use `spark.yarn.driver.memoryOverhead` with a warning log, like other deprecated configurations. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
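A hedged sketch of the requested backward compatibility: fall back to the old key with a deprecation warning when the new key is unset. Illustrative only; Spark has its own deprecated-config machinery in SparkConf, and `logWarning` assumes the enclosing class mixes in Spark's Logging trait:

```scala
val memoryOverhead: Option[String] =
  conf.getOption("spark.driver.memoryOverhead").orElse {
    conf.getOption("spark.yarn.driver.memoryOverhead").map { value =>
      logWarning("spark.yarn.driver.memoryOverhead is deprecated; " +
        "use spark.driver.memoryOverhead instead.")
      value
    }
  }
```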
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154871648 --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala --- @@ -2744,6 +2744,25 @@ private[spark] object Utils extends Logging { } } + /** + * Check the validity of the given Kubernetes master URL and return the resolved URL. + */ + def checkAndGetK8sMasterUrl(rawMasterURL: String): String = { +require(rawMasterURL.startsWith("k8s://"), + "Kubernetes master URL must start with k8s://.") +val masterWithoutK8sPrefix = rawMasterURL.substring("k8s://".length) +if (masterWithoutK8sPrefix.startsWith("https://")) { + masterWithoutK8sPrefix +} else if (masterWithoutK8sPrefix.startsWith("http://")) { + logWarning("Kubernetes master URL uses HTTP instead of HTTPS.") + masterWithoutK8sPrefix +} else { --- End diff -- Should this happen only when the scheme is absent? From the code, it looks like if the user misconfigured a different scheme, this code path will also be executed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
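A sketch combining this and the earlier comment on the same method: parse the master URL into a `java.net.URI` so the scheme handling is structural, and reject schemes other than http/https instead of silently falling through. This is only one possible shape, not the merged implementation:

```scala
import java.net.URI

def resolveK8sMasterUrl(rawMasterURL: String): String = {
  require(rawMasterURL.startsWith("k8s://"), "Kubernetes master URL must start with k8s://.")
  val remainder = rawMasterURL.stripPrefix("k8s://")
  // Default to HTTPS when no scheme is given, then validate the parsed scheme.
  val withScheme = if (remainder.contains("://")) remainder else s"https://$remainder"
  val uri = new URI(withScheme)
  uri.getScheme match {
    case "https" => uri.toString
    case "http"  => uri.toString  // a warning could be logged here, as in the diff above
    case other   => throw new IllegalArgumentException(s"Unsupported scheme in master URL: $other")
  }
}
```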
[GitHub] spark issue #19885: [SPARK-22587] Spark job fails if fs.defaultFS and applic...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19885 ok to test. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19885: [SPARK-22587] Spark job fails if fs.defaultFS and applic...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19885 Is this assumption based on the implementation of Hadoop `FileSystem`? I was thinking that wasb is an exception, and for the others we keep the original code. @steveloughran would you please comment? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19885: [SPARK-22587] Spark job fails if fs.defaultFS and applic...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19885 @vanzin please help to review, thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19885: [SPARK-22587] Spark job fails if fs.defaultFS and...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19885#discussion_r154822603 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala --- @@ -1428,6 +1428,12 @@ private object Client extends Logging { return false } +val srcAuthority = srcUri.getAuthority() +val detAuthority = dstUri.getAuthority() +if (srcAuthority != detAuthority || (srcAuthority != null && !srcAuthority.equalsIgnoreCase(detAuthority))) { --- End diff -- I guess this line is too long. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
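One way to wrap the flagged line so it fits the usual 100-character limit without changing the condition itself; variable names are kept exactly as in the diff above, and the `return false` body is assumed from the surrounding `compareUri` logic:

```scala
val srcAuthority = srcUri.getAuthority()
val detAuthority = dstUri.getAuthority()
if (srcAuthority != detAuthority ||
    (srcAuthority != null && !srcAuthority.equalsIgnoreCase(detAuthority))) {
  return false  // body assumed; the diff above only shows the condition
}
```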
[GitHub] spark issue #19885: [SPARK-22587] Spark job fails if fs.defaultFS and applic...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19885 @merlintang would you please add the problem to your PR description? Currently it is a WASB problem in which userInfo is honored to differentiate filesystems. Please add the scenario to the description. Besides, the changes here will also affect all other filesystems like HDFS/S3; do they still have the same behavior? What happens with `hdfs://us...@nn1.com:8020` and `hdfs://us...@nn1.com:8020`: are they two filesystems? Would you please clarify? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19840: [SPARK-22640][PYSPARK][YARN]switch python exec on execut...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19840 I'm a little concerned about such changes: this may be misconfigured and introduce a discrepancy between the driver Python and the executor Python. At least, we should honor the configuration "spark.executorEnv.PYSPARK_PYTHON" only when the `PYSPARK_PYTHON` executable sent by the driver does not exist. (Just my two cents.) Ping @zjffdu , any thoughts? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19840: [SPARK-22640][PYSPARK][YARN]switch python exec on execut...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19840 Oh, I see. You're running in client mode. So this one `--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=py3.zip/py3/bin/python` is useless. So I guess the behavior is expected, because the driver will honor the `PYSPARK_PYTHON` env and ship it to executors, so the cluster will use the same Python executable. With your test above, `/path/to/python` is different for the driver and executors; will that bring in issues? The driver uses `PYSPARK_PYTHON` and the executors use `spark.executorEnv.PYSPARK_PYTHON`, which points to different paths. Normally I think we don't need to set PYSPARK_PYTHON on the executor side. Please correct me if I'm wrong. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19840: [SPARK-22640][PYSPARK][YARN]switch python exec on execut...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19840 I think on YARN we have several different ways to set `PYSPARK_PYTHON`, and I guess your issue is which one should take priority. Can you please: 1. Define a consistent ordering for such envs, i.e. which one takes priority (spark.yarn.appMasterEnv.XXX or XXX), and document it. 2. Check that it works as expected for `spark.yarn.appMasterEnv.PYSPARK_PYTHON` and `spark.executorEnv.PYSPARK_PYTHON`. I guess other envs will also suffer from this problem, am I right? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
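A hedged sketch of the kind of explicit precedence the comment asks for, resolving the executor-side Python binary from highest to lowest priority. The ordering itself is hypothetical and would need to be agreed on and documented:

```scala
import org.apache.spark.SparkConf

def resolveExecutorPython(conf: SparkConf, driverShippedEnv: Map[String, String]): String = {
  conf.getOption("spark.executorEnv.PYSPARK_PYTHON")   // explicit executor-side override
    .orElse(driverShippedEnv.get("PYSPARK_PYTHON"))    // value shipped from the driver
    .getOrElse("python")                               // cluster default
}
```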
[GitHub] spark issue #19856: [SPARK-22664] The logs about "Connected to Zookeeper" in...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19856 >I think the log can't reflect the behavior of consumer connection, because consumer.create doesn't do any connect, it only constructs a ZookeeperConsumerConnector instance That's not true: `ZookeeperConsumerConnector` also connects to ZK during initialization, please see here (https://github.com/apache/kafka/blob/c9f930e3fe25e5675e64550c8b396356c5349ca7/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala#L126). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19856: [SPARK-22664] The logs about "Connected to Zookeeper" in...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19856 Actually there's no issue here; IMHO your understanding of this log is slightly different from its original purpose. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19856: [SPARK-22664] The logs about "Connected to Zookeeper" in...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19856 I guess the original purpose of such a log is to reflect the behavior of the consumer connection. It is not really necessary to make such a trivial change. Also, `ReliableKafkaReceiver` is not recommended any more; let's keep it as it is. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19631: [SPARK-22372][core, yarn] Make cluster submission use Sp...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19631 LGTM. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19812: [SPARK-22598][CORE] ExecutorAllocationManager does not r...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19812 Did this failure ("For some reason, all of the 3 executors failed.") happen during task running or before task submission? Besides, if you're running on YARN, YARN will bring up new executors to meet the requirement, and `ExecutorAllocationManager` will be notified of executor lost/register events. Can you please tell us how to reproduce your scenario? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19834: [SPARK-22585][Core] Path in addJar is not url encoded
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19834 LGTM. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r153678912 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s + +import java.util.concurrent.TimeUnit + +import org.apache.spark.{SPARK_VERSION => sparkVersion} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.ConfigBuilder +import org.apache.spark.network.util.ByteUnit + +private[spark] object Config extends Logging { + + val KUBERNETES_NAMESPACE = +ConfigBuilder("spark.kubernetes.namespace") + .doc("The namespace that will be used for running the driver and executor pods. When using " + +"spark-submit in cluster mode, this can also be passed to spark-submit via the " + +"--kubernetes-namespace command line argument.") + .stringConf + .createWithDefault("default") + + val DRIVER_DOCKER_IMAGE = +ConfigBuilder("spark.kubernetes.driver.docker.image") + .doc("Docker image to use for the driver. Specify this using the standard Docker tag format.") + .stringConf + .createWithDefault(s"spark-driver:$sparkVersion") + + val EXECUTOR_DOCKER_IMAGE = +ConfigBuilder("spark.kubernetes.executor.docker.image") + .doc("Docker image to use for the executors. Specify this using the standard Docker tag " + +"format.") + .stringConf + .createWithDefault(s"spark-executor:$sparkVersion") + + val DOCKER_IMAGE_PULL_POLICY = +ConfigBuilder("spark.kubernetes.docker.image.pullPolicy") + .doc("Docker image pull policy when pulling any docker image in Kubernetes integration") + .stringConf + .createWithDefault("IfNotPresent") + + + val KUBERNETES_AUTH_DRIVER_CONF_PREFIX = + "spark.kubernetes.authenticate.driver" + val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX = + "spark.kubernetes.authenticate.driver.mounted" + val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken" + val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile" + val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile" + val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile" + val CA_CERT_FILE_CONF_SUFFIX = "caCertFile" + + val KUBERNETES_SERVICE_ACCOUNT_NAME = + ConfigBuilder(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.serviceAccountName") + .doc("Service account that is used when running the driver pod. The driver pod uses " + +"this service account when requesting executor pods from the API server. 
If specific " + +"credentials are given for the driver pod to use, the driver will favor " + +"using those credentials instead.") + .stringConf + .createOptional + + val KUBERNETES_DRIVER_LIMIT_CORES = +ConfigBuilder("spark.kubernetes.driver.limit.cores") + .doc("Specify the hard cpu limit for the driver pod") + .stringConf + .createOptional + + val KUBERNETES_EXECUTOR_LIMIT_CORES = +ConfigBuilder("spark.kubernetes.executor.limit.cores") + .doc("Specify the hard cpu limit for a single executor pod") + .stringConf + .createOptional + + val KUBERNETES_DRIVER_MEMORY_OVERHEAD = +ConfigBuilder("spark.kubernetes.driver.memoryOverhead") + .doc("The amount of off-heap memory (in megabytes) to be allocated for the driver and the " + +"driver submission server. This is memory that accounts for things like VM
[GitHub] spark issue #19812: [SPARK-22598][CORE] ExecutorAllocationManager does not r...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19812 Hi @liutang123 would you mind explaining us the issue you met and how to reproduce it? Currently we don't know what actual issue it is and how to evaluate your changes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r153408574 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s + +import java.util.concurrent.TimeUnit + +import org.apache.spark.{SPARK_VERSION => sparkVersion} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.ConfigBuilder +import org.apache.spark.network.util.ByteUnit + +private[spark] object Config extends Logging { + + val KUBERNETES_NAMESPACE = +ConfigBuilder("spark.kubernetes.namespace") + .doc("The namespace that will be used for running the driver and executor pods. When using " + +"spark-submit in cluster mode, this can also be passed to spark-submit via the " + +"--kubernetes-namespace command line argument.") + .stringConf + .createWithDefault("default") + + val DRIVER_DOCKER_IMAGE = +ConfigBuilder("spark.kubernetes.driver.docker.image") + .doc("Docker image to use for the driver. Specify this using the standard Docker tag format.") + .stringConf + .createWithDefault(s"spark-driver:$sparkVersion") + + val EXECUTOR_DOCKER_IMAGE = +ConfigBuilder("spark.kubernetes.executor.docker.image") + .doc("Docker image to use for the executors. Specify this using the standard Docker tag " + +"format.") + .stringConf + .createWithDefault(s"spark-executor:$sparkVersion") + + val DOCKER_IMAGE_PULL_POLICY = +ConfigBuilder("spark.kubernetes.docker.image.pullPolicy") + .doc("Docker image pull policy when pulling any docker image in Kubernetes integration") + .stringConf + .createWithDefault("IfNotPresent") + + + val KUBERNETES_AUTH_DRIVER_CONF_PREFIX = + "spark.kubernetes.authenticate.driver" + val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX = + "spark.kubernetes.authenticate.driver.mounted" + val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken" + val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile" + val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile" + val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile" + val CA_CERT_FILE_CONF_SUFFIX = "caCertFile" + + val KUBERNETES_SERVICE_ACCOUNT_NAME = + ConfigBuilder(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.serviceAccountName") + .doc("Service account that is used when running the driver pod. The driver pod uses " + +"this service account when requesting executor pods from the API server. 
If specific " + +"credentials are given for the driver pod to use, the driver will favor " + +"using those credentials instead.") + .stringConf + .createOptional + + val KUBERNETES_DRIVER_LIMIT_CORES = +ConfigBuilder("spark.kubernetes.driver.limit.cores") + .doc("Specify the hard cpu limit for the driver pod") + .stringConf + .createOptional + + val KUBERNETES_EXECUTOR_LIMIT_CORES = +ConfigBuilder("spark.kubernetes.executor.limit.cores") + .doc("Specify the hard cpu limit for a single executor pod") + .stringConf + .createOptional + + val KUBERNETES_DRIVER_MEMORY_OVERHEAD = +ConfigBuilder("spark.kubernetes.driver.memoryOverhead") + .doc("The amount of off-heap memory (in megabytes) to be allocated for the driver and the " + +"driver submission server. This is memory that accounts for things like VM
[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r153410482 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala --- @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.steps + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder, QuantityBuilder} + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.ConfigurationUtils +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec + +/** + * Represents the initial setup required for the driver. + */ +private[spark] class BaseDriverConfigurationStep( +kubernetesAppId: String, +kubernetesResourceNamePrefix: String, +driverLabels: Map[String, String], +dockerImagePullPolicy: String, +appName: String, +mainClass: String, +appArgs: Array[String], +submissionSparkConf: SparkConf) extends DriverConfigurationStep { + + private val kubernetesDriverPodName = submissionSparkConf.get(KUBERNETES_DRIVER_POD_NAME) +.getOrElse(s"$kubernetesResourceNamePrefix-driver") + + private val driverExtraClasspath = submissionSparkConf.get( +org.apache.spark.internal.config.DRIVER_CLASS_PATH) + + private val driverDockerImage = submissionSparkConf.get(DRIVER_DOCKER_IMAGE) + + // CPU settings + private val driverCpuCores = submissionSparkConf.getOption("spark.driver.cores").getOrElse("1") + private val driverLimitCores = submissionSparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES) + + // Memory settings + private val driverMemoryMiB = submissionSparkConf.get( +org.apache.spark.internal.config.DRIVER_MEMORY) + private val driverMemoryString = submissionSparkConf.get( +org.apache.spark.internal.config.DRIVER_MEMORY.key, +org.apache.spark.internal.config.DRIVER_MEMORY.defaultValueString) + private val memoryOverheadMiB = submissionSparkConf +.get(KUBERNETES_DRIVER_MEMORY_OVERHEAD) +.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt, + MEMORY_OVERHEAD_MIN_MIB)) + private val driverContainerMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB + + override def configureDriver( + driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = { +val driverExtraClasspathEnv = driverExtraClasspath.map { classPath => + new EnvVarBuilder() +.withName(ENV_SUBMIT_EXTRA_CLASSPATH) +.withValue(classPath) +.build() +} --- End diff -- Did you add support for the configurations "spark.driver.userClassPathFirst" and "spark.executor.userClassPathFirst"? Sorry, I cannot find it.
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r153407637 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s + +import java.util.concurrent.TimeUnit + +import org.apache.spark.{SPARK_VERSION => sparkVersion} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.ConfigBuilder +import org.apache.spark.network.util.ByteUnit + +private[spark] object Config extends Logging { + + val KUBERNETES_NAMESPACE = +ConfigBuilder("spark.kubernetes.namespace") + .doc("The namespace that will be used for running the driver and executor pods. When using " + +"spark-submit in cluster mode, this can also be passed to spark-submit via the " + +"--kubernetes-namespace command line argument.") + .stringConf + .createWithDefault("default") + + val DRIVER_DOCKER_IMAGE = +ConfigBuilder("spark.kubernetes.driver.docker.image") + .doc("Docker image to use for the driver. Specify this using the standard Docker tag format.") + .stringConf + .createWithDefault(s"spark-driver:$sparkVersion") + + val EXECUTOR_DOCKER_IMAGE = +ConfigBuilder("spark.kubernetes.executor.docker.image") + .doc("Docker image to use for the executors. Specify this using the standard Docker tag " + +"format.") + .stringConf + .createWithDefault(s"spark-executor:$sparkVersion") + + val DOCKER_IMAGE_PULL_POLICY = +ConfigBuilder("spark.kubernetes.docker.image.pullPolicy") --- End diff -- This configuration looks like a fixed set of String options; we should check all the legal values, like this SQL conf does:

```scala
val CATALOG_IMPLEMENTATION = buildStaticConf("spark.sql.catalogImplementation")
  .internal()
  .stringConf
  .checkValues(Set("hive", "in-memory"))
  .createWithDefault("in-memory")
```

Besides, we should add `checkValue` for the `ConfigEntry`s where possible. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
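To make the suggestion above concrete, here is a minimal sketch of what the validated pull-policy entry could look like. The set of legal values (`Always`, `Never`, `IfNotPresent`) is Kubernetes' standard image pull policies and is an assumption on my part, not something stated in the diff.

```scala
val DOCKER_IMAGE_PULL_POLICY =
  ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
    .doc("Docker image pull policy when pulling any docker image in Kubernetes integration")
    .stringConf
    // Reject anything outside the known Kubernetes pull policies at config-parse time.
    .checkValues(Set("Always", "Never", "IfNotPresent"))
    .createWithDefault("IfNotPresent")
```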
[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r153407820 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala --- @@ -702,6 +715,19 @@ object SparkSubmit extends CommandLineUtils with Logging { } } +if (isKubernetesCluster) { + childMainClass = "org.apache.spark.deploy.k8s.submit.Client" --- End diff -- Here the style should be unified after #19631 is merged. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r153408859 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala --- @@ -590,6 +600,11 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S | the node running the Application Master via the Secure | Distributed Cache, for renewing the login tickets and the | delegation tokens periodically. +| +| Kubernetes only: +| --kubernetes-namespace NS The namespace in the Kubernetes cluster within which the +| application must be launched. The namespace must already +| exist in the cluster. (Default: default). --- End diff -- We should also add checks to `validateKillArguments` and `validateStatusRequestArguments`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19717: [SPARK-18278] [Submission] Spark on Kubernetes - basic s...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19717 I think we'd better honor the newly added `org.apache.spark.deploy.SparkApplication` to implement the k8s client, like #19631 does. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
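For illustration only, here is a rough sketch of what honoring `SparkApplication` could look like for the k8s client, assuming the `start(args, conf)` entry point introduced by #19631. The class name and the `Client.run`/`ClientArguments` helpers below are placeholders, not the PR's actual code.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkApplication

// Hypothetical sketch: an in-process entry point instead of a main() invoked via reflection.
private[spark] class KubernetesClientApplication extends SparkApplication {
  override def start(args: Array[String], conf: SparkConf): Unit = {
    // Parse the submission arguments, build the driver pod spec, and submit it
    // through the fabric8 client (placeholder helpers, not real APIs).
    Client.run(ClientArguments.fromCommandLineArgs(args), conf)
  }
}
```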
[GitHub] spark issue #11994: [SPARK-14151] Expose metrics Source and Sink interface
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/11994 @rxin, thanks for your comment. The key motivation of this PR is to expose the metrics Sink/Source interface for third-party plugins, so that we don't need to maintain every different Sink/Source in Spark core, like the one proposed in #19775 . I agree with you that exposing the Codahale metrics object is not a good choice; let me think more about the interface design. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19788 @yucai I'm thinking about the necessity of adding this new configuration `spark.shuffle.continuousFetch`, as you mentioned above. The PR you proposed is actually a superset of the previous approach; it is compatible with the original shuffle behavior when length = 1. The configuration here is only used to keep compatibility with the external shuffle service, which I think is not very intuitive, and users may be confused about whether it should be enabled or not (since the conf is functionality-oriented). Besides, do we need to guarantee forward compatibility? Also, is there a transparent way to automatically switch between the two shuffle paths without a configuration? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #11994: [SPARK-14151] Expose metrics Source and Sink inte...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/11994#discussion_r153110760 --- Diff: core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala --- @@ -195,18 +196,26 @@ private[spark] class MetricsSystem private ( val classPath = kv._2.getProperty("class") if (null != classPath) { try { - val sink = Utils.classForName(classPath) -.getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager]) -.newInstance(kv._2, registry, securityMgr) + val sink = Utils.classForName(classPath).getConstructor( +classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager]) + .newInstance(kv._2, registry, securityMgr) if (kv._1 == "servlet") { metricsServlet = Some(sink.asInstanceOf[MetricsServlet]) } else { sinks += sink.asInstanceOf[Sink] } } catch { - case e: Exception => -logError("Sink class " + classPath + " cannot be instantiated") -throw e + case _: NoSuchMethodException => +try { + sinks += Utils.classForName(classPath) --- End diff -- No, not necessary, `MetricsServlet` is a built-in metrics sink which will be explicitly added in the above code. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #11994: [SPARK-14151] Expose metrics Source and Sink interface
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/11994 @felixcheung thanks for your review. I think there's no next step; the current changes should be enough for users to externalize customized metrics sources and sinks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r153089584 --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala --- @@ -196,12 +196,14 @@ private[spark] class IndexShuffleBlockResolver( override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = { // The block is actually going to be a range of a single map output file for this map, so // find out the consolidated file, then the offset within that from our index +logDebug(s"Fetch block data for $blockId") val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId) val in = new DataInputStream(Files.newInputStream(indexFile.toPath)) try { ByteStreams.skipFully(in, blockId.reduceId * 8) val offset = in.readLong() + ByteStreams.skipFully(in, (blockId.length - 1) * 8) --- End diff -- I get your point, thanks for the explanation. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19775: [SPARK-22343][core] Add support for publishing Spark met...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19775 Do we have to put this in Spark? Is it a necessary part of k8s? I think if we pull in that PR (https://github.com/apache/spark/pull/11994), then this can stay out of Spark as a separate package. Even without #11994 , I believe users can still add their own metrics source/sink via the exposed SparkEnv/MetricsSystem. My concern is that this unnecessarily increases the code base of Spark core. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
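As a hedged illustration of the "users can add their own source" point, defining a Codahale-backed source and registering it looks roughly like the sketch below. Note that `Source` and the `MetricsSystem` type are `private[spark]` today, which is exactly the limitation #11994 aims to remove, so this only works for code living in an `org.apache.spark` package; the names and the gauge value are placeholders.

```scala
import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.SparkEnv
import org.apache.spark.metrics.source.Source

// A trivial custom source exposing one gauge.
class MyAppSource extends Source {
  override val sourceName: String = "myApp"
  override val metricRegistry: MetricRegistry = new MetricRegistry()
  metricRegistry.register(MetricRegistry.name("recordsSeen"), new Gauge[Long] {
    override def getValue: Long = 42L  // placeholder value
  })
}

object MyAppMetrics {
  // Register the source with the running application's metrics system.
  def register(): Unit = SparkEnv.get.metricsSystem.registerSource(new MyAppSource)
}
```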
[GitHub] spark issue #19802: [WIP][SPARK-22594][CORE] Handling spark-submit and maste...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19802 Can you please explain more about this and how to reproduce the issue? Spark's RPC is not designed to be version compatible. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r152891920 --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala --- @@ -196,12 +196,14 @@ private[spark] class IndexShuffleBlockResolver( override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = { // The block is actually going to be a range of a single map output file for this map, so // find out the consolidated file, then the offset within that from our index +logDebug(s"Fetch block data for $blockId") val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId) val in = new DataInputStream(Files.newInputStream(indexFile.toPath)) try { ByteStreams.skipFully(in, blockId.reduceId * 8) val offset = in.readLong() + ByteStreams.skipFully(in, (blockId.length - 1) * 8) --- End diff -- Also, if length is 1, then this skip will always be zero. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r152891792 --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala --- @@ -812,10 +812,13 @@ private[spark] object MapOutputTracker extends Logging { logError(errorMessage) throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage) } else { +var totalSize = 0L for (part <- startPartition until endPartition) { - splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) += -((ShuffleBlockId(shuffleId, mapId, part), status.getSizeForBlock(part))) + totalSize += status.getSizeForBlock(part) --- End diff -- This can be simplified like: `val totalSize = (startPartition until endPartition).map(status.getSizeForXXX).sum`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
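A tiny, self-contained illustration of that idiom with stand-in sizes (the values are made up; `sizes` stands in for `status.getSizeForBlock`):

```scala
// Stand-in for status.getSizeForBlock: sizes of reduce partitions 0..9 of one map output.
val sizes = Array[Long](10, 20, 30, 40, 50, 60, 70, 80, 90, 100)

val startPartition = 3
val endPartition = 8
// Sum over the contiguous range [startPartition, endPartition).
val totalSize = (startPartition until endPartition).map(p => sizes(p)).sum  // 40+50+60+70+80 = 300
```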
[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r152891172 --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala --- @@ -196,12 +196,14 @@ private[spark] class IndexShuffleBlockResolver( override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = { // The block is actually going to be a range of a single map output file for this map, so // find out the consolidated file, then the offset within that from our index +logDebug(s"Fetch block data for $blockId") --- End diff -- It's not necessary to add this; I guess it is mainly for your own debugging. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r152891438 --- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala --- @@ -196,12 +196,14 @@ private[spark] class IndexShuffleBlockResolver( override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = { // The block is actually going to be a range of a single map output file for this map, so // find out the consolidated file, then the offset within that from our index +logDebug(s"Fetch block data for $blockId") val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId) val in = new DataInputStream(Files.newInputStream(indexFile.toPath)) try { ByteStreams.skipFully(in, blockId.reduceId * 8) val offset = in.readLong() + ByteStreams.skipFully(in, (blockId.length - 1) * 8) --- End diff -- I suspect this line is not correct; it seems to change the semantics. For example, if startPartition is 3 and endPartition is 8, originally it should be (3 * 8), but now it changes to (4 * 8). Can you please explain more? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
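For readers following this thread, here is a small illustration of the index-file arithmetic being discussed, assuming the usual layout where the index file stores numPartitions + 1 cumulative offsets as 8-byte longs. This is an explanatory sketch, not the PR's exact code.

```scala
import java.io.DataInputStream
import com.google.common.io.ByteStreams

// To read the byte range covering reduce partitions [startPartition, endPartition),
// we need offsets(startPartition) and offsets(endPartition). E.g. startPartition = 3,
// endPartition = 8: skip 3 * 8 = 24 bytes, read offsets(3), then skip
// (8 - 3 - 1) * 8 = 32 bytes and read offsets(8).
def readOffsets(in: DataInputStream, startPartition: Int, endPartition: Int): (Long, Long) = {
  ByteStreams.skipFully(in, startPartition * 8L)
  val startOffset = in.readLong()
  ByteStreams.skipFully(in, (endPartition - startPartition - 1) * 8L)
  val endOffset = in.readLong()
  (startOffset, endOffset)
}
```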
[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19788 ok to test. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19788 Sure, I will do it tomorrow. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19788 @yucai would you mind adding more explanations to your PR description? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19631: [SPARK-22372][core, yarn] Make cluster submission use Sp...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19631 Did another round of review, LGTM overall. @tgravescs do you have any comments? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19631: [SPARK-22372][core, yarn] Make cluster submission...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19631#discussion_r151320052 --- Diff: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala --- @@ -216,7 +216,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { if (driverConf.contains("spark.yarn.credentials.file")) { logInfo("Will periodically update credentials from: " + driverConf.get("spark.yarn.credentials.file")) -SparkHadoopUtil.get.startCredentialUpdater(driverConf) + Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil") --- End diff -- #19272 is already in, do you plan to update it here in this PR or in a separate PR? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19633: [SPARK-22411][SQL] Disable the heuristic to calcu...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19633#discussion_r151308496 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -424,11 +424,19 @@ case class FileSourceScanExec( val defaultMaxSplitBytes = fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes -val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism -val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum -val bytesPerCore = totalBytes / defaultParallelism -val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) +// Ignore bytesPerCore when dynamic allocation is enabled. See SPARK-22411 +val maxSplitBytes = + if (Utils.isDynamicAllocationEnabled(fsRelation.sparkSession.sparkContext.getConf)) { +defaultMaxSplitBytes --- End diff -- >For small data, sometimes users care about the number of output files generated as well. Can you please elaborate more: do you want more output files or fewer? If fewer, I think I already mentioned that in the above comment. It seems your patch already ignores `bytesPerCore` when dynamic allocation is enabled; are you suggesting something different? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19643: [SPARK-11421][CORE][PYTHON][R] Added ability for ...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19643#discussion_r151307924 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -1838,12 +1852,21 @@ class SparkContext(config: SparkConf) extends Logging { case _ => path } } + if (key != null) { val timestamp = System.currentTimeMillis if (addedJars.putIfAbsent(key, timestamp).isEmpty) { logInfo(s"Added JAR $path at $key with timestamp $timestamp") postEnvironmentUpdate() } + +if (addToCurrentClassLoader) { + Utils.getContextOrSparkClassLoader match { +case cl: MutableURLClassLoader => cl.addURL(Utils.resolveURI(path).toURL) --- End diff -- I'm not sure whether it supports remote jars on HTTPS or Hadoop FileSystems. On the executor side, we handle this explicitly by downloading jars to local disk and adding them to the classpath, but here it looks like we don't have such logic. I'm not sure how this `URLClassLoader` communicates with Hadoop or HTTPS without certificates. `addJar` just adds jars to the file server so that executors can fetch them from the driver and add them to the classpath; it does not affect the driver's classpath. If we support adding jars to the current driver's classloader, then how do we leverage these newly added jars? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
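To illustrate the executor-side pattern referred to above, here is a hedged sketch of "download first, then add a local URL to the classloader". The `downloadToLocal` function is a placeholder for whatever fetch logic (Hadoop FileSystem, HTTPS, ...) would be used, and the tiny classloader subclass only stands in for Spark's `MutableURLClassLoader`.

```scala
import java.io.File
import java.net.{URL, URLClassLoader}

// A plain URLClassLoader cannot read hdfs:// or authenticated https:// URLs by
// itself, so the jar must be fetched to local disk before its URL is added.
class AddableClassLoader(parent: ClassLoader)
  extends URLClassLoader(Array.empty[URL], parent) {
  override def addURL(url: URL): Unit = super.addURL(url)
}

def addJarToClassLoader(
    path: String,
    loader: AddableClassLoader,
    downloadToLocal: String => File): Unit = {
  val localJar = downloadToLocal(path)  // placeholder fetch step
  loader.addURL(localJar.toURI.toURL)   // only local URLs are added to the loader
}
```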
[GitHub] spark pull request #19741: [SPARK-14228][CORE][YARN] Lost executor of RPC di...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19741#discussion_r151305271 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala --- @@ -268,8 +268,13 @@ private[spark] abstract class YarnSchedulerBackend( logWarning(reason.toString) driverEndpoint.ask[Boolean](r).onFailure { --- End diff -- I think when you use `send`, all the related things you mentioned above should be changed accordingly. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19741: [SPARK-14228][CORE][YARN] Lost executor of RPC disassoci...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19741 From my understanding, the above exception seems to do no harm to the Spark application; it just hits some threading corner case during stop. Am I right? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19631: [SPARK-22372][core, yarn] Make cluster submission...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19631#discussion_r151015745 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -412,8 +412,6 @@ class SparkContext(config: SparkConf) extends Logging { } } -if (master == "yarn" && deployMode == "client") System.setProperty("SPARK_YARN_MODE", "true") --- End diff -- Not sure why this is not required anymore? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19631: [SPARK-22372][core, yarn] Make cluster submission...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19631#discussion_r151017494 --- Diff: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala --- @@ -216,7 +216,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { if (driverConf.contains("spark.yarn.credentials.file")) { logInfo("Will periodically update credentials from: " + driverConf.get("spark.yarn.credentials.file")) -SparkHadoopUtil.get.startCredentialUpdater(driverConf) + Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil") --- End diff -- I see, thanks for the explanation. This kind of reflection seems not so elegant, though. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19631: [SPARK-22372][core, yarn] Make cluster submission...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19631#discussion_r151018454 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala --- @@ -745,15 +739,20 @@ private[spark] class Client( // Save the YARN configuration into a separate file that will be overlayed on top of the // cluster's Hadoop conf. confStream.putNextEntry(new ZipEntry(SPARK_HADOOP_CONF_FILE)) - yarnConf.writeXml(confStream) + hadoopConf.writeXml(confStream) confStream.closeEntry() // Save Spark configuration to a file in the archive. val props = new Properties() sparkConf.getAll.foreach { case (k, v) => props.setProperty(k, v) } // Override spark.yarn.key to point to the location in distributed cache which will be used // by AM. - Option(amKeytabFileName).foreach { k => props.setProperty(KEYTAB.key, k) } + Option(amKeytabFileName).foreach { k => +// Do not propagate the app's secret using the config file. +if (k != SecurityManager.SPARK_AUTH_SECRET_CONF) { --- End diff -- Is it necessary to add a check here? I'm not sure how this could happen. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19631: [SPARK-22372][core, yarn] Make cluster submission...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19631#discussion_r150751268 --- Diff: core/src/main/scala/org/apache/spark/SecurityManager.scala --- @@ -551,13 +553,10 @@ private[spark] class SecurityManager( private[spark] object SecurityManager { - val SPARK_AUTH_CONF: String = "spark.authenticate" - val SPARK_AUTH_SECRET_CONF: String = "spark.authenticate.secret" + val SPARK_AUTH_CONF = NETWORK_AUTH_ENABLED.key + val SPARK_AUTH_SECRET_CONF = "spark.authenticate.secret" --- End diff -- I think we can also make this a `ConfigEntry`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
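A minimal sketch of what that could look like, assuming it would sit next to the other entries in `org.apache.spark.internal.config`; the doc text is made up for illustration.

```scala
import org.apache.spark.internal.config.ConfigBuilder

// Typed, optional entry for the shared secret instead of a bare string key.
val SPARK_AUTH_SECRET =
  ConfigBuilder("spark.authenticate.secret")
    .doc("Secret key used for authentication when spark.authenticate is enabled.")
    .stringConf
    .createOptional
```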
[GitHub] spark pull request #19631: [SPARK-22372][core, yarn] Make cluster submission...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19631#discussion_r150751761 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala --- @@ -92,6 +92,11 @@ object SparkSubmit extends CommandLineUtils with Logging { private val CLASS_NOT_FOUND_EXIT_STATUS = 101 + // Following constants are visible for testing. + private[deploy] val YARN_SUBMIT_CLASS = "org.apache.spark.deploy.yarn.YarnClusterApplication" + private[deploy] val REST_SUBMIT_CLASS = classOf[RestSubmissionClientApp].getName() + private[deploy] val STANDALONE_SUBMIT_CLASS = classOf[ClientApp].getName() --- End diff -- Maybe we can rename these variables to `XXX_CLUSTER_SUBMIT_CLASS`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19631: [SPARK-22372][core, yarn] Make cluster submission...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19631#discussion_r150752055 --- Diff: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala --- @@ -216,7 +216,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { if (driverConf.contains("spark.yarn.credentials.file")) { logInfo("Will periodically update credentials from: " + driverConf.get("spark.yarn.credentials.file")) -SparkHadoopUtil.get.startCredentialUpdater(driverConf) + Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil") --- End diff -- What's the purpose of changing to this way? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19633: [SPARK-22411][SQL] Disable the heuristic to calcu...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19633#discussion_r150746876 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -424,11 +424,19 @@ case class FileSourceScanExec( val defaultMaxSplitBytes = fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes -val defaultParallelism = fsRelation.sparkSession.sparkContext.defaultParallelism -val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + openCostInBytes)).sum -val bytesPerCore = totalBytes / defaultParallelism -val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) +// Ignore bytesPerCore when dynamic allocation is enabled. See SPARK-22411 +val maxSplitBytes = + if (Utils.isDynamicAllocationEnabled(fsRelation.sparkSession.sparkContext.getConf)) { +defaultMaxSplitBytes --- End diff -- What if `spark.dynamicAllocation.maxExecutors` is not configured? It seems we cannot rely on this configuration, since users may not always set it. My concern is the cost of ramping up new executors: by splitting partitions into small ones, Spark will ramp up more executors to execute small tasks, and when the cost of ramping up new executors is larger than that of executing the tasks, this is no longer a good heuristic. Previously, because all the executors were already available, the heuristic was valid. For small data (calculated `bytesPerCore` < `defaultMaxSplitBytes`, i.e. less than 128M), I think using the available resources to schedule tasks would be enough, since the tasks are not so big. For big data (calculated `bytesPerCore` > `defaultMaxSplitBytes`, i.e. larger than 128M), I think 128M might be the proper value for issuing new executors and tasks. So IMHO the current solution seems sufficient for the dynamic allocation scenario. Please correct me if I'm wrong. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
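To make the numbers in this discussion easier to follow, here is the existing heuristic (taken from the removed lines in the diff) with two illustrative scenarios; the figures are examples, not measurements.

```scala
// Defaults: spark.sql.files.maxPartitionBytes = 128 MB, openCostInBytes = 4 MB.
val defaultMaxSplitBytes = 128L * 1024 * 1024
val openCostInBytes = 4L * 1024 * 1024

def maxSplitBytes(totalBytes: Long, defaultParallelism: Int): Long = {
  val bytesPerCore = totalBytes / defaultParallelism
  math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))
}

// "Small data": 1 GB over 100 cores -> bytesPerCore ~ 10 MB, so splits shrink to ~10 MB
// and many small tasks are produced, which is what drives extra executor ramp-up
// under dynamic allocation.
maxSplitBytes(1L << 30, 100)    // ~ 10 MB
// "Big data": 100 GB over 100 cores -> bytesPerCore ~ 1 GB > 128 MB, so the 128 MB cap wins.
maxSplitBytes(100L << 30, 100)  // = 128 MB
```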
[GitHub] spark pull request #19711: [SPARK-22471][SQL] SQLListener consumes much memo...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19711#discussion_r150712289 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala --- @@ -113,7 +116,7 @@ class SQLListener(conf: SparkConf) extends SparkListener with Logging { */ private val _jobIdToExecutionId = mutable.HashMap[Long, Long]() - private val _stageIdToStageMetrics = mutable.HashMap[Long, SQLStageMetrics]() + private val _stageIdToStageMetrics = mutable.LinkedHashMap[Long, SQLStageMetrics]() --- End diff -- Java's `LinkedHashMap` can be extended with a custom implementation of `removeEldestEntry`, which will save the code added below. It is not the user who calls `removeEldestEntry`... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
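A small sketch of that bounded-map idea, assuming a simple size cap; the capacity constant and the usage hint below are placeholders.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// A java.util.LinkedHashMap that evicts its eldest entry once maxEntries is
// exceeded, so no explicit trimming code is needed at the call sites.
def boundedMap[K, V](maxEntries: Int): JLinkedHashMap[K, V] =
  new JLinkedHashMap[K, V](16, 0.75f, /* accessOrder = */ false) {
    override def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean = size() > maxEntries
  }

// e.g. _stageIdToStageMetrics could become: boundedMap[Long, SQLStageMetrics](1000)
```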
[GitHub] spark issue #19735: [MINOR][CORE] Using bufferedInputStream for dataDeserial...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19735 Jenkins, retest this please. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #11994: [SPARK-14151] Expose metrics Source and Sink interface
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/11994 Jenkins, retest this please. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19735: [MINOR][CORE] Using bufferedInputStream for dataDeserial...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19735 ok to test. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #11994: [SPARK-14151] Expose metrics Source and Sink interface
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/11994 Sure, let me update the code. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19693: [MINOR][CORE] Improved statistical shuffle write time
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19693 Whether shuffle write time should include the file open/close time is debatable; also, we don't know whether the actual open action is lazy or not (it depends on the OS). But one downside of this change is that it makes this metric incomparable with the old one, because we have changed the semantics. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19711: [SPARK-22471][SQL] SQLListener consumes much memo...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19711#discussion_r150172409 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala --- @@ -113,7 +116,7 @@ class SQLListener(conf: SparkConf) extends SparkListener with Logging { */ private val _jobIdToExecutionId = mutable.HashMap[Long, Long]() - private val _stageIdToStageMetrics = mutable.HashMap[Long, SQLStageMetrics]() + private val _stageIdToStageMetrics = mutable.LinkedHashMap[Long, SQLStageMetrics]() --- End diff -- Maybe we can use Java's `LinkedHashMap` and override `removeEldestEntry` to do what we want. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19661: [SPARK-22450][Core][Mllib]safely register class f...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19661#discussion_r150171482 --- Diff: core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala --- @@ -108,6 +108,27 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { check(Array(Array("1", "2"), Array("1", "2", "3", "4"))) } + test("safely register class for mllib/ml") { +val conf = new SparkConf(false) +val ser = new KryoSerializer(conf) + +Seq("org.apache.spark.mllib.linalg.Vector", + "org.apache.spark.mllib.linalg.DenseVector", + "org.apache.spark.mllib.linalg.SparseVector", + "org.apache.spark.mllib.linalg.Matrix", + "org.apache.spark.mllib.linalg.DenseMatrix", + "org.apache.spark.mllib.linalg.SparseMatrix", + "org.apache.spark.ml.linalg.Vector", + "org.apache.spark.ml.linalg.DenseVector", + "org.apache.spark.ml.linalg.SparseVector", + "org.apache.spark.ml.linalg.Matrix", + "org.apache.spark.ml.linalg.DenseMatrix", + "org.apache.spark.ml.linalg.SparseMatrix", + "org.apache.spark.ml.feature.Instance", + "org.apache.spark.ml.feature.OffsetInstance" +).foreach(!Utils.classIsLoadable(_)) --- End diff -- This UT doesn't seem to actually reflect your purpose above; it looks like it will always pass. Also, `conf` and `ser` above seem never to be used here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
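One possible shape for the test body, as a sketch only and relying on the `conf`, `ser`, and `Utils` already in the suite; whether the expectation should be "loadable" or "not loadable" depends on which module the suite runs in, so the polarity below is an assumption for the author to confirm.

```scala
// Exercise the registration path: building a Kryo instance should not throw even
// when the mllib/ml classes are absent from core's test classpath.
val kryo = ser.newKryo()

Seq(
  "org.apache.spark.mllib.linalg.Vector",
  "org.apache.spark.ml.linalg.SparseMatrix"
  // ... remaining class names from the list above ...
).foreach { name =>
  // Turn the previously discarded boolean into a real assertion.
  assert(!Utils.classIsLoadable(name), s"$name is unexpectedly on core's test classpath")
}
```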
[GitHub] spark pull request #19649: [SPARK-22405][SQL] Add more ExternalCatalogEvent
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19649#discussion_r149845572 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala --- @@ -62,6 +62,16 @@ case class DropDatabasePreEvent(database: String) extends DatabaseEvent case class DropDatabaseEvent(database: String) extends DatabaseEvent /** + * Event fired before a database is altered. + */ +case class AlterDatabasePreEvent(database: String) extends DatabaseEvent --- End diff -- Sure, I will update the title. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19661: [SPARK-22450][Core][Mllib]safely register class f...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19661#discussion_r149619662 --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala --- @@ -178,10 +178,40 @@ class KryoSerializer(conf: SparkConf) kryo.register(Utils.classForName("scala.collection.immutable.Map$EmptyMap$")) kryo.register(classOf[ArrayBuffer[Any]]) +// We can't load those class directly in order to avoid unnecessary jar dependencies. +// We load them safely, ignore it if the class not found. +Seq("org.apache.spark.mllib.linalg.Vector", + "org.apache.spark.mllib.linalg.DenseVector", + "org.apache.spark.mllib.linalg.SparseVector", + "org.apache.spark.mllib.linalg.Matrix", + "org.apache.spark.mllib.linalg.DenseMatrix", + "org.apache.spark.mllib.linalg.SparseMatrix", + "org.apache.spark.ml.linalg.Vector", + "org.apache.spark.ml.linalg.DenseVector", + "org.apache.spark.ml.linalg.SparseVector", + "org.apache.spark.ml.linalg.Matrix", + "org.apache.spark.ml.linalg.DenseMatrix", + "org.apache.spark.ml.linalg.SparseMatrix", + "org.apache.spark.ml.feature.Instance", + "org.apache.spark.ml.feature.OffsetInstance" +).flatMap(safeClassLoader(_)).foreach(kryo.register(_)) --- End diff -- Maybe you can change to ```scala Seq(xxx).foreach(c => Try(Utils.classForName(c)).foreach(kryo.register)) ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
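Spelled out a bit more (still inside `newKryo()`, so `kryo` and `Utils` are in scope), the suggestion could look like the sketch below; `scala.util.Try` would need to be imported at the top of the file.

```scala
import scala.util.Try

// Register ML/MLlib classes only when they are present on the classpath; a missing
// class is silently skipped instead of failing Kryo setup.
Seq(
  "org.apache.spark.mllib.linalg.Vector",
  "org.apache.spark.ml.linalg.DenseMatrix"
  // ... remaining mllib/ml class names ...
).foreach { name =>
  Try(Utils.classForName(name)).foreach(cls => kryo.register(cls))
}
```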
[GitHub] spark issue #19649: [SPARK-22405][SQL] Add more ExternalCatalogEvent
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19649 One question, as also mentioned above: do we need to track partition-related events? @cloud-fan @hvanhovell @gatorsmile --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19688: [SPARK-22466][Spark Submit]export SPARK_CONF_DIR while c...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19688 Please specify the purpose of this change in the PR description. If it belongs to #19663 , why don't you change it there? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org