[GitHub] spark pull request #20183: [SPARK-22986][Core] Use a cache to avoid instanti...

2018-01-11 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20183#discussion_r161149468
  
--- Diff: 
core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -206,36 +206,50 @@ private[spark] class TorrentBroadcast[T: 
ClassTag](obj: T, id: Long)
 
   private def readBroadcastBlock(): T = Utils.tryOrIOException {
 TorrentBroadcast.synchronized {
-  setConf(SparkEnv.get.conf)
-  val blockManager = SparkEnv.get.blockManager
-  blockManager.getLocalValues(broadcastId) match {
-case Some(blockResult) =>
-  if (blockResult.data.hasNext) {
-val x = blockResult.data.next().asInstanceOf[T]
-releaseLock(broadcastId)
-x
-  } else {
-throw new SparkException(s"Failed to get locally stored 
broadcast data: $broadcastId")
-  }
-case None =>
-  logInfo("Started reading broadcast variable " + id)
-  val startTimeMs = System.currentTimeMillis()
-  val blocks = readBlocks()
-  logInfo("Reading broadcast variable " + id + " took" + 
Utils.getUsedTimeMs(startTimeMs))
-
-  try {
-val obj = TorrentBroadcast.unBlockifyObject[T](
-  blocks.map(_.toInputStream()), SparkEnv.get.serializer, 
compressionCodec)
-// Store the merged copy in BlockManager so other tasks on 
this executor don't
-// need to re-fetch it.
-val storageLevel = StorageLevel.MEMORY_AND_DISK
-if (!blockManager.putSingle(broadcastId, obj, storageLevel, 
tellMaster = false)) {
-  throw new SparkException(s"Failed to store $broadcastId in 
BlockManager")
+  val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
+
+  
Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse {
--- End diff --

I see, thanks for pointing that out.


---




[GitHub] spark pull request #20183: [SPARK-22986][Core] Use a cache to avoid instanti...

2018-01-11 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20183#discussion_r161147892
  
--- Diff: 
core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -206,36 +206,50 @@ private[spark] class TorrentBroadcast[T: 
ClassTag](obj: T, id: Long)
 
   private def readBroadcastBlock(): T = Utils.tryOrIOException {
 TorrentBroadcast.synchronized {
-  setConf(SparkEnv.get.conf)
-  val blockManager = SparkEnv.get.blockManager
-  blockManager.getLocalValues(broadcastId) match {
-case Some(blockResult) =>
-  if (blockResult.data.hasNext) {
-val x = blockResult.data.next().asInstanceOf[T]
-releaseLock(broadcastId)
-x
-  } else {
-throw new SparkException(s"Failed to get locally stored 
broadcast data: $broadcastId")
-  }
-case None =>
-  logInfo("Started reading broadcast variable " + id)
-  val startTimeMs = System.currentTimeMillis()
-  val blocks = readBlocks()
-  logInfo("Reading broadcast variable " + id + " took" + 
Utils.getUsedTimeMs(startTimeMs))
-
-  try {
-val obj = TorrentBroadcast.unBlockifyObject[T](
-  blocks.map(_.toInputStream()), SparkEnv.get.serializer, 
compressionCodec)
-// Store the merged copy in BlockManager so other tasks on 
this executor don't
-// need to re-fetch it.
-val storageLevel = StorageLevel.MEMORY_AND_DISK
-if (!blockManager.putSingle(broadcastId, obj, storageLevel, 
tellMaster = false)) {
-  throw new SparkException(s"Failed to store $broadcastId in 
BlockManager")
+  val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
+
+  
Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse {
--- End diff --

Is this `ReferenceMap` thread safe?
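
For context, `ReferenceMap` from commons-collections is not synchronized on its own. A minimal sketch (assuming commons-collections 3.x; the field name is illustrative, not necessarily what `BroadcastManager` ends up using) of one way to make concurrent lookups safe:

```
import java.util.{Collections, Map => JMap}

import org.apache.commons.collections.map.{AbstractReferenceMap, ReferenceMap}

// Wrap the non-thread-safe ReferenceMap in a synchronized view before sharing
// it across tasks. Keys are held strongly and values weakly, so an entry is
// dropped once the broadcast value is no longer referenced elsewhere.
val cachedValues: JMap[Any, Any] = Collections.synchronizedMap(
  new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
    .asInstanceOf[JMap[Any, Any]])
```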


---




[GitHub] spark issue #20242: [MINOR][BUILD] Fix Java linter errors

2018-01-11 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/20242
  
LGTM. @dongjoon-hyun, do the current changes include all the lint issues, or do 
you still have further changes?


---




[GitHub] spark issue #20184: [SPARK-22987][Core] UnsafeExternalSorter cases OOM when ...

2018-01-11 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/20184
  
@liutang123, can you please tell us how to reproduce your issue easily?


---




[GitHub] spark issue #20236: [SPARK-23044] Error handling for jira assignment

2018-01-11 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/20236
  
@squito, thanks for the fix. I also don't have PRs to verify the changes, but I 
think catching the exception should be enough.


---




[GitHub] spark issue #19885: [SPARK-22587] Spark job fails if fs.defaultFS and applic...

2018-01-10 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19885
  
Let me merge to master and branch 2.3. Thanks!


---




[GitHub] spark issue #19885: [SPARK-22587] Spark job fails if fs.defaultFS and applic...

2018-01-10 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19885
  
LGTM. @merlintang please fix the PR title, thanks!


---




[GitHub] spark issue #19885: [SPARK-22587] Spark job fails if fs.defaultFS and applic...

2018-01-10 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19885
  
@steveloughran @vanzin please help to review again.


---




[GitHub] spark pull request #19885: [SPARK-22587] Spark job fails if fs.defaultFS and...

2018-01-10 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19885#discussion_r160617532
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
 ---
@@ -357,6 +357,41 @@ class ClientSuite extends SparkFunSuite with Matchers {
 sparkConf.get(SECONDARY_JARS) should be (Some(Seq(new 
File(jar2.toURI).getName)))
   }
 
+  private val matching = Seq(
+("files URI match test1", "file:///file1", "file:///file2"),
+("files URI match test2", "file:///c:file1", "file://c:file2"),
+("files URI match test3", "file://host/file1", "file://host/file2"),
+("wasb URI match test", "wasb://bucket1@user", "wasb://bucket1@user/"),
+("hdfs URI match test", "hdfs:/path1", "hdfs:/path1")
+  )
+
+  matching.foreach {
--- End diff --

nit:

```
matching.foreach { t => 
...
```
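
Spelled out against the test body shown further down in this file, the suggested style is simply (assuming the surrounding `ClientSuite` imports):

```
matching.foreach { t =>
  test(t._1) {
    assert(Client.compareUri(new URI(t._2), new URI(t._3)),
      s"No match between ${t._2} and ${t._3}")
  }
}
```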


---




[GitHub] spark pull request #19885: [SPARK-22587] Spark job fails if fs.defaultFS and...

2018-01-10 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19885#discussion_r160617569
  
--- Diff: 
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
 ---
@@ -357,6 +357,41 @@ class ClientSuite extends SparkFunSuite with Matchers {
 sparkConf.get(SECONDARY_JARS) should be (Some(Seq(new 
File(jar2.toURI).getName)))
   }
 
+  private val matching = Seq(
+("files URI match test1", "file:///file1", "file:///file2"),
+("files URI match test2", "file:///c:file1", "file://c:file2"),
+("files URI match test3", "file://host/file1", "file://host/file2"),
+("wasb URI match test", "wasb://bucket1@user", "wasb://bucket1@user/"),
+("hdfs URI match test", "hdfs:/path1", "hdfs:/path1")
+  )
+
+  matching.foreach {
+t =>
+  test(t._1) {
+assert(Client.compareUri(new URI(t._2), new URI(t._3)),
+  s"No match between ${t._2} and ${t._3}")
+  }
+  }
+
+  private val unmatching = Seq(
+("files URI unmatch test1", "file:///file1", "file://host/file2"),
+("files URI unmatch test2", "file://host/file1", "file:///file2"),
+("files URI unmatch test3", "file://host/file1", "file://host2/file2"),
+("wasb URI unmatch test1", "wasb://bucket1@user", 
"wasb://bucket2@user/"),
+("wasb URI unmatch test2", "wasb://bucket1@user", 
"wasb://bucket1@user2/"),
+("s3 URI unmatch test", "s3a://user@pass:bucket1/", 
"s3a://user2@pass2:bucket1/"),
+("hdfs URI unmatch test1", "hdfs://namenode1/path1", 
"hdfs://namenode1:8080/path2"),
+("hdfs URI unmatch test2", "hdfs://namenode1:8020/path1", 
"hdfs://namenode1:8080/path2")
+  )
+
+  unmatching.foreach {
+t =>
--- End diff --

ditto.


---




[GitHub] spark pull request #20179: [SPARK-22982] Remove unsafe asynchronous close() ...

2018-01-10 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20179#discussion_r160612163
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,11 +196,24 @@ private[spark] class IndexShuffleBlockResolver(
 // find out the consolidated file, then the offset within that from 
our index
 val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
 
-val in = new DataInputStream(new FileInputStream(indexFile))
+// SPARK-22982: if this FileInputStream's position is seeked forward 
by another piece of code
+// which is incorrectly using our file descriptor then this code will 
fetch the wrong offsets
+// (which may cause a reducer to be sent a different reducer's data). 
The explicit position
+// checks added here were a useful debugging aid during SPARK-22982 
and may help prevent this
+// class of issue from re-occurring in the future which is why they 
are left here even though
+// SPARK-22982 is fixed.
+val channel = Files.newByteChannel(indexFile.toPath)
+channel.position(blockId.reduceId * 8)
+val in = new DataInputStream(Channels.newInputStream(channel))
 try {
-  ByteStreams.skipFully(in, blockId.reduceId * 8)
   val offset = in.readLong()
   val nextOffset = in.readLong()
+  val actualPosition = channel.position()
+  val expectedPosition = blockId.reduceId * 8 + 16
+  if (actualPosition != expectedPosition) {
+throw new Exception(s"SPARK-22982: Incorrect channel position 
after index file reads: " +
--- End diff --

I see. Thanks!


---




[GitHub] spark pull request #20179: [SPARK-22982] Remove unsafe asynchronous close() ...

2018-01-09 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20179#discussion_r160351383
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,11 +196,24 @@ private[spark] class IndexShuffleBlockResolver(
 // find out the consolidated file, then the offset within that from 
our index
 val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
 
-val in = new DataInputStream(new FileInputStream(indexFile))
+// SPARK-22982: if this FileInputStream's position is seeked forward 
by another piece of code
+// which is incorrectly using our file descriptor then this code will 
fetch the wrong offsets
+// (which may cause a reducer to be sent a different reducer's data). 
The explicit position
+// checks added here were a useful debugging aid during SPARK-22982 
and may help prevent this
+// class of issue from re-occurring in the future which is why they 
are left here even though
+// SPARK-22982 is fixed.
+val channel = Files.newByteChannel(indexFile.toPath)
+channel.position(blockId.reduceId * 8)
--- End diff --

I see. Thanks!


---




[GitHub] spark pull request #20179: [SPARK-22982] Remove unsafe asynchronous close() ...

2018-01-09 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20179#discussion_r160347716
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,11 +196,24 @@ private[spark] class IndexShuffleBlockResolver(
 // find out the consolidated file, then the offset within that from 
our index
 val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
 
-val in = new DataInputStream(new FileInputStream(indexFile))
+// SPARK-22982: if this FileInputStream's position is seeked forward 
by another piece of code
+// which is incorrectly using our file descriptor then this code will 
fetch the wrong offsets
+// (which may cause a reducer to be sent a different reducer's data). 
The explicit position
+// checks added here were a useful debugging aid during SPARK-22982 
and may help prevent this
+// class of issue from re-occurring in the future which is why they 
are left here even though
+// SPARK-22982 is fixed.
+val channel = Files.newByteChannel(indexFile.toPath)
+channel.position(blockId.reduceId * 8)
--- End diff --

Sorry, I'm not clear whether the change here is related to the "asynchronous 
close()" issue.


---




[GitHub] spark pull request #20179: [SPARK-22982] Remove unsafe asynchronous close() ...

2018-01-09 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20179#discussion_r160347387
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,11 +196,24 @@ private[spark] class IndexShuffleBlockResolver(
 // find out the consolidated file, then the offset within that from 
our index
 val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
 
-val in = new DataInputStream(new FileInputStream(indexFile))
+// SPARK-22982: if this FileInputStream's position is seeked forward 
by another piece of code
+// which is incorrectly using our file descriptor then this code will 
fetch the wrong offsets
+// (which may cause a reducer to be sent a different reducer's data). 
The explicit position
+// checks added here were a useful debugging aid during SPARK-22982 
and may help prevent this
+// class of issue from re-occurring in the future which is why they 
are left here even though
+// SPARK-22982 is fixed.
+val channel = Files.newByteChannel(indexFile.toPath)
+channel.position(blockId.reduceId * 8)
+val in = new DataInputStream(Channels.newInputStream(channel))
 try {
-  ByteStreams.skipFully(in, blockId.reduceId * 8)
   val offset = in.readLong()
   val nextOffset = in.readLong()
+  val actualPosition = channel.position()
+  val expectedPosition = blockId.reduceId * 8 + 16
+  if (actualPosition != expectedPosition) {
+throw new Exception(s"SPARK-22982: Incorrect channel position 
after index file reads: " +
--- End diff --

Maybe we'd better use a more specific `Exception` type here.
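
For instance (just a sketch; the concrete exception class is an assumption, not necessarily what the PR settled on), an `IllegalStateException` would make the broken invariant explicit:

```
if (actualPosition != expectedPosition) {
  throw new IllegalStateException(
    s"SPARK-22982: Incorrect channel position after index file reads: " +
      s"expected $expectedPosition but actual position was $actualPosition.")
}
```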


---




[GitHub] spark issue #11994: [SPARK-14151] Expose metrics Source and Sink interface

2018-01-08 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/11994
  
Yes, I think so. Based on the current MetricsSystem, it is hard to avoid 
`MetricsRegistry`, whether explicitly or implicitly (unless we refactor/abstract 
this part a lot). The same is true if users want to use a different version of 
Codahale.


---




[GitHub] spark issue #11994: [SPARK-14151] Expose metrics Source and Sink interface

2018-01-08 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/11994
  
Hi @CodingCat, thanks a lot for your explanation. IIUC, from the code you 
mentioned above, we still need to pass the `MetricRegistry` to the `Reporter`; 
otherwise, how would a reporter report the registered metrics?

In your example, you're using a new `MetricRegistry`, which is empty. I don't 
think this `Reporter` will report anything in `CodeHaleCsvReporter` unless we 
pass the `MetricRegistry` from `MetricsSystem` to this reporter.

Please correct me if I'm wrong. :)




---




[GitHub] spark issue #20078: [SPARK-22900] [Spark-Streaming] Remove unnecessary restr...

2018-01-03 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/20078
  
Originally in Spark dynamic allocation, "spark.executor.instances" and the 
dynamic allocation confs could not coexist: if "spark.executor.instances" was 
set, dynamic allocation would not be enabled. But this behavior changed after 
2.0.

I think for streaming dynamic allocation we'd better keep it consistent with 
Spark dynamic allocation.
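
For illustration (a sketch, not taken from this PR), this is the combination being discussed; in pre-2.0 Spark the explicit instance count effectively disabled dynamic allocation, while later versions keep dynamic allocation enabled and, as I understand it, only use the instance count when computing the initial executor target:

```
import org.apache.spark.SparkConf

// Both settings together; the question is whether the fixed instance count
// should win over dynamic allocation or merely seed its initial target.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.executor.instances", "10")
```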


---




[GitHub] spark issue #20144: [SPARK-21475][CORE][2nd attempt] Change to use NIO's Fil...

2018-01-03 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/20144
  
@zsxwing, would you please help review? Thanks!


---




[GitHub] spark pull request #20144: [SPARK-21475][CORE][2nd attempt] Change to use NI...

2018-01-03 Thread jerryshao
GitHub user jerryshao opened a pull request:

https://github.com/apache/spark/pull/20144

[SPARK-21475][CORE][2nd attempt] Change to use NIO's Files API for external 
shuffle service

## What changes were proposed in this pull request?

This PR is the second attempt of #18684. NIO's Files API doesn't override the 
`skip` method of `InputStream`, so it brings in a performance issue (mentioned 
in #20119). But using `FileInputStream`/`FileOutputStream` will also bring in a 
memory issue 
(https://dzone.com/articles/fileinputstream-fileoutputstream-considered-harmful), 
which is severe for a long-running external shuffle service. So this proposal 
only fixes the code related to the external shuffle service.

## How was this patch tested?

Existing tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jerryshao/apache-spark SPARK-21475-v2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20144.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20144


commit 277b801b3ccee95d8f30c02b1ba8693ab39f302a
Author: jerryshao 
Date:   2018-01-04T01:48:55Z

Change to NIO's file API for external shuffle service




---




[GitHub] spark issue #20119: [SPARK-21475][Core]Revert "[SPARK-21475][CORE] Use NIO's...

2018-01-03 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/20119
  
OK, I will do it.


---




[GitHub] spark issue #20078: [SPARK-22900] [Spark-Streaming] Remove unnecessary restr...

2018-01-03 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/20078
  
I'm not against the fix. My concern is that we've shifted to Structured 
Streaming, and this feature (streaming dynamic allocation) is seldom 
used/tested, so this might not be the only issue with it (dynamic allocation 
itself has been updated a lot). Do we still need to put effort into it?

Just my concern.


---




[GitHub] spark issue #20119: [SPARK-21475][Core]Revert "[SPARK-21475][CORE] Use NIO's...

2018-01-03 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/20119
  
@zsxwing, maybe we only need to fix the above two points related to the 
external shuffle service. What do you think?


---




[GitHub] spark issue #20078: [SPARK-22900] [Spark-Streaming] Remove unnecessary restr...

2018-01-03 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/20078
  
Sorry to chime in. This feature (streaming dynamic allocation) is obsolete and 
has bugs, and users seldom enable it. Is it still worth fixing?


---




[GitHub] spark issue #11994: [SPARK-14151] Expose metrics Source and Sink interface

2018-01-03 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/11994
  
@CodingCat, IIUC the way you mentioned will also expose the Codahale `Reporter` 
to users. Can you please explain more? Thanks!


---




[GitHub] spark pull request #20119: [SPARK-21475][Core]Revert "[SPARK-21475][CORE] Us...

2018-01-02 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20119#discussion_r159372233
  
--- Diff: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleIndexInformation.java
 ---
@@ -39,7 +39,7 @@ public ShuffleIndexInformation(File indexFile) throws 
IOException {
 offsets = buffer.asLongBuffer();
 DataInputStream dis = null;
 try {
-  dis = new DataInputStream(Files.newInputStream(indexFile.toPath()));
+  dis = new DataInputStream(new FileInputStream(indexFile));
--- End diff --

@zsxwing, I think this will also affect the external shuffle service.


---




[GitHub] spark pull request #20119: [SPARK-21475][Core]Revert "[SPARK-21475][CORE] Us...

2018-01-02 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20119#discussion_r159371080
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -198,7 +196,7 @@ private[spark] class IndexShuffleBlockResolver(
 // find out the consolidated file, then the offset within that from 
our index
 val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
 
-val in = new DataInputStream(Files.newInputStream(indexFile.toPath))
+val in = new DataInputStream(new FileInputStream(indexFile))
--- End diff --

I see. 


---




[GitHub] spark pull request #20119: [SPARK-21475][Core]Revert "[SPARK-21475][CORE] Us...

2018-01-02 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20119#discussion_r159370876
  
--- Diff: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java
 ---
@@ -165,7 +165,7 @@ private void failRemainingBlocks(String[] 
failedBlockIds, Throwable e) {
 
 DownloadCallback(int chunkIndex) throws IOException {
   this.targetFile = tempFileManager.createTempFile();
-  this.channel = 
Channels.newChannel(Files.newOutputStream(targetFile.toPath()));
+  this.channel = Channels.newChannel(new FileOutputStream(targetFile));
--- End diff --

Here I think we can use `FileChannel.open` instead.
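
A minimal sketch of that suggestion (written in Scala here for brevity, while the file under review is Java), assuming only a writable channel to the temp file is needed:

```
import java.io.File
import java.nio.channels.FileChannel
import java.nio.file.StandardOpenOption

// Open a FileChannel directly rather than wrapping a FileOutputStream; this
// avoids the finalizer overhead that FileInputStream/FileOutputStream carry.
def openTempFileChannel(targetFile: File): FileChannel =
  FileChannel.open(
    targetFile.toPath,
    StandardOpenOption.CREATE,
    StandardOpenOption.WRITE,
    StandardOpenOption.TRUNCATE_EXISTING)
```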


---




[GitHub] spark pull request #20119: [SPARK-21475][Core]Revert "[SPARK-21475][CORE] Us...

2018-01-02 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/20119#discussion_r159364034
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
 ---
@@ -94,9 +93,9 @@ public ByteBuffer nioByteBuffer() throws IOException {
 
   @Override
   public InputStream createInputStream() throws IOException {
-InputStream is = null;
+FileInputStream is = null;
 try {
-  is = Files.newInputStream(file.toPath());
+  is = new FileInputStream(file);
   ByteStreams.skipFully(is, offset);
   return new LimitedInputStream(is, length);
--- End diff --

I think these two lines might be the place that suffers from the `skip` issue; 
can we revert only this place? @zsxwing @cloud-fan @gatorsmile
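
In other words, keep the NIO-based streams elsewhere and revert only this call site to `FileInputStream`, whose `skip` seeks natively instead of reading and discarding bytes. A rough Scala-flavoured sketch of the idea (the file under review is Java; `LimitedInputStream` is Spark's existing wrapper in network-common):

```
import java.io.{File, FileInputStream, InputStream}

import com.google.common.io.ByteStreams
import org.apache.spark.network.util.LimitedInputStream

// Only this call site keeps FileInputStream: skipping `offset` bytes is cheap
// here, while the stream from Files.newInputStream skips by reading.
def createInputStream(file: File, offset: Long, length: Long): InputStream = {
  val is = new FileInputStream(file)
  ByteStreams.skipFully(is, offset)
  new LimitedInputStream(is, length)
}
```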


---




[GitHub] spark issue #11994: [SPARK-14151] Expose metrics Source and Sink interface

2018-01-02 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/11994
  
Sorry for the late response, I was off for the last two weeks. Currently I 
don't have a better solution for this. @CodingCat, let me think about your 
suggestion, thanks a lot :).


---




[GitHub] spark issue #20119: [SPARK-21475][Core]Revert "[SPARK-21475][CORE] Use NIO's...

2018-01-02 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/20119
  
Sorry, I haven't checked the details; let me take a look at it. The changes I 
made were trying to fix a memory issue for shuffle (especially the external 
shuffle service); this issue occurred in our prod cluster. Let me think about 
whether there's a way to fix it.


---




[GitHub] spark issue #19885: [SPARK-22587] Spark job fails if fs.defaultFS and applic...

2017-12-06 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19885
  
I see. Thanks for the explanation @steveloughran. My concern is that the 
current changes will affect all filesystems, but we only saw this issue in 
wasb. So limiting the authority comparison to only wasb would be safer.


---




[GitHub] spark issue #19885: [SPARK-22587] Spark job fails if fs.defaultFS and applic...

2017-12-06 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19885
  
>User info isn't picked up from the URL, it's taken off your Kerberos 
credentials. If you are running HDFS unkerberized, then UGI takes it from the 
environment variable HADOOP_USER_NAME.

I understand that userInfo is not picked up from the URL. But this PR tries to 
use the authority to compare filesystems, and the authority includes the 
userInfo. So based on the code here, `hdfs://us...@nn1.com:8020` and 
`hdfs://us...@nn1.com:8020` are obviously two filesystems, but is it true that 
these two URIs belong to two actual HDFS clusters? Or are they just two local 
filesystem objects?
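
One way to probe this (a hypothetical snippet, not part of the PR; the host and user names are made up) is to ask Hadoop for the two `FileSystem` objects and compare them. Getting two distinct cached objects only shows that the client-side cache distinguishes them, not that two clusters sit behind them:

```
import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

val hadoopConf = new Configuration()
// Two URIs that differ only in userInfo (hypothetical names).
val fs1 = FileSystem.get(new URI("hdfs://user1@nn1.example.com:8020"), hadoopConf)
val fs2 = FileSystem.get(new URI("hdfs://user2@nn1.example.com:8020"), hadoopConf)

println(fs1 eq fs2)                // same cached FileSystem object?
println(fs1.getUri == fs2.getUri)  // same authority after resolution?
```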


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-05 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r155125699
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
 ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkApplication
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Encapsulates arguments to the submission client.
+ *
+ * @param mainAppResource the main application resource if any
+ * @param mainClass the main class of the application to run
+ * @param driverArgs arguments to the driver
+ */
+private[spark] case class ClientArguments(
+ mainAppResource: Option[MainAppResource],
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+var mainAppResource: Option[MainAppResource] = None
+var mainClass: Option[String] = None
+val driverArgs = mutable.ArrayBuffer.empty[String]
+
+args.sliding(2, 2).toList.foreach {
+  case Array("--primary-java-resource", primaryJavaResource: String) =>
+mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+  case Array("--main-class", clazz: String) =>
+mainClass = Some(clazz)
+  case Array("--arg", arg: String) =>
+driverArgs += arg
+  case other =>
+val invalid = other.mkString(" ")
+throw new RuntimeException(s"Unknown arguments: $invalid")
+}
+
+require(mainClass.isDefined, "Main class must be specified via 
--main-class")
+
+ClientArguments(
+  mainAppResource,
+  mainClass.get,
+  driverArgs.toArray)
+  }
+}
+
+/**
+ * Submits a Spark application to run on Kubernetes by creating the driver 
pod and starting a
+ * watcher that monitors and logs the application status. Waits for the 
application to terminate if
+ * spark.kubernetes.submission.waitAppCompletion is true.
+ *
+ * @param submissionSteps steps that collectively configure the driver
+ * @param submissionSparkConf the submission client Spark configuration
+ * @param kubernetesClient the client to talk to the Kubernetes API server
+ * @param waitForAppCompletion a flag indicating whether the client should 
wait for the application
+ * to complete
+ * @param appName the application name
+ * @param loggingPodStatusWatcher a watcher that monitors and logs the 
application status
+ */
+private[spark] class Client(
+submissionSteps: Seq[DriverConfigurationStep],
+submissionSparkConf: SparkConf,
+kubernetesClient: KubernetesClient,
+waitForAppCompletion: Boolean,
+appName: String,
+loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging {
+
+  private val driverJavaOptions = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
+
+   /**
+* Run command that initializes a DriverSpec that will be updated after 
each
+* DriverConfigurationStep in the sequence that is passed in. The final 
KubernetesDriverSpec
+* will be used to build th

[GitHub] spark issue #19885: [SPARK-22587] Spark job fails if fs.defaultFS and applic...

2017-12-05 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19885
  
I still have a question about it. For HDFS URIs like `hdfs://us...@nn1.com:8020` 
and `hdfs://us...@nn1.com:8020`, do we honor userInfo for HDFS filesystems? Are 
they two HDFS clusters, or just two FS cache objects on the local client side? 
@merlintang, did you try this in your local test?


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-05 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154875878
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala
 ---
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.UUID
+
+import com.google.common.primitives.Longs
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.steps._
+import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.util.SystemClock
+
+/**
+ * Constructs the complete list of driver configuration steps to run to 
deploy the Spark driver.
+ */
+private[spark] class DriverConfigurationStepsOrchestrator(
+namespace: String,
+kubernetesAppId: String,
+launchTime: Long,
+mainAppResource: Option[MainAppResource],
+appName: String,
+mainClass: String,
+appArgs: Array[String],
+submissionSparkConf: SparkConf) {
+
+  // The resource name prefix is derived from the application name, making 
it easy to connect the
+  // names of the Kubernetes resources from e.g. kubectl or the Kubernetes 
dashboard to the
+  // application the user submitted. However, we can't use the application 
name in the label, as
+  // label values are considerably restrictive, e.g. must be no longer 
than 63 characters in
+  // length. So we generate a separate identifier for the app ID itself, 
and bookkeeping that
+  // requires finding "all pods for this application" should use the 
kubernetesAppId.
+  private val kubernetesResourceNamePrefix = {
+val uuid = UUID.nameUUIDFromBytes(Longs.toByteArray(launchTime))
+s"$appName-$uuid".toLowerCase.replaceAll("\\.", "-")
+  }
+
+  private val dockerImagePullPolicy = 
submissionSparkConf.get(DOCKER_IMAGE_PULL_POLICY)
+  private val jarsDownloadPath = 
submissionSparkConf.get(JARS_DOWNLOAD_LOCATION)
+  private val filesDownloadPath = 
submissionSparkConf.get(FILES_DOWNLOAD_LOCATION)
+
+  def getAllConfigurationSteps(): Seq[DriverConfigurationStep] = {
+val driverCustomLabels = ConfigurationUtils.parsePrefixedKeyValuePairs(
+  submissionSparkConf,
+  KUBERNETES_DRIVER_LABEL_PREFIX)
+require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with 
key " +
+  s"$SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark 
bookkeeping " +
+  "operations.")
+require(!driverCustomLabels.contains(SPARK_ROLE_LABEL), "Label with 
key " +
+  s"$SPARK_ROLE_LABEL is not allowed as it is reserved for Spark 
bookkeeping " +
+  "operations.")
+
+val allDriverLabels = driverCustomLabels ++ Map(
+  SPARK_APP_ID_LABEL -> kubernetesAppId,
+  SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
+
+val initialSubmissionStep = new BaseDriverConfigurationStep(
+  kubernetesAppId,
+  kubernetesResourceNamePrefix,
+  allDriverLabels,
+  dockerImagePullPolicy,
+  appName,
+  mainClass,
+  appArgs,
+  submissionSparkConf)
+
+val driverAddressStep = new DriverServiceBootstrapStep(
+  kubernetesResourceNamePrefix,
+  allDriverLabels,
+  submissionSparkConf,
+  new SystemClock)
+
+val kubernetesCredentialsStep = new DriverKubernetesCredentialsStep(
+  submissionSparkConf, kubernetesResourceNamePrefix)
+
+val additionalMainAppJar = if (mainAppResource.nonEmpty) {
+   val mayBeResource = mainAppResource.get match {
+case JavaMainA

[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-05 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154870951
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -2744,6 +2744,25 @@ private[spark] object Utils extends Logging {
 }
   }
 
+  /**
+   * Check the validity of the given Kubernetes master URL and return the 
resolved URL.
+   */
+  def checkAndGetK8sMasterUrl(rawMasterURL: String): String = {
+require(rawMasterURL.startsWith("k8s://"),
+  "Kubernetes master URL must start with k8s://.")
+val masterWithoutK8sPrefix = rawMasterURL.substring("k8s://".length)
--- End diff --

Can we change this String representation to a `URI` to make the check below 
more robust?
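
A rough sketch of what a `URI`-based version could look like (hypothetical, not the code that was eventually merged):

```
import java.net.URI

// Hypothetical URI-based variant: parse once and branch on the scheme instead
// of doing prefix string matching.
def checkAndGetK8sMasterUrl(rawMasterURL: String): String = {
  require(rawMasterURL.startsWith("k8s://"),
    "Kubernetes master URL must start with k8s://.")
  val masterWithoutK8sPrefix = rawMasterURL.substring("k8s://".length)

  if (!masterWithoutK8sPrefix.contains("://")) {
    // No scheme given at all: assume HTTPS.
    "https://" + masterWithoutK8sPrefix
  } else {
    new URI(masterWithoutK8sPrefix).getScheme match {
      case "https" => masterWithoutK8sPrefix
      case "http" =>
        // The surrounding code would log a warning about plain HTTP here.
        masterWithoutK8sPrefix
      case other =>
        throw new IllegalArgumentException(
          s"Unsupported scheme '$other' in Kubernetes master URL: $rawMasterURL")
    }
  }
}
```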


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-05 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154874378
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
 ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkApplication
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Encapsulates arguments to the submission client.
+ *
+ * @param mainAppResource the main application resource if any
+ * @param mainClass the main class of the application to run
+ * @param driverArgs arguments to the driver
+ */
+private[spark] case class ClientArguments(
+ mainAppResource: Option[MainAppResource],
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+var mainAppResource: Option[MainAppResource] = None
+var mainClass: Option[String] = None
+val driverArgs = mutable.ArrayBuffer.empty[String]
+
+args.sliding(2, 2).toList.foreach {
+  case Array("--primary-java-resource", primaryJavaResource: String) =>
+mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+  case Array("--main-class", clazz: String) =>
+mainClass = Some(clazz)
+  case Array("--arg", arg: String) =>
+driverArgs += arg
+  case other =>
+val invalid = other.mkString(" ")
+throw new RuntimeException(s"Unknown arguments: $invalid")
+}
+
+require(mainClass.isDefined, "Main class must be specified via 
--main-class")
+
+ClientArguments(
+  mainAppResource,
+  mainClass.get,
+  driverArgs.toArray)
+  }
+}
+
+/**
+ * Submits a Spark application to run on Kubernetes by creating the driver 
pod and starting a
+ * watcher that monitors and logs the application status. Waits for the 
application to terminate if
+ * spark.kubernetes.submission.waitAppCompletion is true.
+ *
+ * @param submissionSteps steps that collectively configure the driver
+ * @param submissionSparkConf the submission client Spark configuration
+ * @param kubernetesClient the client to talk to the Kubernetes API server
+ * @param waitForAppCompletion a flag indicating whether the client should 
wait for the application
+ * to complete
+ * @param appName the application name
+ * @param loggingPodStatusWatcher a watcher that monitors and logs the 
application status
+ */
+private[spark] class Client(
+submissionSteps: Seq[DriverConfigurationStep],
+submissionSparkConf: SparkConf,
+kubernetesClient: KubernetesClient,
+waitForAppCompletion: Boolean,
+appName: String,
+loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging {
+
+  private val driverJavaOptions = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
+
+   /**
+* Run command that initializes a DriverSpec that will be updated after 
each
+* DriverConfigurationStep in the sequence that is passed in. The final 
KubernetesDriverSpec
+* will be used to build th

[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-05 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154872371
  
--- Diff: docs/running-on-yarn.md ---
@@ -234,18 +234,11 @@ To use a custom metrics.properties for the 
application master and executors, upd
 The amount of off-heap memory (in megabytes) to be allocated per 
executor. This is memory that accounts for things like VM overheads, interned 
strings, other native overheads, etc. This tends to grow with the executor size 
(typically 6-10%).
   
 
-
-  spark.yarn.driver.memoryOverhead
--- End diff --

I think we should make this configuration backward compatible; users should 
still be able to use `spark.yarn.driver.memoryOverhead` with a warning log, 
like other deprecated configurations.
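
A minimal sketch of the usual fallback-with-warning pattern (illustrative only; it assumes the new key is `spark.driver.memoryOverhead`, matching the doc change above, and Spark's real deprecation machinery may handle this differently):

```
import org.apache.spark.SparkConf

// Prefer the new key and fall back to the deprecated YARN-specific key.
// A real implementation would also log a deprecation warning on the fallback.
def driverMemoryOverhead(conf: SparkConf): Option[String] =
  conf.getOption("spark.driver.memoryOverhead")
    .orElse(conf.getOption("spark.yarn.driver.memoryOverhead"))
```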


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-05 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154871648
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -2744,6 +2744,25 @@ private[spark] object Utils extends Logging {
 }
   }
 
+  /**
+   * Check the validity of the given Kubernetes master URL and return the 
resolved URL.
+   */
+  def checkAndGetK8sMasterUrl(rawMasterURL: String): String = {
+require(rawMasterURL.startsWith("k8s://"),
+  "Kubernetes master URL must start with k8s://.")
+val masterWithoutK8sPrefix = rawMasterURL.substring("k8s://".length)
+if (masterWithoutK8sPrefix.startsWith("https://")) {
+  masterWithoutK8sPrefix
+} else if (masterWithoutK8sPrefix.startsWith("http://")) {
+  logWarning("Kubernetes master URL uses HTTP instead of HTTPS.")
+  masterWithoutK8sPrefix
+} else {
--- End diff --

Should this happen only when the scheme doesn't exist? From the code, it looks 
like this branch will also be executed if the user misconfigures a different 
scheme.


---




[GitHub] spark issue #19885: [SPARK-22587] Spark job fails if fs.defaultFS and applic...

2017-12-04 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19885
  
ok to test.


---




[GitHub] spark issue #19885: [SPARK-22587] Spark job fails if fs.defaultFS and applic...

2017-12-04 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19885
  
Is this assumption based on the implementation of Hadoop `FileSystem`? I was 
thinking that wasb is an exception; for the others we could still keep the 
original code.

@steveloughran, would you please comment?


---




[GitHub] spark issue #19885: [SPARK-22587] Spark job fails if fs.defaultFS and applic...

2017-12-04 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19885
  
@vanzin please help to review, thanks!


---




[GitHub] spark pull request #19885: [SPARK-22587] Spark job fails if fs.defaultFS and...

2017-12-04 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19885#discussion_r154822603
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala 
---
@@ -1428,6 +1428,12 @@ private object Client extends Logging {
   return false
 }
 
+val srcAuthority = srcUri.getAuthority()
+val detAuthority = dstUri.getAuthority()
+if (srcAuthority != detAuthority || (srcAuthority != null && 
!srcAuthority.equalsIgnoreCase(detAuthority))) {
--- End diff --

I guess this line is too long.


---




[GitHub] spark issue #19885: [SPARK-22587] Spark job fails if fs.defaultFS and applic...

2017-12-04 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19885
  
@merlintang, would you please add the problem to your PR description? Currently 
it is a WASB problem in which userInfo is honored to differentiate filesystems. 
Please add that scenario to the description.

Besides, the changes here will also affect all other filesystems like HDFS/S3; 
do they still have the same behavior? What happens with 
`hdfs://us...@nn1.com:8020` and `hdfs://us...@nn1.com:8020`: are they two 
filesystems? Would you please clarify?


---




[GitHub] spark issue #19840: [SPARK-22640][PYSPARK][YARN]switch python exec on execut...

2017-11-30 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19840
  
I'm a little concerned about such changes; they may be misconfigured and 
introduce a discrepancy between the driver Python and the executor Python. At 
the least we should honor the configuration "spark.executorEnv.PYSPARK_PYTHON" 
unless the `PYSPARK_PYTHON` executable sent by the driver does not exist. (Just 
my two cents.)

ping @zjffdu, any thoughts?


---




[GitHub] spark issue #19840: [SPARK-22640][PYSPARK][YARN]switch python exec on execut...

2017-11-30 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19840
  
Oh, I see. You're running in client mode, so `--conf 
spark.yarn.appMasterEnv.PYSPARK_PYTHON=py3.zip/py3/bin/python` has no effect.

So I guess the behavior is expected. The driver honors the `PYSPARK_PYTHON` env 
and ships it to the executors, so the cluster will use the same Python 
executables.

With your test above, `/path/to/python` is different for the driver and the 
executors; will that bring in issues? The driver uses `PYSPARK_PYTHON` and the 
executors use `spark.executorEnv.PYSPARK_PYTHON`, which point to different 
paths.

Normally I think we don't need to set PYSPARK_PYTHON on the executor side.

Please correct me if I'm wrong.


---




[GitHub] spark issue #19840: [SPARK-22640][PYSPARK][YARN]switch python exec on execut...

2017-11-30 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19840
  
I think in YARN we have several different ways to set `PYSPARK_PYTHON`, and I 
guess your issue is which one should take priority.

Can you please:

1. Define a consistent ordering for such envs (which one should take priority, 
spark.yarn.appMasterEnv.XXX or XXX), and document it.
2. Check that it works as expected for `spark.yarn.appMasterEnv.PYSPARK_PYTHON` 
and `spark.executorEnv.PYSPARK_PYTHON`.

I guess other envs will also suffer from this problem, am I right?
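
For reference, these are the knobs in play (the values are made up); the open question in this thread is which one should win when several are set at once:

```
import org.apache.spark.SparkConf

// Illustrative values only. PYSPARK_PYTHON / PYSPARK_DRIVER_PYTHON can also be
// exported as plain environment variables on the submitting machine.
val conf = new SparkConf()
  .set("spark.yarn.appMasterEnv.PYSPARK_PYTHON", "./py3/bin/python") // AM / cluster-mode driver
  .set("spark.executorEnv.PYSPARK_PYTHON", "./py3/bin/python")       // executors
```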


---




[GitHub] spark issue #19856: [SPARK-22664] The logs about "Connected to Zookeeper" in...

2017-11-30 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19856
  
>I think the log can't reflect the behavior of consumer connection,because 
consumer.create doesn't do any connect,it only construct a 
ZookeeperConsumerConnector instance

That's not true: `ZookeeperConsumerConnector` also connects to ZK during 
initialization, please see here 
(https://github.com/apache/kafka/blob/c9f930e3fe25e5675e64550c8b396356c5349ca7/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala#L126).


---




[GitHub] spark issue #19856: [SPARK-22664] The logs about "Connected to Zookeeper" in...

2017-11-30 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19856
  
Actually there's no issue here; IMHO your understanding of this log is slightly 
different from its original purpose.


---




[GitHub] spark issue #19856: [SPARK-22664] The logs about "Connected to Zookeeper" in...

2017-11-30 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19856
  
I guess the original purpose of such a log is to reflect the behavior of the 
consumer connection. It is not really necessary to make such a trivial change. 
Also, `ReliableKafkaReceiver` is not recommended any more, so let's keep it as 
it is.


---




[GitHub] spark issue #19631: [SPARK-22372][core, yarn] Make cluster submission use Sp...

2017-11-30 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19631
  
LGTM.


---




[GitHub] spark issue #19812: [SPARK-22598][CORE] ExecutorAllocationManager does not r...

2017-11-28 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19812
  
Did this failure ("For some reason, all of the 3 executors failed.") happen 
during task running or before task submission? Besides, if you're running on 
YARN, YARN will bring up new executors to meet the requirement, and 
`ExecutorAllocationManager` will be notified of executor lost/register events. 
Can you please tell us how to reproduce your scenario?




---




[GitHub] spark issue #19834: [SPARK-22585][Core] Path in addJar is not url encoded

2017-11-28 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19834
  
LGTM.


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-28 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153678912
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.{SPARK_VERSION => sparkVersion}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
+
+private[spark] object Config extends Logging {
+
+  val KUBERNETES_NAMESPACE =
+ConfigBuilder("spark.kubernetes.namespace")
+  .doc("The namespace that will be used for running the driver and 
executor pods. When using " +
+"spark-submit in cluster mode, this can also be passed to 
spark-submit via the " +
+"--kubernetes-namespace command line argument.")
+  .stringConf
+  .createWithDefault("default")
+
+  val DRIVER_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.driver.docker.image")
+  .doc("Docker image to use for the driver. Specify this using the 
standard Docker tag format.")
+  .stringConf
+  .createWithDefault(s"spark-driver:$sparkVersion")
+
+  val EXECUTOR_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.executor.docker.image")
+  .doc("Docker image to use for the executors. Specify this using the 
standard Docker tag " +
+"format.")
+  .stringConf
+  .createWithDefault(s"spark-executor:$sparkVersion")
+
+  val DOCKER_IMAGE_PULL_POLICY =
+ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
+  .doc("Docker image pull policy when pulling any docker image in 
Kubernetes integration")
+  .stringConf
+  .createWithDefault("IfNotPresent")
+
+
+  val KUBERNETES_AUTH_DRIVER_CONF_PREFIX =
+  "spark.kubernetes.authenticate.driver"
+  val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
+  "spark.kubernetes.authenticate.driver.mounted"
+  val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
+  val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
+  val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile"
+  val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
+  val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"
+
+  val KUBERNETES_SERVICE_ACCOUNT_NAME =
+
ConfigBuilder(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
+  .doc("Service account that is used when running the driver pod. The 
driver pod uses " +
+"this service account when requesting executor pods from the API 
server. If specific " +
+"credentials are given for the driver pod to use, the driver will 
favor " +
+"using those credentials instead.")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_DRIVER_LIMIT_CORES =
+ConfigBuilder("spark.kubernetes.driver.limit.cores")
+  .doc("Specify the hard cpu limit for the driver pod")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_EXECUTOR_LIMIT_CORES =
+ConfigBuilder("spark.kubernetes.executor.limit.cores")
+  .doc("Specify the hard cpu limit for a single executor pod")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_DRIVER_MEMORY_OVERHEAD =
+ConfigBuilder("spark.kubernetes.driver.memoryOverhead")
+  .doc("The amount of off-heap memory (in megabytes) to be allocated 
for the driver and the " +
+"driver submission server. This is memory that accounts for things 
like VM

[GitHub] spark issue #19812: [SPARK-22598][CORE] ExecutorAllocationManager does not r...

2017-11-28 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19812
  
Hi @liutang123, would you mind explaining the issue you hit and how to reproduce it? Currently we don't know what the actual issue is or how to evaluate your changes.


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153408574
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.{SPARK_VERSION => sparkVersion}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
+
+private[spark] object Config extends Logging {
+
+  val KUBERNETES_NAMESPACE =
+ConfigBuilder("spark.kubernetes.namespace")
+  .doc("The namespace that will be used for running the driver and 
executor pods. When using " +
+"spark-submit in cluster mode, this can also be passed to 
spark-submit via the " +
+"--kubernetes-namespace command line argument.")
+  .stringConf
+  .createWithDefault("default")
+
+  val DRIVER_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.driver.docker.image")
+  .doc("Docker image to use for the driver. Specify this using the 
standard Docker tag format.")
+  .stringConf
+  .createWithDefault(s"spark-driver:$sparkVersion")
+
+  val EXECUTOR_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.executor.docker.image")
+  .doc("Docker image to use for the executors. Specify this using the 
standard Docker tag " +
+"format.")
+  .stringConf
+  .createWithDefault(s"spark-executor:$sparkVersion")
+
+  val DOCKER_IMAGE_PULL_POLICY =
+ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
+  .doc("Docker image pull policy when pulling any docker image in 
Kubernetes integration")
+  .stringConf
+  .createWithDefault("IfNotPresent")
+
+
+  val KUBERNETES_AUTH_DRIVER_CONF_PREFIX =
+  "spark.kubernetes.authenticate.driver"
+  val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
+  "spark.kubernetes.authenticate.driver.mounted"
+  val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
+  val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
+  val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile"
+  val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
+  val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"
+
+  val KUBERNETES_SERVICE_ACCOUNT_NAME =
+
ConfigBuilder(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
+  .doc("Service account that is used when running the driver pod. The 
driver pod uses " +
+"this service account when requesting executor pods from the API 
server. If specific " +
+"credentials are given for the driver pod to use, the driver will 
favor " +
+"using those credentials instead.")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_DRIVER_LIMIT_CORES =
+ConfigBuilder("spark.kubernetes.driver.limit.cores")
+  .doc("Specify the hard cpu limit for the driver pod")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_EXECUTOR_LIMIT_CORES =
+ConfigBuilder("spark.kubernetes.executor.limit.cores")
+  .doc("Specify the hard cpu limit for a single executor pod")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_DRIVER_MEMORY_OVERHEAD =
+ConfigBuilder("spark.kubernetes.driver.memoryOverhead")
+  .doc("The amount of off-heap memory (in megabytes) to be allocated 
for the driver and the " +
+"driver submission server. This is memory that accounts for things 
like VM

[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153410482
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala
 ---
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, 
EnvVarSourceBuilder, PodBuilder, QuantityBuilder}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Represents the initial setup required for the driver.
+ */
+private[spark] class BaseDriverConfigurationStep(
+kubernetesAppId: String,
+kubernetesResourceNamePrefix: String,
+driverLabels: Map[String, String],
+dockerImagePullPolicy: String,
+appName: String,
+mainClass: String,
+appArgs: Array[String],
+submissionSparkConf: SparkConf) extends DriverConfigurationStep {
+
+  private val kubernetesDriverPodName = 
submissionSparkConf.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(s"$kubernetesResourceNamePrefix-driver")
+
+  private val driverExtraClasspath = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_CLASS_PATH)
+
+  private val driverDockerImage = 
submissionSparkConf.get(DRIVER_DOCKER_IMAGE)
+
+  // CPU settings
+  private val driverCpuCores = 
submissionSparkConf.getOption("spark.driver.cores").getOrElse("1")
+  private val driverLimitCores = 
submissionSparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES)
+
+  // Memory settings
+  private val driverMemoryMiB = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_MEMORY)
+  private val driverMemoryString = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_MEMORY.key,
+org.apache.spark.internal.config.DRIVER_MEMORY.defaultValueString)
+  private val memoryOverheadMiB = submissionSparkConf
+.get(KUBERNETES_DRIVER_MEMORY_OVERHEAD)
+.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt,
+  MEMORY_OVERHEAD_MIN_MIB))
+  private val driverContainerMemoryWithOverheadMiB = driverMemoryMiB + 
memoryOverheadMiB
+
+  override def configureDriver(
+  driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+val driverExtraClasspathEnv = driverExtraClasspath.map { classPath =>
+  new EnvVarBuilder()
+.withName(ENV_SUBMIT_EXTRA_CLASSPATH)
+.withValue(classPath)
+.build()
+}
--- End diff --

Did you add support for the configurations "spark.driver.userClassPathFirst" and "spark.executor.userClassPathFirst"? Sorry, I cannot find it.


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153407637
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.{SPARK_VERSION => sparkVersion}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
+
+private[spark] object Config extends Logging {
+
+  val KUBERNETES_NAMESPACE =
+ConfigBuilder("spark.kubernetes.namespace")
+  .doc("The namespace that will be used for running the driver and 
executor pods. When using " +
+"spark-submit in cluster mode, this can also be passed to 
spark-submit via the " +
+"--kubernetes-namespace command line argument.")
+  .stringConf
+  .createWithDefault("default")
+
+  val DRIVER_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.driver.docker.image")
+  .doc("Docker image to use for the driver. Specify this using the 
standard Docker tag format.")
+  .stringConf
+  .createWithDefault(s"spark-driver:$sparkVersion")
+
+  val EXECUTOR_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.executor.docker.image")
+  .doc("Docker image to use for the executors. Specify this using the 
standard Docker tag " +
+"format.")
+  .stringConf
+  .createWithDefault(s"spark-executor:$sparkVersion")
+
+  val DOCKER_IMAGE_PULL_POLICY =
+ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
--- End diff --

This configuration looks like an enumerated set of string options, so we should validate against all the legal values, like this SQL conf:

```scala
  val CATALOG_IMPLEMENTATION = 
buildStaticConf("spark.sql.catalogImplementation")
.internal()
.stringConf
.checkValues(Set("hive", "in-memory"))
.createWithDefault("in-memory")
```

Besides, we should add a `checkValue` to the `ConfigEntry` wherever possible, for example:
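
A minimal sketch of what that could look like for the pull-policy entry above (not the PR's actual code; it assumes the same `ConfigBuilder` DSL and the standard Kubernetes pull policies):

```scala
import org.apache.spark.internal.config.ConfigBuilder

// Sketch only: restrict the entry to the legal Kubernetes image pull policies.
val DOCKER_IMAGE_PULL_POLICY =
  ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
    .doc("Docker image pull policy used when pulling images in the Kubernetes integration.")
    .stringConf
    .checkValues(Set("Always", "Never", "IfNotPresent"))
    .createWithDefault("IfNotPresent")
```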


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153407820
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -702,6 +715,19 @@ object SparkSubmit extends CommandLineUtils with 
Logging {
   }
 }
 
+if (isKubernetesCluster) {
+  childMainClass = "org.apache.spark.deploy.k8s.submit.Client"
--- End diff --

Here the style should be unified after #19631 is merged.


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153408859
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -590,6 +600,11 @@ private[deploy] class SparkSubmitArguments(args: 
Seq[String], env: Map[String, S
 |  the node running the Application 
Master via the Secure
 |  Distributed Cache, for renewing the 
login tickets and the
 |  delegation tokens periodically.
+|
+| Kubernetes only:
+|  --kubernetes-namespace NS   The namespace in the Kubernetes 
cluster within which the
+|  application must be launched. The 
namespace must already
+|  exist in the cluster. (Default: 
default).
--- End diff --

We should also add checks for `validateKillArguments` and `validateStatusRequestArguments`.


---




[GitHub] spark issue #19717: [SPARK-18278] [Submission] Spark on Kubernetes - basic s...

2017-11-27 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19717
  
I think we'd better use the newly added `org.apache.spark.deploy.SparkApplication` to implement the k8s client, as in #19631.


---




[GitHub] spark issue #11994: [SPARK-14151] Expose metrics Source and Sink interface

2017-11-27 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/11994
  
@rxin, thanks for your comment. The key motivation of this PR is to expose the metrics Sink/Source interface for third-party plugins, so that we don't have to maintain every different Sink/Source in Spark core; proposal #19775 is one example. I agree with you that exposing the Codahale metrics object is not a good choice; let me think more about the interface design.


---




[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...

2017-11-26 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19788
  
@yucai I'm questioning the necessity of adding the new configuration `spark.shuffle.continuousFetch` you mentioned above. The approach you propose is actually a superset of the previous one: it is compatible with the original shuffle path when length = 1. The configuration here is only used to keep compatibility with the external shuffle service; I think it is not intuitive, and users may be confused about whether it should be enabled or not (since the conf is functionality-oriented). Besides, do we need to guarantee forward compatibility, and is there a transparent way to switch between the two shuffle paths automatically, without a configuration?


---




[GitHub] spark pull request #11994: [SPARK-14151] Expose metrics Source and Sink inte...

2017-11-26 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/11994#discussion_r153110760
  
--- Diff: core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala 
---
@@ -195,18 +196,26 @@ private[spark] class MetricsSystem private (
   val classPath = kv._2.getProperty("class")
   if (null != classPath) {
 try {
-  val sink = Utils.classForName(classPath)
-.getConstructor(classOf[Properties], classOf[MetricRegistry], 
classOf[SecurityManager])
-.newInstance(kv._2, registry, securityMgr)
+  val sink = Utils.classForName(classPath).getConstructor(
+classOf[Properties], classOf[MetricRegistry], 
classOf[SecurityManager])
+  .newInstance(kv._2, registry, securityMgr)
   if (kv._1 == "servlet") {
 metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
   } else {
 sinks += sink.asInstanceOf[Sink]
   }
 } catch {
-  case e: Exception =>
-logError("Sink class " + classPath + " cannot be instantiated")
-throw e
+  case _: NoSuchMethodException =>
+try {
+  sinks += Utils.classForName(classPath)
--- End diff --

No, it's not necessary: `MetricsServlet` is a built-in metrics sink that is explicitly added in the code above.


---




[GitHub] spark issue #11994: [SPARK-14151] Expose metrics Source and Sink interface

2017-11-26 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/11994
  
@felixcheung thanks for your review. I think there's no next step; the current changes should be enough for users to externalize customized metrics sources and sinks.


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-11-26 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r153089584
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,12 +196,14 @@ private[spark] class IndexShuffleBlockResolver(
   override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
 // The block is actually going to be a range of a single map output 
file for this map, so
 // find out the consolidated file, then the offset within that from 
our index
+logDebug(s"Fetch block data for $blockId")
 val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
 
 val in = new DataInputStream(Files.newInputStream(indexFile.toPath))
 try {
   ByteStreams.skipFully(in, blockId.reduceId * 8)
   val offset = in.readLong()
+  ByteStreams.skipFully(in, (blockId.length - 1) * 8)
--- End diff --

I get your point, thanks for the explanation.


---




[GitHub] spark issue #19775: [SPARK-22343][core] Add support for publishing Spark met...

2017-11-23 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19775
  
Do we have to put this in Spark? Is it a necessary part of the k8s work? I think if we pull in that PR (https://github.com/apache/spark/pull/11994), then this can stay out of Spark as a package. Even without #11994, I believe users can still add their own metrics source/sink via the exposed SparkEnv/MetricsSystem. My concern is that this unnecessarily increases the code base of Spark core.


---




[GitHub] spark issue #19802: [WIP][SPARK-22594][CORE] Handling spark-submit and maste...

2017-11-23 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19802
  
Can you please explain more, including how to reproduce this issue? Spark's RPC is not designed to be version-compatible.


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-11-23 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r152891920
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,12 +196,14 @@ private[spark] class IndexShuffleBlockResolver(
   override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
 // The block is actually going to be a range of a single map output 
file for this map, so
 // find out the consolidated file, then the offset within that from 
our index
+logDebug(s"Fetch block data for $blockId")
 val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
 
 val in = new DataInputStream(Files.newInputStream(indexFile.toPath))
 try {
   ByteStreams.skipFully(in, blockId.reduceId * 8)
   val offset = in.readLong()
+  ByteStreams.skipFully(in, (blockId.length - 1) * 8)
--- End diff --

Also, if length is 1, then this skip will always be zero.


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-11-23 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r152891792
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -812,10 +812,13 @@ private[spark] object MapOutputTracker extends 
Logging {
 logError(errorMessage)
 throw new MetadataFetchFailedException(shuffleId, startPartition, 
errorMessage)
   } else {
+var totalSize = 0L
 for (part <- startPartition until endPartition) {
-  splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) 
+=
-((ShuffleBlockId(shuffleId, mapId, part), 
status.getSizeForBlock(part)))
+  totalSize += status.getSizeForBlock(part)
--- End diff --

This can be simplified to something like: `val totalSize = (startPartition until endPartition).map(status.getSizeForXXX).sum`.


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-11-23 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r152891172
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,12 +196,14 @@ private[spark] class IndexShuffleBlockResolver(
   override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
 // The block is actually going to be a range of a single map output 
file for this map, so
 // find out the consolidated file, then the offset within that from 
our index
+logDebug(s"Fetch block data for $blockId")
--- End diff --

Not necessary to add this; I guess it was mainly for your own debugging.


---




[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-11-23 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r152891438
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,12 +196,14 @@ private[spark] class IndexShuffleBlockResolver(
   override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
 // The block is actually going to be a range of a single map output 
file for this map, so
 // find out the consolidated file, then the offset within that from 
our index
+logDebug(s"Fetch block data for $blockId")
 val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
 
 val in = new DataInputStream(Files.newInputStream(indexFile.toPath))
 try {
   ByteStreams.skipFully(in, blockId.reduceId * 8)
   val offset = in.readLong()
+  ByteStreams.skipFully(in, (blockId.length - 1) * 8)
--- End diff --

I suspect this line is not correct; it seems to change the semantics. For example, if startPartition is 3 and endPartition is 8, originally the skip would be 3 * 8, but now it becomes 4 * 8. Can you please explain more?
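
For reference, here is a minimal sketch of the offset arithmetic under discussion; it is an illustration, not the PR's code, and assumes the index file stores cumulative offsets (one long per partition boundary) while `length` counts the contiguous partitions merged into one block:

```scala
import java.io.{DataInputStream, File}
import java.nio.file.Files

import com.google.common.io.ByteStreams

// Returns the (offset, size) of the merged block that covers `length` contiguous
// partitions starting at `reduceId`, by reading two cumulative offsets from the index.
def blockRange(indexFile: File, reduceId: Int, length: Int): (Long, Long) = {
  val in = new DataInputStream(Files.newInputStream(indexFile.toPath))
  try {
    ByteStreams.skipFully(in, reduceId * 8L)      // seek to the starting boundary
    val startOffset = in.readLong()
    ByteStreams.skipFully(in, (length - 1) * 8L)  // skip the inner boundaries
    val endOffset = in.readLong()
    (startOffset, endOffset - startOffset)
  } finally {
    in.close()
  }
}
```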


---




[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...

2017-11-23 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19788
  
ok to test.


---




[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...

2017-11-23 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19788
  
Sure, I will do it tomorrow.


---




[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...

2017-11-22 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19788
  
@yucai would you mind adding more explanations to your PR description?


---




[GitHub] spark issue #19631: [SPARK-22372][core, yarn] Make cluster submission use Sp...

2017-11-15 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19631
  
Did another round of review; LGTM overall. @tgravescs do you have any comments?


---




[GitHub] spark pull request #19631: [SPARK-22372][core, yarn] Make cluster submission...

2017-11-15 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19631#discussion_r151320052
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 ---
@@ -216,7 +216,9 @@ private[spark] object CoarseGrainedExecutorBackend 
extends Logging {
   if (driverConf.contains("spark.yarn.credentials.file")) {
 logInfo("Will periodically update credentials from: " +
   driverConf.get("spark.yarn.credentials.file"))
-SparkHadoopUtil.get.startCredentialUpdater(driverConf)
+
Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil")
--- End diff --

#19272 is already in; do you plan to update this here in this PR or in a separate one?


---




[GitHub] spark pull request #19633: [SPARK-22411][SQL] Disable the heuristic to calcu...

2017-11-15 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19633#discussion_r151308496
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -424,11 +424,19 @@ case class FileSourceScanExec(
 val defaultMaxSplitBytes =
   fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
 val openCostInBytes = 
fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
-val defaultParallelism = 
fsRelation.sparkSession.sparkContext.defaultParallelism
-val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + 
openCostInBytes)).sum
-val bytesPerCore = totalBytes / defaultParallelism
 
-val maxSplitBytes = Math.min(defaultMaxSplitBytes, 
Math.max(openCostInBytes, bytesPerCore))
+// Ignore bytesPerCore when dynamic allocation is enabled. See 
SPARK-22411
+val maxSplitBytes =
+  if 
(Utils.isDynamicAllocationEnabled(fsRelation.sparkSession.sparkContext.getConf))
 {
+defaultMaxSplitBytes
--- End diff --

> For small data, sometimes users care about the number of output files generated as well.

Can you please elaborate: do you want more output files or fewer? If fewer, I think I already covered that in the comment above.

It seems your patch already ignores `bytesPerCore` when dynamic allocation is enabled; are you suggesting something different?


---




[GitHub] spark pull request #19643: [SPARK-11421][CORE][PYTHON][R] Added ability for ...

2017-11-15 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19643#discussion_r151307924
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1838,12 +1852,21 @@ class SparkContext(config: SparkConf) extends 
Logging {
   case _ => path
 }
   }
+
   if (key != null) {
 val timestamp = System.currentTimeMillis
 if (addedJars.putIfAbsent(key, timestamp).isEmpty) {
   logInfo(s"Added JAR $path at $key with timestamp $timestamp")
   postEnvironmentUpdate()
 }
+
+if (addToCurrentClassLoader) {
+  Utils.getContextOrSparkClassLoader match {
+case cl: MutableURLClassLoader => 
cl.addURL(Utils.resolveURI(path).toURL)
--- End diff --

I'm not sure whether this supports remote jars on HTTPS or on Hadoop filesystems. On the executor side we handle this explicitly by downloading jars to local disk and adding them to the classpath, but here it looks like we don't have such logic. I'm also not sure how this `URLClassLoader` would talk to Hadoop or HTTPS without certificates.

`addJar` just adds jars to the file server so that executors can fetch them from the driver and add them to their classpath; it does not affect the driver's classpath. If we support adding jars to the current driver classloader, how do we make use of these newly added jars?
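
For illustration only, here is a generic sketch of the executor-side pattern mentioned above (download the remote jar to local disk first, then add only the local file URL to a mutable classloader). It uses plain JDK APIs rather than Spark's actual helpers, handles only plain HTTP(S), and leaves out Hadoop-filesystem and certificate concerns:

```scala
import java.io.InputStream
import java.net.{URL, URLClassLoader}
import java.nio.file.{Files, Paths, StandardCopyOption}

// Stand-in for a mutable URL classloader (Spark's MutableURLClassLoader plays this role):
// a URLClassLoader whose addURL is callable from outside.
class MutableJarLoader(parent: ClassLoader) extends URLClassLoader(Array.empty[URL], parent) {
  override def addURL(url: URL): Unit = super.addURL(url)
}

// Download a remote jar and add the *local* copy to the loader, so the classloader
// itself only ever deals with a local file URL.
def addRemoteJar(loader: MutableJarLoader, remoteJar: String, localDir: String): Unit = {
  val url = new URL(remoteJar)
  val target = Paths.get(localDir, Paths.get(url.getPath).getFileName.toString)
  val in: InputStream = url.openStream()
  try Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING) finally in.close()
  loader.addURL(target.toUri.toURL)
}
```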


---




[GitHub] spark pull request #19741: [SPARK-14228][CORE][YARN] Lost executor of RPC di...

2017-11-15 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19741#discussion_r151305271
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
 ---
@@ -268,8 +268,13 @@ private[spark] abstract class YarnSchedulerBackend(
 logWarning(reason.toString)
 driverEndpoint.ask[Boolean](r).onFailure {
--- End diff --

I think when you use `send`, all the related pieces you mentioned above should be changed accordingly.


---




[GitHub] spark issue #19741: [SPARK-14228][CORE][YARN] Lost executor of RPC disassoci...

2017-11-15 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19741
  
From my understanding, the above exception does no harm to the Spark application; it just hits a threading corner case during stop. Am I right?


---




[GitHub] spark pull request #19631: [SPARK-22372][core, yarn] Make cluster submission...

2017-11-14 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19631#discussion_r151015745
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -412,8 +412,6 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 }
 
-if (master == "yarn" && deployMode == "client") 
System.setProperty("SPARK_YARN_MODE", "true")
--- End diff --

Not sure why this is not required anymore?


---




[GitHub] spark pull request #19631: [SPARK-22372][core, yarn] Make cluster submission...

2017-11-14 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19631#discussion_r151017494
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 ---
@@ -216,7 +216,9 @@ private[spark] object CoarseGrainedExecutorBackend 
extends Logging {
   if (driverConf.contains("spark.yarn.credentials.file")) {
 logInfo("Will periodically update credentials from: " +
   driverConf.get("spark.yarn.credentials.file"))
-SparkHadoopUtil.get.startCredentialUpdater(driverConf)
+
Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil")
--- End diff --

I see, thanks for the explanation; this kind of reflection seems not so elegant.


---




[GitHub] spark pull request #19631: [SPARK-22372][core, yarn] Make cluster submission...

2017-11-14 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19631#discussion_r151018454
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala 
---
@@ -745,15 +739,20 @@ private[spark] class Client(
   // Save the YARN configuration into a separate file that will be 
overlayed on top of the
   // cluster's Hadoop conf.
   confStream.putNextEntry(new ZipEntry(SPARK_HADOOP_CONF_FILE))
-  yarnConf.writeXml(confStream)
+  hadoopConf.writeXml(confStream)
   confStream.closeEntry()
 
   // Save Spark configuration to a file in the archive.
   val props = new Properties()
   sparkConf.getAll.foreach { case (k, v) => props.setProperty(k, v) }
   // Override spark.yarn.key to point to the location in distributed 
cache which will be used
   // by AM.
-  Option(amKeytabFileName).foreach { k => 
props.setProperty(KEYTAB.key, k) }
+  Option(amKeytabFileName).foreach { k =>
+// Do not propagate the app's secret using the config file.
+if (k != SecurityManager.SPARK_AUTH_SECRET_CONF) {
--- End diff --

Is it necessary to add a check here? I'm not sure how this could happen.


---




[GitHub] spark pull request #19631: [SPARK-22372][core, yarn] Make cluster submission...

2017-11-13 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19631#discussion_r150751268
  
--- Diff: core/src/main/scala/org/apache/spark/SecurityManager.scala ---
@@ -551,13 +553,10 @@ private[spark] class SecurityManager(
 
 private[spark] object SecurityManager {
 
-  val SPARK_AUTH_CONF: String = "spark.authenticate"
-  val SPARK_AUTH_SECRET_CONF: String = "spark.authenticate.secret"
+  val SPARK_AUTH_CONF = NETWORK_AUTH_ENABLED.key
+  val SPARK_AUTH_SECRET_CONF = "spark.authenticate.secret"
--- End diff --

I think we can also make this a `ConfigEntry`, for example:
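
A rough sketch of that suggestion (names and wording assumed; it simply mirrors how the other entries referenced above are declared):

```scala
import org.apache.spark.internal.config.ConfigBuilder

object AuthConfigSketch {
  // Sketch only: the secret expressed as an optional ConfigEntry instead of a bare string key.
  val AUTH_SECRET =
    ConfigBuilder("spark.authenticate.secret")
      .doc("Shared secret used when spark.authenticate is enabled.")
      .stringConf
      .createOptional
}
```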


---




[GitHub] spark pull request #19631: [SPARK-22372][core, yarn] Make cluster submission...

2017-11-13 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19631#discussion_r150751761
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -92,6 +92,11 @@ object SparkSubmit extends CommandLineUtils with Logging 
{
 
   private val CLASS_NOT_FOUND_EXIT_STATUS = 101
 
+  // Following constants are visible for testing.
+  private[deploy] val YARN_SUBMIT_CLASS = 
"org.apache.spark.deploy.yarn.YarnClusterApplication"
+  private[deploy] val REST_SUBMIT_CLASS = 
classOf[RestSubmissionClientApp].getName()
+  private[deploy] val STANDALONE_SUBMIT_CLASS = 
classOf[ClientApp].getName()
--- End diff --

Maybe we can rename these variables to `XXX_CLUSTER_SUBMIT_CLASS`.


---




[GitHub] spark pull request #19631: [SPARK-22372][core, yarn] Make cluster submission...

2017-11-13 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19631#discussion_r150752055
  
--- Diff: 
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 ---
@@ -216,7 +216,9 @@ private[spark] object CoarseGrainedExecutorBackend 
extends Logging {
   if (driverConf.contains("spark.yarn.credentials.file")) {
 logInfo("Will periodically update credentials from: " +
   driverConf.get("spark.yarn.credentials.file"))
-SparkHadoopUtil.get.startCredentialUpdater(driverConf)
+
Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil")
--- End diff --

What's the purpose of changing it to this approach?


---




[GitHub] spark pull request #19633: [SPARK-22411][SQL] Disable the heuristic to calcu...

2017-11-13 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19633#discussion_r150746876
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -424,11 +424,19 @@ case class FileSourceScanExec(
 val defaultMaxSplitBytes =
   fsRelation.sparkSession.sessionState.conf.filesMaxPartitionBytes
 val openCostInBytes = 
fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes
-val defaultParallelism = 
fsRelation.sparkSession.sparkContext.defaultParallelism
-val totalBytes = selectedPartitions.flatMap(_.files.map(_.getLen + 
openCostInBytes)).sum
-val bytesPerCore = totalBytes / defaultParallelism
 
-val maxSplitBytes = Math.min(defaultMaxSplitBytes, 
Math.max(openCostInBytes, bytesPerCore))
+// Ignore bytesPerCore when dynamic allocation is enabled. See 
SPARK-22411
+val maxSplitBytes =
+  if 
(Utils.isDynamicAllocationEnabled(fsRelation.sparkSession.sparkContext.getConf))
 {
+defaultMaxSplitBytes
--- End diff --

What if `spark.dynamicAllocation.maxExecutors` is not configured? It seems we cannot rely on that configuration; users may not always set it.

My concern is the cost of ramping up new executors: by splitting partitions into small ones, Spark will ramp up more executors to execute small tasks, and when the cost of ramping up new executors is larger than the cost of executing the tasks, this is no longer a useful heuristic. Previously all the executors were already available, so the heuristic was valid.

For small data (calculated `bytesPerCore` < `defaultMaxSplitBytes`, i.e. less than 128M), I think using the available resources to schedule tasks would be enough, since the tasks are not that big. For big data (calculated `bytesPerCore` > `defaultMaxSplitBytes`, i.e. larger than 128M), I think 128M might be the proper value at which to issue new executors and tasks. So IMHO the current solution seems sufficient for the dynamic allocation scenario. Please correct me if I'm wrong.
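
To make the numbers concrete, here is a small worked sketch of the heuristic quoted in the diff, assuming Spark's documented defaults of 128 MB for `spark.sql.files.maxPartitionBytes` and 4 MB for `spark.sql.files.openCostInBytes`:

```scala
// Split-size heuristic from the diff above, with the assumed defaults.
val defaultMaxSplitBytes = 128L * 1024 * 1024 // 128 MB
val openCostInBytes      =   4L * 1024 * 1024 //   4 MB

def maxSplitBytes(totalBytes: Long, defaultParallelism: Int): Long = {
  val bytesPerCore = totalBytes / defaultParallelism
  math.min(defaultMaxSplitBytes, math.max(openCostInBytes, bytesPerCore))
}

// Small input, many cores: bytesPerCore is tiny, so openCostInBytes wins (4 MB splits),
// i.e. many small tasks, which is what ramps up extra executors under dynamic allocation.
maxSplitBytes(totalBytes = 1L * 1024 * 1024 * 1024, defaultParallelism = 400)       // 4 MB

// Large input: bytesPerCore exceeds the cap, so the 128 MB default applies.
maxSplitBytes(totalBytes = 1024L * 1024 * 1024 * 1024, defaultParallelism = 400)    // 128 MB
```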


---




[GitHub] spark pull request #19711: [SPARK-22471][SQL] SQLListener consumes much memo...

2017-11-13 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19711#discussion_r150712289
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala ---
@@ -113,7 +116,7 @@ class SQLListener(conf: SparkConf) extends 
SparkListener with Logging {
*/
   private val _jobIdToExecutionId = mutable.HashMap[Long, Long]()
 
-  private val _stageIdToStageMetrics = mutable.HashMap[Long, 
SQLStageMetrics]()
+  private val _stageIdToStageMetrics = mutable.LinkedHashMap[Long, 
SQLStageMetrics]()
--- End diff --

Java's `LinkedHashMap` can be subclassed with a custom implementation of `removeEldestEntry`, which would save the code below. It is not the user who calls `removeEldestEntry`...


---




[GitHub] spark issue #19735: [MINOR][CORE] Using bufferedInputStream for dataDeserial...

2017-11-13 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19735
  
Jenkins, retest this please.


---




[GitHub] spark issue #11994: [SPARK-14151] Expose metrics Source and Sink interface

2017-11-13 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/11994
  
Jenkins, retest this please.


---




[GitHub] spark issue #19735: [MINOR][CORE] Using bufferedInputStream for dataDeserial...

2017-11-12 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19735
  
ok to test.


---




[GitHub] spark issue #11994: [SPARK-14151] Expose metrics Source and Sink interface

2017-11-12 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/11994
  
Sure, let me update the code.


---




[GitHub] spark issue #19693: [MINOR][CORE] Improved statistical shuffle write time

2017-11-10 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19693
  
Whether shuffle write time should include the file open/close time is 
debatable, also we don't know  whether the actual open action is lazy or not 
(depends on OS). But one downside of this change is that it makes this metric 
incomparable to the old one, because we changed the semantics now.


---




[GitHub] spark pull request #19711: [SPARK-22471][SQL] SQLListener consumes much memo...

2017-11-10 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19711#discussion_r150172409
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala ---
@@ -113,7 +116,7 @@ class SQLListener(conf: SparkConf) extends 
SparkListener with Logging {
*/
   private val _jobIdToExecutionId = mutable.HashMap[Long, Long]()
 
-  private val _stageIdToStageMetrics = mutable.HashMap[Long, 
SQLStageMetrics]()
+  private val _stageIdToStageMetrics = mutable.LinkedHashMap[Long, 
SQLStageMetrics]()
--- End diff --

Maybe we can use Java's `LinkedHashMap` and override `removeEldestEntry` to do what we want, for example:
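
A minimal sketch of that idea (class and bound names are assumed, not taken from the PR):

```scala
import java.util.{LinkedHashMap => JLinkedHashMap}

// A size-bounded map that evicts the oldest stage entry once maxEntries is exceeded,
// so the listener would not need explicit trimming code.
class BoundedStageMetricsMap[V](maxEntries: Int)
    extends JLinkedHashMap[java.lang.Long, V](16, 0.75f, false) {
  override def removeEldestEntry(eldest: java.util.Map.Entry[java.lang.Long, V]): Boolean =
    size() > maxEntries
}
```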


---




[GitHub] spark pull request #19661: [SPARK-22450][Core][Mllib]safely register class f...

2017-11-09 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19661#discussion_r150171482
  
--- Diff: 
core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala ---
@@ -108,6 +108,27 @@ class KryoSerializerSuite extends SparkFunSuite with 
SharedSparkContext {
 check(Array(Array("1", "2"), Array("1", "2", "3", "4")))
   }
 
+  test("safely register class for mllib/ml") {
+val conf = new SparkConf(false)
+val ser = new KryoSerializer(conf)
+
+Seq("org.apache.spark.mllib.linalg.Vector",
+  "org.apache.spark.mllib.linalg.DenseVector",
+  "org.apache.spark.mllib.linalg.SparseVector",
+  "org.apache.spark.mllib.linalg.Matrix",
+  "org.apache.spark.mllib.linalg.DenseMatrix",
+  "org.apache.spark.mllib.linalg.SparseMatrix",
+  "org.apache.spark.ml.linalg.Vector",
+  "org.apache.spark.ml.linalg.DenseVector",
+  "org.apache.spark.ml.linalg.SparseVector",
+  "org.apache.spark.ml.linalg.Matrix",
+  "org.apache.spark.ml.linalg.DenseMatrix",
+  "org.apache.spark.ml.linalg.SparseMatrix",
+  "org.apache.spark.ml.feature.Instance",
+  "org.apache.spark.ml.feature.OffsetInstance"
+).foreach(!Utils.classIsLoadable(_))
--- End diff --

This UT doesn't actually reflect your purpose above; it looks like it would always pass. Also, `conf` and `ser` above seem never to be used here.
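
For example, a rough sketch of a version that asserts something (reusing the `conf` and `ser` built earlier in the test; building a Kryo instance exercises the registration path, and the loadability check now feeds an assertion instead of being discarded):

```scala
// Sketch only; the full class list from the test would go in the Seq below.
val kryo = ser.newKryo()
Seq(
  "org.apache.spark.mllib.linalg.DenseVector",
  "org.apache.spark.ml.linalg.DenseVector"
).foreach { name =>
  assert(Utils.classIsLoadable(name), s"$name should be on the test classpath")
}
```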


---




[GitHub] spark pull request #19649: [SPARK-22405][SQL] Add more ExternalCatalogEvent

2017-11-08 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19649#discussion_r149845572
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/events.scala 
---
@@ -62,6 +62,16 @@ case class DropDatabasePreEvent(database: String) 
extends DatabaseEvent
 case class DropDatabaseEvent(database: String) extends DatabaseEvent
 
 /**
+ * Event fired before a database is altered.
+ */
+case class AlterDatabasePreEvent(database: String) extends DatabaseEvent
--- End diff --

Sure, I will update the title.


---




[GitHub] spark pull request #19661: [SPARK-22450][Core][Mllib]safely register class f...

2017-11-08 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19661#discussion_r149619662
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
@@ -178,10 +178,40 @@ class KryoSerializer(conf: SparkConf)
 
kryo.register(Utils.classForName("scala.collection.immutable.Map$EmptyMap$"))
 kryo.register(classOf[ArrayBuffer[Any]])
 
+// We can't load those class directly in order to avoid unnecessary 
jar dependencies.
+// We load them safely, ignore it if the class not found.
+Seq("org.apache.spark.mllib.linalg.Vector",
+  "org.apache.spark.mllib.linalg.DenseVector",
+  "org.apache.spark.mllib.linalg.SparseVector",
+  "org.apache.spark.mllib.linalg.Matrix",
+  "org.apache.spark.mllib.linalg.DenseMatrix",
+  "org.apache.spark.mllib.linalg.SparseMatrix",
+  "org.apache.spark.ml.linalg.Vector",
+  "org.apache.spark.ml.linalg.DenseVector",
+  "org.apache.spark.ml.linalg.SparseVector",
+  "org.apache.spark.ml.linalg.Matrix",
+  "org.apache.spark.ml.linalg.DenseMatrix",
+  "org.apache.spark.ml.linalg.SparseMatrix",
+  "org.apache.spark.ml.feature.Instance",
+  "org.apache.spark.ml.feature.OffsetInstance"
+).flatMap(safeClassLoader(_)).foreach(kryo.register(_))
--- End diff --

Maybe you can change to 

```scala
Seq(xxx).foreach(c => Try(Utils.classForName(c)).foreach(kryo.register))
```
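
Spelled out a little more, with the class list from the diff bound to a hypothetical `mlClassNames` value and an explicit lambda to avoid any overload ambiguity when registering:

```scala
import scala.util.Try

// Register each ML/MLlib class only if it is actually on the classpath;
// a ClassNotFoundException from classForName is swallowed by Try.
mlClassNames.foreach { name =>
  Try(Utils.classForName(name)).toOption.foreach(cls => kryo.register(cls))
}
```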


---




[GitHub] spark issue #19649: [SPARK-22405][SQL] Add more ExternalCatalogEvent

2017-11-08 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19649
  
One question, as mentioned above: do we need to track partition-related events? @cloud-fan @hvanhovell @gatorsmile


---




[GitHub] spark issue #19688: [SPARK-22466][Spark Submit]export SPARK_CONF_DIR while c...

2017-11-08 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19688
  
Please specify the purpose of this change in the PR description. If it belongs to #19663, why don't you make the change there?


---



