[GitHub] spark issue #18554: [SPARK-21306][ML] OneVsRest should cache weightCol if ne...

2017-07-06 Thread lins05
Github user lins05 commented on the issue:

https://github.com/apache/spark/pull/18554
  
I guess we also need to update the python part: 
https://github.com/apache/spark/blob/v2.2.0-rc6/python/pyspark/ml/classification.py#L1563





[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...

2017-06-18 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18098#discussion_r122612344
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -502,6 +526,23 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
 )
   }
 
+  private def satisfiesLocality(offerHostname: String): Boolean = {
+    if (!Utils.isDynamicAllocationEnabled(conf) || hostToLocalTaskCount.isEmpty) {
+      return true
+    }
+
+    // Check the locality information
+    val currentHosts = slaves.values.filter(_.taskIDs.nonEmpty).map(_.hostname).toSet
+    val allDesiredHosts = hostToLocalTaskCount.keys.toSet
+    val remainingHosts = allDesiredHosts -- currentHosts
--- End diff --

So we exclude slaves which already have executors launched on them, even if 
they match the locality. I guess it's worth adding a comment here explaining 
the motivation.
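
Something along these lines, for example (my wording, only to illustrate the kind of comment I mean):

```scala
// Hosts that already run one of our executors are deliberately excluded from
// `remainingHosts`: while the locality wait is in effect we only accept offers from
// desired hosts that don't have an executor yet, so new executors get spread across
// the hosts the pending tasks actually want.
```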





[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...

2017-06-18 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18098#discussion_r122611850
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -291,6 +300,19 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
 return
   }
 
+      if (numExecutors >= executorLimit) {
+        logDebug("Executor limit reached. numExecutors: " + numExecutors() +
--- End diff --

nit: `numExecutors()` => `numExecutors` since you're using the latter 
everywhere else





[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...

2017-06-18 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18098#discussion_r122615540
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -502,6 +526,23 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
 )
   }
 
+  private def satisfiesLocality(offerHostname: String): Boolean = {
+    if (!Utils.isDynamicAllocationEnabled(conf) || hostToLocalTaskCount.isEmpty) {
+      return true
+    }
+
+    // Check the locality information
+    val currentHosts = slaves.values.filter(_.taskIDs.nonEmpty).map(_.hostname).toSet
+    val allDesiredHosts = hostToLocalTaskCount.keys.toSet
+    val remainingHosts = allDesiredHosts -- currentHosts
+    if (!(remainingHosts contains offerHostname) &&
+      (System.currentTimeMillis() - localityWaitStartTime <= localityWait)) {
+      logDebug("Skipping host and waiting for locality. host: " + offerHostname)
+      return false
+    }
+    true
--- End diff --

nit: `true` => `return true` since you're using return elsewhere in this 
method





[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...

2017-06-18 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18098#discussion_r122612179
  
--- Diff: 
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 ---
@@ -502,6 +526,23 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
 )
   }
 
+  private def satisfiesLocality(offerHostname: String): Boolean = {
+    if (!Utils.isDynamicAllocationEnabled(conf) || hostToLocalTaskCount.isEmpty) {
+      return true
+    }
+
+    // Check the locality information
+    val currentHosts = slaves.values.filter(_.taskIDs.nonEmpty).map(_.hostname).toSet
+    val allDesiredHosts = hostToLocalTaskCount.keys.toSet
+    val remainingHosts = allDesiredHosts -- currentHosts
+    if (!(remainingHosts contains offerHostname) &&
--- End diff --

I think the spark code style is `a.contains(b)` instead of `a contains b`

See the "Infix Methods" section of http://spark.apache.org/contributing.html





[GitHub] spark pull request #18098: [SPARK-16944][Mesos] Improve data locality when l...

2017-06-18 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/18098#discussion_r122615907
  
--- Diff: 
resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
 ---
@@ -586,6 +586,44 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
 assert(backend.isReady)
   }
 
+  test("supports data locality with dynamic allocation") {
+    setBackend(Map(
+      "spark.dynamicAllocation.enabled" -> "true",
+      "spark.dynamicAllocation.testing" -> "true",
+      "spark.locality.wait" -> "2s"))
+
+    assert(backend.getExecutorIds().isEmpty)
+
+    backend.requestTotalExecutors(2, 2, Map("hosts10" -> 1, "hosts11" -> 1))
+
+    // Offer non-local resources, which should be rejected
+    var id = 1
+    offerResources(List(Resources(backend.executorMemory(sc), 1)), id)
+    verifyTaskNotLaunched(driver, s"o$id")
+    id = 2
+    offerResources(List(Resources(backend.executorMemory(sc), 1)), id)
+    verifyTaskNotLaunched(driver, s"o$id")
+
+    // Offer local resource
+    id = 10
+    offerResources(List(Resources(backend.executorMemory(sc), 1)), id)
+    var launchedTasks = verifyTaskLaunched(driver, s"o$id")
+    assert(s"s$id" == launchedTasks.head.getSlaveId.getValue)
+    registerMockExecutor(launchedTasks.head.getTaskId.getValue, s"s$id", 1)
+    assert(backend.getExecutorIds().size == 1)
+
+    // Wait longer than spark.locality.wait
+    Thread.sleep(3000)
+
+    // Offer non-local resource, which should be accepted
+    id = 1
+    offerResources(List(Resources(backend.executorMemory(sc), 1)), id)
+    launchedTasks = verifyTaskLaunched(driver, s"o$id")
+    assert(s"s$id" == launchedTasks.head.getSlaveId.getValue)
+    registerMockExecutor(launchedTasks.head.getTaskId.getValue, s"s$id", 1)
+    assert(backend.getExecutorIds().size == 2)
+  }
--- End diff --

I would suggest increasing the test coverage by testing a second round of launching, i.e. calling `backend.requestTotalExecutors` / `offerResources` / verify again.

We can update the first call to `requestTotalExecutors` to request only one executor, and increase it to two in the second call.
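
Roughly along these lines (just a sketch reusing the helpers from the quoted test, with the same placeholder ids/hostnames, and assuming the first round was changed to request a single executor):

```scala
// Second round: raise the target to two executors and repeat the offer/verify cycle.
backend.requestTotalExecutors(2, 2, Map("hosts10" -> 1, "hosts11" -> 1))

// A non-local offer should again be rejected while the locality wait is in effect ...
id = 3
offerResources(List(Resources(backend.executorMemory(sc), 1)), id)
verifyTaskNotLaunched(driver, s"o$id")

// ... and an offer from the newly desired host should be accepted.
id = 11
offerResources(List(Resources(backend.executorMemory(sc), 1)), id)
launchedTasks = verifyTaskLaunched(driver, s"o$id")
registerMockExecutor(launchedTasks.head.getTaskId.getValue, s"s$id", 1)
assert(backend.getExecutorIds().size == 2)
```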





[GitHub] spark issue #17750: [SPARK-4899][MESOS] Support for Checkpointing on Coarse ...

2017-06-08 Thread lins05
Github user lins05 commented on the issue:

https://github.com/apache/spark/pull/17750
  
ping @srowen, I think this PR is ready to merge





[GitHub] spark pull request #17750: [SPARK-4899][MESOS] Support for Checkpointing on ...

2017-05-29 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17750#discussion_r119009317
  
--- Diff: docs/running-on-mesos.md ---
@@ -516,6 +516,16 @@ See the [configuration page](configuration.html) for 
information on Spark config
 Fetcher Cache
   
 
+
+  spark.mesos.checkpoint
+  false
+  
+If set to true, the agents that are running the Spark executors will 
write the framework pid, executor pids and status updates to disk. 
--- End diff --

nit: the *mesos* agents





[GitHub] spark issue #17750: [SPARK-4899][MESOS] Support for checkpointing on Coarse ...

2017-05-12 Thread lins05
Github user lins05 commented on the issue:

https://github.com/apache/spark/pull/17750
  
> Do you then think it would be a viable option to enable it by default on 
Coarse grained and have it not used in Fine-grained.

SGTM, especially considering fine-grained mode is already deprecated. 

> Could you expand on this a bit more, I assume we could maintain the state 
of the tasks similar to how driver state is maintained in 
MesosClusterScheduler, and accordingly update state at crucial points, like 
start.

I don't think it's an easy task at all, because the spark driver is not designed to recover from a crash.

The state in the MesosClusterScheduler is pretty simple. It's just a REST server that accepts requests from clients and launches spark drivers on their behalf. It only needs to persist its mesos framework id, because it has to re-register with the mesos master using the same framework id if it's restarted; the current implementation uses zookeeper as the persistent storage. Aside from that, the MesosClusterScheduler has no other stateful information.

The spark driver is totally different, because it holds lots of stateful information: the job/stage/task info, executor info, the catalog that holds temporary views, to name a few. All of that is kept in the driver's memory and is lost whenever the driver crashes. So it doesn't make sense to set `failover_timeout` at all, because the spark driver doesn't support failover.
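
For reference, the framework-id part really is tiny; a minimal sketch of the re-registration pattern (not the actual MesosClusterScheduler code, and the parameter names are mine):

```scala
import org.apache.mesos.{MesosSchedulerDriver, Scheduler}
import org.apache.mesos.Protos.{FrameworkID, FrameworkInfo}

def buildDispatcherDriver(
    scheduler: Scheduler,                  // the framework's callback implementation
    masterUrl: String,                     // e.g. "zk://zk1:2181/mesos"
    persistedFrameworkId: Option[String]   // read back from zookeeper, if any
  ): MesosSchedulerDriver = {
  val info = FrameworkInfo.newBuilder()
    .setUser("")                           // empty user lets mesos pick the current user
    .setName("Spark Cluster Dispatcher")
  // Re-registering with the same framework id is what lets a restarted dispatcher keep
  // ownership of (and later reconcile) the drivers it launched before the restart.
  persistedFrameworkId.foreach(id => info.setId(FrameworkID.newBuilder().setValue(id).build()))
  new MesosSchedulerDriver(scheduler, info.build(), masterUrl)
}
```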





[GitHub] spark issue #17750: [SPARK-4899][MESOS] Support for checkpointing on Coarse ...

2017-05-05 Thread lins05
Github user lins05 commented on the issue:

https://github.com/apache/spark/pull/17750
  
> Yes, It is true that there is an associated overhead in both modes, 
that's why the defaults have not been changed. i.e. Default behavior is not to 
checkpoint.

The overhead in fine-grained mode would be much heavier than in coarse-grained mode. For example, each time you run `rdd.collect()` on a 100MB RDD, the mesos agent where the executor runs would write 100MB to disk and delete it only after the driver acknowledges the message.

In contrast, in coarse-grained mode the executor sends the 100MB of data to the driver directly without going through the mesos agents. The only things the agents write to disk are small task status messages like TASK_RUNNING/TASK_KILLED, which are typically a few kilobytes.


>Setting failover_timeout is necessary as there has to be a max limit for 
how long an agent can be considered to be given back to a failing task.

> And considering that this is being used in the latest version I guess the 
Spark Driver does support it.

https://github.com/apache/spark/blob/v2.1.1/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala#L315

The code in your link is the mesos cluster scheduler, which is **a mesos framework that launches spark drivers for you**, not **the mesos scheduler inside the spark driver that launches executors**.

It has `checkpoint` and `failover_timeout` set so that the spark drivers managed by it won't be killed even if the scheduler itself is restarted/killed. If you look at the `MesosClusterScheduler.registered` method you can see it calls `driver.reconcileTasks()`, which is how it achieves that. In contrast, you won't find a call to `reconcileTasks` in e.g. `MesosCoarseGrainedSchedulerBackend`.





[GitHub] spark issue #17519: [SPARK-15352][Doc] follow-up: add configuration docs for...

2017-05-05 Thread lins05
Github user lins05 commented on the issue:

https://github.com/apache/spark/pull/17519
  
Thanks @shubhamchopra. Could you please help review & merge this one, @cloud-fan?





[GitHub] spark issue #17750: [SPARK-4899][MESOS] Support for checkpointing on Coarse ...

2017-05-03 Thread lins05
Github user lins05 commented on the issue:

https://github.com/apache/spark/pull/17750
  
IMO we should not enable checkpointing in fine-grained mode: with checkpointing enabled, mesos agents would persist all status updates to disk, which means a large I/O cost, because fine-grained mode uses mesos status updates to send task results back to the driver.

Also, I'm not sure whether it makes sense to set `failover_timeout` or not. The failover timeout is designed for frameworks that can reconcile their existing tasks with the mesos master when they re-connect, but the mesos scheduler in spark doesn't implement that yet. Currently, when the spark driver disconnects from the mesos master, the master immediately removes the spark driver from the frameworks list.





[GitHub] spark issue #17519: [SPARK-15352][Doc] follow-up: add configuration docs for...

2017-05-02 Thread lins05
Github user lins05 commented on the issue:

https://github.com/apache/spark/pull/17519
  
ping @shubhamchopra @cloud-fan





[GitHub] spark pull request #17088: [SPARK-19753][CORE] Un-register all shuffle outpu...

2017-04-04 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17088#discussion_r109680926
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -394,6 +394,68 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with Timeou
 assertDataStructuresEmpty()
   }
 
+  test("All shuffle files should on the slave should be cleaned up when 
slave lost") {
+// reset the test context with the right shuffle service config
+afterEach()
+val conf = new SparkConf()
+conf.set("spark.shuffle.service.enabled", "true")
+init(conf)
+runEvent(ExecutorAdded("exec-hostA1", "hostA"))
+runEvent(ExecutorAdded("exec-hostA2", "hostA"))
+runEvent(ExecutorAdded("exec-hostB", "hostB"))
+val firstRDD = new MyRDD(sc, 3, Nil)
+val firstShuffleDep = new ShuffleDependency(firstRDD, new 
HashPartitioner(3))
+val firstShuffleId = firstShuffleDep.shuffleId
+val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
+val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(3))
+val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
+submit(reduceRdd, Array(0))
+// map stage1 completes successfully, with one task on each executor
+complete(taskSets(0), Seq(
+  (Success,
+MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), 
Array.fill[Long](1)(2))),
+  (Success,
+MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), 
Array.fill[Long](1)(2))),
+  (Success, makeMapStatus("hostB", 1))
+))
+// map stage2 completes successfully, with one task on each executor
+complete(taskSets(1), Seq(
+  (Success,
+MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), 
Array.fill[Long](1)(2))),
+  (Success,
+MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), 
Array.fill[Long](1)(2))),
+  (Success, makeMapStatus("hostB", 1))
+))
+// make sure our test setup is correct
+val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get
+assert(initialMapStatus1.count(_ != null) === 3)
+assert(initialMapStatus1.map{_.location.executorId}.toSet ===
--- End diff --

`map{..}` => `map(..)`





[GitHub] spark pull request #17088: [SPARK-19753][CORE] Un-register all shuffle outpu...

2017-04-04 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17088#discussion_r109681031
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -394,6 +394,68 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
 assertDataStructuresEmpty()
   }
 
+  test("All shuffle files should on the slave should be cleaned up when slave lost") {
+    // reset the test context with the right shuffle service config
+    afterEach()
+    val conf = new SparkConf()
+    conf.set("spark.shuffle.service.enabled", "true")
--- End diff --

Should we add another test with `spark.shuffle.service.enabled = false`?
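
A rough sketch of what I mean (same shape as the test above; the exact assertions would need to match whatever the non-external-shuffle-service code path is supposed to do):

```scala
test("shuffle files on the lost slave with external shuffle service disabled") {
  // reset the test context, this time without the external shuffle service
  afterEach()
  val conf = new SparkConf()
  conf.set("spark.shuffle.service.enabled", "false")
  init(conf)
  // ... then replay the same ExecutorAdded / map stage / FetchFailed events as above
  // and assert on mapOutputTracker.mapStatuses for this code path as well.
}
```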





[GitHub] spark pull request #17088: [SPARK-19753][CORE] Un-register all shuffle outpu...

2017-04-04 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17088#discussion_r109682527
  
--- Diff: 
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
@@ -394,6 +394,68 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with Timeou
 assertDataStructuresEmpty()
   }
 
+  test("All shuffle files should on the slave should be cleaned up when 
slave lost") {
+// reset the test context with the right shuffle service config
+afterEach()
+val conf = new SparkConf()
+conf.set("spark.shuffle.service.enabled", "true")
+init(conf)
+runEvent(ExecutorAdded("exec-hostA1", "hostA"))
+runEvent(ExecutorAdded("exec-hostA2", "hostA"))
+runEvent(ExecutorAdded("exec-hostB", "hostB"))
+val firstRDD = new MyRDD(sc, 3, Nil)
+val firstShuffleDep = new ShuffleDependency(firstRDD, new 
HashPartitioner(3))
+val firstShuffleId = firstShuffleDep.shuffleId
+val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
+val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(3))
+val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
+submit(reduceRdd, Array(0))
+// map stage1 completes successfully, with one task on each executor
+complete(taskSets(0), Seq(
+  (Success,
+MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), 
Array.fill[Long](1)(2))),
+  (Success,
+MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), 
Array.fill[Long](1)(2))),
+  (Success, makeMapStatus("hostB", 1))
+))
+// map stage2 completes successfully, with one task on each executor
+complete(taskSets(1), Seq(
+  (Success,
+MapStatus(BlockManagerId("exec-hostA1", "hostA", 12345), 
Array.fill[Long](1)(2))),
+  (Success,
+MapStatus(BlockManagerId("exec-hostA2", "hostA", 12345), 
Array.fill[Long](1)(2))),
+  (Success, makeMapStatus("hostB", 1))
+))
+// make sure our test setup is correct
+val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get
+assert(initialMapStatus1.count(_ != null) === 3)
+assert(initialMapStatus1.map{_.location.executorId}.toSet ===
+  Set("exec-hostA1", "exec-hostA2", "exec-hostB"))
+
+val initialMapStatus2 = mapOutputTracker.mapStatuses.get(0).get
+assert(initialMapStatus2.count(_ != null) === 3)
+assert(initialMapStatus2.map{_.location.executorId}.toSet ===
+  Set("exec-hostA1", "exec-hostA2", "exec-hostB"))
+
+// reduce stage fails with a fetch failure from one host
+complete(taskSets(2), Seq(
+  (FetchFailed(BlockManagerId("exec-hostA2", "hostA", 12345), 
firstShuffleId, 0, 0, "ignored"),
--- End diff --

Seems the `FetchFailed` message should reference `shuffleDep.shuffleId` 
instead of `firstShuffleId`?





[GitHub] spark issue #17519: [SPARK-15352][Doc] follow-up: add configuration docs for...

2017-04-03 Thread lins05
Github user lins05 commented on the issue:

https://github.com/apache/spark/pull/17519
  
@shubhamchopra @cloud-fan PTAL


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17519: [SPARK-15352][Doc] follow-up: add configuration d...

2017-04-03 Thread lins05
GitHub user lins05 opened a pull request:

https://github.com/apache/spark/pull/17519

[SPARK-15352][Doc] follow-up: add configuration docs for topology-aware block replication

## What changes were proposed in this pull request?

Add configuration docs for topology-aware block replication.

## How was this patch tested?

Generated the docs and previewed them.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lins05/spark 
spark-15352-add-docs-for-topology-aware-block-replication

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17519.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17519


commit c86044e803546c31e4da793bde201120c06976ca
Author: Shuai Lin <linshuai2...@gmail.com>
Date:   2017-04-03T09:00:22Z

[SPARK-15352][Doc] follow-up: add configuration docs for topology-aware 
block replication







[GitHub] spark issue #17051: [SPARK-17075][SQL] Follow up: fix file line ending and i...

2017-02-24 Thread lins05
Github user lins05 commented on the issue:

https://github.com/apache/spark/pull/17051
  
jenkins retest this please





[GitHub] spark pull request #17051: [SPARK-17075][SQL] Follow up: fix file line endin...

2017-02-23 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17051#discussion_r10239
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
 ---
@@ -398,6 +398,27 @@ class FilterEstimationSuite extends 
StatsEstimationTestBase {
 // For all other SQL types, we compare the entire object directly.
 assert(filteredStats.attributeStats(ar) == expectedColStats)
 }
-  }
 
+// If the filter has a binary operator (including those nested inside
+// AND/OR/NOT), swap the sides of the attribte and the literal, 
reverse the
+// operator, and then check again.
+val rewrittenFilter = filterNode transformExpressionsDown {
+  case op @ EqualTo(ar: AttributeReference, l: Literal) =>
--- End diff --

Emm, we not only swap the sides of the attr and the literal, but also reverse the operator, e.g. `LessThan` would be changed to `GreaterThan`. So I guess we can't use `withNewChildren` here.
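
i.e. roughly this shape, which is why a plain child swap isn't enough (only a couple of the cases, as an illustration):

```scala
val rewrittenFilter = filterNode transformExpressionsDown {
  // For the symmetric operator, swapping the sides is all there is to it ...
  case EqualTo(ar: AttributeReference, l: Literal) => EqualTo(l, ar)
  // ... but the comparison operators also have to be reversed, not just have their
  // children swapped.
  case LessThan(ar: AttributeReference, l: Literal) => GreaterThan(l, ar)
  case GreaterThan(ar: AttributeReference, l: Literal) => LessThan(l, ar)
}
```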





[GitHub] spark pull request #17051: [SPARK-17075][SQL] Follow up: fix file line endin...

2017-02-23 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17051#discussion_r102888024
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
 ---
@@ -398,6 +398,27 @@ class FilterEstimationSuite extends 
StatsEstimationTestBase {
 // For all other SQL types, we compare the entire object directly.
 assert(filteredStats.attributeStats(ar) == expectedColStats)
 }
-  }
 
+// If the filter has a binary operator (including those nested inside
+// AND/OR/NOT), swap the sides of the attribte and the literal, 
reverse the
+// operator, and then check again.
+val rewrittenFilter = filterNode transformExpressionsDown {
+  case op @ EqualTo(ar: AttributeReference, l: Literal) =>
--- End diff --

👍 

I tried to find something like this but couldn't, so I resorted to the current code. Thanks for the tip!





[GitHub] spark pull request #17051: [SPARK-17075][SQL] Follow up: fix file line endin...

2017-02-23 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17051#discussion_r102887655
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -1,511 +1,509 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
-
-import java.sql.{Date, Timestamp}
-
-import scala.collection.immutable.{HashSet, Map}
-import scala.collection.mutable
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.CatalystConf
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.types._
-
-case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) 
extends Logging {
-
-  /**
-   * We use a mutable colStats because we need to update the corresponding 
ColumnStat
-   * for a column after we apply a predicate condition.  For example, 
column c has
-   * [min, max] value as [0, 100].  In a range condition such as (c > 40 
AND c <= 50),
-   * we need to set the column's [min, max] value to [40, 100] after we 
evaluate the
-   * first condition c > 40.  We need to set the column's [min, max] value 
to [40, 50]
-   * after we evaluate the second condition c <= 50.
-   */
-  private var mutableColStats: mutable.Map[ExprId, ColumnStat] = 
mutable.Map.empty
-
-  /**
-   * Returns an option of Statistics for a Filter logical plan node.
-   * For a given compound expression condition, this method computes 
filter selectivity
-   * (or the percentage of rows meeting the filter condition), which
-   * is used to compute row count, size in bytes, and the updated 
statistics after a given
-   * predicated is applied.
-   *
-   * @return Option[Statistics] When there is no statistics collected, it 
returns None.
-   */
-  def estimate: Option[Statistics] = {
-// We first copy child node's statistics and then modify it based on 
filter selectivity.
-val stats: Statistics = plan.child.stats(catalystConf)
-if (stats.rowCount.isEmpty) return None
-
-// save a mutable copy of colStats so that we can later change it 
recursively
-mutableColStats = mutable.Map(stats.attributeStats.map(kv => 
(kv._1.exprId, kv._2)).toSeq: _*)
-
-// estimate selectivity of this filter predicate
-val filterSelectivity: Double = 
calculateFilterSelectivity(plan.condition) match {
-  case Some(percent) => percent
-  // for not-supported condition, set filter selectivity to a 
conservative estimate 100%
-  case None => 1.0
-}
-
-// attributeStats has mapping Attribute-to-ColumnStat.
-// mutableColStats has mapping ExprId-to-ColumnStat.
-// We use an ExprId-to-Attribute map to facilitate the mapping 
Attribute-to-ColumnStat
-val expridToAttrMap: Map[ExprId, Attribute] =
-  stats.attributeStats.map(kv => (kv._1.exprId, kv._1))
-// copy mutableColStats contents to an immutable AttributeMap.
-val mutableAttributeStats: mutable.Map[Attribute, ColumnStat] =
-  mutableColStats.map(kv => expridToAttrMap(kv._1) -> kv._2)
-val newColStats = AttributeMap(mutableAttributeStats.toSeq)
-
-val filteredRowCount: BigInt =
-  EstimationUtils.ceil(BigDecimal(stats.rowCount.get) * 
filterSelectivity)
-val filteredSizeInBytes =
-  EstimationUtils.getOutputSize(plan.output, filteredRowCount, 
newColStats)
-
-Some(stats.copy(sizeInBytes = filteredSizeInBytes, rowCount = 
Some(filteredRowCount),
-  attributeStats = newColStats))
-  }
-
-  /**
-   * Returns a percentage of rows meeting a compound condition in Filter 
node.
-   * A compound condition is 

[GitHub] spark pull request #17051: [SPARK-17075][SQL] Follow up: fix file line endin...

2017-02-23 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/17051#discussion_r102887355
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -1,511 +1,509 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
-
-import java.sql.{Date, Timestamp}
-
-import scala.collection.immutable.{HashSet, Map}
-import scala.collection.mutable
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.CatalystConf
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.types._
-
-case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) 
extends Logging {
-
-  /**
-   * We use a mutable colStats because we need to update the corresponding 
ColumnStat
-   * for a column after we apply a predicate condition.  For example, 
column c has
-   * [min, max] value as [0, 100].  In a range condition such as (c > 40 
AND c <= 50),
-   * we need to set the column's [min, max] value to [40, 100] after we 
evaluate the
-   * first condition c > 40.  We need to set the column's [min, max] value 
to [40, 50]
-   * after we evaluate the second condition c <= 50.
-   */
-  private var mutableColStats: mutable.Map[ExprId, ColumnStat] = 
mutable.Map.empty
-
-  /**
-   * Returns an option of Statistics for a Filter logical plan node.
-   * For a given compound expression condition, this method computes 
filter selectivity
-   * (or the percentage of rows meeting the filter condition), which
-   * is used to compute row count, size in bytes, and the updated 
statistics after a given
-   * predicated is applied.
-   *
-   * @return Option[Statistics] When there is no statistics collected, it 
returns None.
-   */
-  def estimate: Option[Statistics] = {
-// We first copy child node's statistics and then modify it based on 
filter selectivity.
-val stats: Statistics = plan.child.stats(catalystConf)
-if (stats.rowCount.isEmpty) return None
-
-// save a mutable copy of colStats so that we can later change it 
recursively
-mutableColStats = mutable.Map(stats.attributeStats.map(kv => 
(kv._1.exprId, kv._2)).toSeq: _*)
-
-// estimate selectivity of this filter predicate
-val filterSelectivity: Double = 
calculateFilterSelectivity(plan.condition) match {
-  case Some(percent) => percent
-  // for not-supported condition, set filter selectivity to a 
conservative estimate 100%
-  case None => 1.0
-}
-
-// attributeStats has mapping Attribute-to-ColumnStat.
-// mutableColStats has mapping ExprId-to-ColumnStat.
-// We use an ExprId-to-Attribute map to facilitate the mapping 
Attribute-to-ColumnStat
-val expridToAttrMap: Map[ExprId, Attribute] =
-  stats.attributeStats.map(kv => (kv._1.exprId, kv._1))
-// copy mutableColStats contents to an immutable AttributeMap.
-val mutableAttributeStats: mutable.Map[Attribute, ColumnStat] =
-  mutableColStats.map(kv => expridToAttrMap(kv._1) -> kv._2)
-val newColStats = AttributeMap(mutableAttributeStats.toSeq)
-
-val filteredRowCount: BigInt =
-  EstimationUtils.ceil(BigDecimal(stats.rowCount.get) * 
filterSelectivity)
-val filteredSizeInBytes =
-  EstimationUtils.getOutputSize(plan.output, filteredRowCount, 
newColStats)
-
-Some(stats.copy(sizeInBytes = filteredSizeInBytes, rowCount = 
Some(filteredRowCount),
-  attributeStats = newColStats))
-  }
-
-  /**
-   * Returns a percentage of rows meeting a compound condition in Filter 
node.
-   * A compound condition is 

[GitHub] spark pull request #17051: [SPARK-17075][SQL] Follow up: fix file line endin...

2017-02-23 Thread lins05
GitHub user lins05 opened a pull request:

https://github.com/apache/spark/pull/17051

[SPARK-17075][SQL] Follow up: fix file line ending and improve the tests

## What changes were proposed in this pull request?

Fixed the line ending of `FilterEstimation.scala`. Also improved the tests 
to cover more cases.

## How was this patch tested?

Existing unit tests.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lins05/spark fix-cbo-filter-file-encoding

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17051.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17051


commit ee6d9915b26254db176a5aa34c1d59e304e201e0
Author: Shuai Lin <linshuai2...@gmail.com>
Date:   2017-02-24T05:59:41Z

[SPARK-17075][SQL] Follow up: fix file line ending and improve the tests.

commit 0f56d0f1003268e4945ec5a427bbcc4bb7061a49
Author: Shuai Lin <linshuai2...@gmail.com>
Date:   2017-02-24T05:58:37Z

Use transformExpressionsDown to rewrite the filter.







[GitHub] spark issue #17051: [SPARK-17075][SQL] Follow up: fix file line ending and i...

2017-02-23 Thread lins05
Github user lins05 commented on the issue:

https://github.com/apache/spark/pull/17051
  
cc @ron8hu @cloud-fan 





[GitHub] spark pull request #16395: [SPARK-17075][SQL] implemented filter estimation

2017-02-23 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16395#discussion_r102707494
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
 ---
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.statsEstimation
+
+import java.sql.{Date, Timestamp}
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types._
+
+/**
+ * In this test suite, we test predicates containing the following 
operators:
+ * =, <, <=, >, >=, AND, OR, IS NULL, IS NOT NULL, IN, NOT IN
+ */
+class FilterEstimationSuite extends StatsEstimationTestBase {
+
+  // Suppose our test table has 10 rows and 6 columns.
+  // First column cint has values: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
+  // Hence, distinctCount:10, min:1, max:10, nullCount:0, avgLen:4, 
maxLen:4
+  val arInt = AttributeReference("cint", IntegerType)()
+  val childColStatInt = ColumnStat(distinctCount = 10, min = Some(1), max 
= Some(10),
+nullCount = 0, avgLen = 4, maxLen = 4)
+
+  // Second column cdate has 10 values from 2017-01-01 through 2017-01-10.
+  val dMin = Date.valueOf("2017-01-01")
+  val dMax = Date.valueOf("2017-01-10")
+  val arDate = AttributeReference("cdate", DateType)()
+  val childColStatDate = ColumnStat(distinctCount = 10, min = Some(dMin), 
max = Some(dMax),
+nullCount = 0, avgLen = 4, maxLen = 4)
+
+  // Third column ctimestamp has 10 values from "2017-01-01 01:00:00" 
through
+  // "2017-01-01 10:00:00" for 10 distinct timestamps (or hours).
+  val tsMin = Timestamp.valueOf("2017-01-01 01:00:00")
+  val tsMax = Timestamp.valueOf("2017-01-01 10:00:00")
+  val arTimestamp = AttributeReference("ctimestamp", TimestampType)()
+  val childColStatTimestamp = ColumnStat(distinctCount = 10, min = 
Some(tsMin), max = Some(tsMax),
+nullCount = 0, avgLen = 8, maxLen = 8)
+
+  // Fourth column cdecimal has 10 values from 0.20 through 2.00 at 
increment of 0.2.
+  val decMin = new java.math.BigDecimal("0.20")
+  val decMax = new java.math.BigDecimal("2.00")
+  val arDecimal = AttributeReference("cdecimal", DecimalType(12, 2))()
+  val childColStatDecimal = ColumnStat(distinctCount = 10, min = 
Some(decMin), max = Some(decMax),
+nullCount = 0, avgLen = 8, maxLen = 8)
+
+  // Fifth column cdouble has 10 double values: 1.0, 2.0, 3.0, 4.0, 5.0, 
6.0, 7.0, 8.0, 9.0, 10.0
+  val arDouble = AttributeReference("cdouble", DoubleType)()
+  val childColStatDouble = ColumnStat(distinctCount = 10, min = Some(1.0), 
max = Some(10.0),
+nullCount = 0, avgLen = 8, maxLen = 8)
+
+  // Sixth column cstring has 10 String values:
+  // "A0", "A1", "A2", "A3", "A4", "A5", "A6", "A7", "A8", "A9"
+  val arString = AttributeReference("cstring", StringType)()
+  val childColStatString = ColumnStat(distinctCount = 10, min = None, max 
= None,
+nullCount = 0, avgLen = 2, maxLen = 2)
+
+  test("cint = 2") {
+validateEstimatedStats(
+  arInt,
+  Filter(EqualTo(arInt, Literal(2)), childStatsTestPlan(Seq(arInt))),
+  ColumnStat(distinctCount = 1, min = Some(2), max = Some(2),
+nullCount = 0, avgLen = 4, maxLen = 4),
+  Some(1L)
+)
+  }
+
+  test("cint = 0") {
+// This is an out-of-range case since 0 is outside the

[GitHub] spark pull request #16395: [SPARK-17075][SQL] implemented filter estimation

2017-02-19 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16395#discussion_r101912584
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -0,0 +1,623 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
+
+import java.sql.{Date, Timestamp}
+
+import scala.collection.immutable.{HashSet, Map}
+import scala.collection.mutable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * @param plan a LogicalPlan node that must be an instance of Filter
+ * @param catalystConf a configuration showing if CBO is enabled
+ */
+case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) 
extends Logging {
+
+  /**
+   * We use a mutable colStats because we need to update the corresponding 
ColumnStat
+   * for a column after we apply a predicate condition.  For example, 
column c has
+   * [min, max] value as [0, 100].  In a range condition such as (c > 40 
AND c <= 50),
+   * we need to set the column's [min, max] value to [40, 100] after we 
evaluate the
+   * first condition c > 40.  We need to set the column's [min, max] value 
to [40, 50]
+   * after we evaluate the second condition c <= 50.
+   */
+  private var mutableColStats: mutable.Map[ExprId, ColumnStat] = 
mutable.Map.empty
--- End diff --

This explanation of why we aren't using an `AttributeMap` or a `val` is worth including in the comment.
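
e.g. something along these lines in the scaladoc (my wording):

```scala
  /**
   * ...
   * Kept as a mutable map keyed by ExprId, rather than an immutable AttributeMap val,
   * because the column stats are updated in place as each predicate in the filter
   * condition is evaluated.
   */
```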





[GitHub] spark pull request #16395: [SPARK-17075][SQL] implemented filter estimation

2017-02-19 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16395#discussion_r101911700
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
+
+import java.sql.{Date, Timestamp}
+
+import scala.collection.immutable.{HashSet, Map}
+import scala.collection.mutable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
--- End diff --

This import seems to be unused.





[GitHub] spark pull request #16395: [SPARK-17075][SQL] implemented filter estimation

2017-02-19 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16395#discussion_r101912258
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 ---
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
+
+import java.sql.{Date, Timestamp}
+
+import scala.collection.immutable.{HashSet, Map}
+import scala.collection.mutable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+case class FilterEstimation(plan: Filter, catalystConf: CatalystConf) 
extends Logging {
+
+  /**
+   * We use a mutable colStats because we need to update the corresponding 
ColumnStat
+   * for a column after we apply a predicate condition.  For example, 
column c has
+   * [min, max] value as [0, 100].  In a range condition such as (c > 40 
AND c <= 50),
+   * we need to set the column's [min, max] value to [40, 100] after we 
evaluate the
+   * first condition c > 40.  We need to set the column's [min, max] value 
to [40, 50]
+   * after we evaluate the second condition c <= 50.
+   */
+  private var mutableColStats: mutable.Map[ExprId, ColumnStat] = 
mutable.Map.empty
+
+  /**
+   * Returns an option of Statistics for a Filter logical plan node.
+   * For a given compound expression condition, this method computes 
filter selectivity
+   * (or the percentage of rows meeting the filter condition), which
+   * is used to compute row count, size in bytes, and the updated 
statistics after a given
+   * predicated is applied.
+   *
+   * @return Option[Statistics] When there is no statistics collected, it 
returns None.
+   */
+  def estimate: Option[Statistics] = {
+// We first copy child node's statistics and then modify it based on 
filter selectivity.
+val stats: Statistics = plan.child.stats(catalystConf)
+if (stats.rowCount.isEmpty) return None
+
+// save a mutable copy of colStats so that we can later change it 
recursively
+mutableColStats = mutable.Map(stats.attributeStats.map(kv => 
(kv._1.exprId, kv._2)).toSeq: _*)
+
+// estimate selectivity of this filter predicate
+val filterSelectivity: Double = 
calculateFilterSelectivity(plan.condition) match {
+  case Some(percent) => percent
+  // for not-supported condition, set filter selectivity to a 
conservative estimate 100%
+  case None => 1.0
+}
+
+// attributeStats has mapping Attribute-to-ColumnStat.
+// mutableColStats has mapping ExprId-to-ColumnStat.
+// We use an ExprId-to-Attribute map to facilitate the mapping 
Attribute-to-ColumnStat
+val expridToAttrMap: Map[ExprId, Attribute] =
+  stats.attributeStats.map(kv => (kv._1.exprId, kv._1))
+// copy mutableColStats contents to an immutable AttributeMap.
+val mutableAttributeStats: mutable.Map[Attribute, ColumnStat] =
+  mutableColStats.map(kv => expridToAttrMap(kv._1) -> kv._2)
+val newColStats = AttributeMap(mutableAttributeStats.toSeq)
--- End diff --

Can we extract this section (creating the new `AttributeMap` based on `mutableColStats`) into a helper function? It would make the code clearer.
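
Something like this, for instance (just a sketch based on the code above; the name is only a suggestion):

```scala
  /** Rebuilds an immutable AttributeMap from the mutable, ExprId-keyed copy of the stats. */
  private def toAttributeMap(
      attributeStats: AttributeMap[ColumnStat],
      updatedStats: mutable.Map[ExprId, ColumnStat]): AttributeMap[ColumnStat] = {
    val exprIdToAttr: Map[ExprId, Attribute] = attributeStats.map(kv => (kv._1.exprId, kv._1))
    AttributeMap(updatedStats.map(kv => exprIdToAttr(kv._1) -> kv._2).toSeq)
  }
```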


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark issue #16984: [SPARK-19550] Follow-up: fixed a typo that fails the dev...

2017-02-18 Thread lins05
Github user lins05 commented on the issue:

https://github.com/apache/spark/pull/16984
  
That's how the shell's "default value" works.  FYI 
http://www.tldp.org/LDP/abs/html/parameter-substitution.html


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16984: [SPARK-19550] Follow-up: fixed a typo that fails the dev...

2017-02-18 Thread lins05
Github user lins05 commented on the issue:

https://github.com/apache/spark/pull/16984
  
cc @srowen 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16984: [SPARK-19550] Follow-up: fixed a typo that fails ...

2017-02-18 Thread lins05
GitHub user lins05 opened a pull request:

https://github.com/apache/spark/pull/16984

[SPARK-19550] Follow-up: fixed a typo that fails the 
dev/make-distribution.sh script.

## What changes were proposed in this pull request?

Fixed a typo in `dev/make-distribution.sh` script that sets the MAVEN_OPTS 
variable.

## How was this patch tested?

Run `dev/make-distribution.sh` manually.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lins05/spark 
fix-spark-make-distribution-after-removing-java7

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16984.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16984


commit 5ca4fb09b1c9fc8dca51c7e5de5a474f91adabbb
Author: Shuai Lin <linshuai2...@gmail.com>
Date:   2017-02-18T12:54:20Z

[SPARK-19550] Follow-up: fixed a typo that fails the 
dev/make-distribution.sh script.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16893: [SPARK-19555][SQL] Improve the performance of Str...

2017-02-10 Thread lins05
GitHub user lins05 opened a pull request:

https://github.com/apache/spark/pull/16893

[SPARK-19555][SQL] Improve the performance of StringUtils.escapeLikeRegex 
method

## What changes were proposed in this pull request?

Copied from [SPARK-19555](https://issues.apache.org/jira/browse/SPARK-19555)

Spark's `StringUtils.escapeLikeRegex()` method is written inefficiently, 
performing tons of object allocations due to its use of `zip()`, `flatMap()`, and 
`mkString`. Instead, I think the method should be rewritten in an imperative style 
using a Java string builder.

This method can become a performance bottleneck in cases where regex 
expressions are used with non-constant-foldable expressions (e.g. the regex 
expression comes from the data rather than being part of the query).
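
For illustration, here is a minimal sketch of the imperative rewrite described 
above: a single pass over the pattern using a `java.lang.StringBuilder` instead of 
`zip()`/`flatMap()`/`mkString`. It assumes the usual LIKE semantics (`_` matches one 
character, `%` matches any sequence, `\` escapes the next character) and is not the 
exact code in the patch.

```scala
import java.util.regex.Pattern

def escapeLikeRegex(pattern: String): String = {
  val sb = new java.lang.StringBuilder(pattern.length + 8)
  sb.append("(?s)") // DOTALL, so '%' also matches across newlines
  var i = 0
  while (i < pattern.length) {
    pattern.charAt(i) match {
      case '\\' if i + 1 < pattern.length =>
        // Escaped character: match it literally and skip over the escape.
        sb.append(Pattern.quote(pattern.charAt(i + 1).toString))
        i += 1
      case '_' => sb.append('.')   // LIKE '_' matches exactly one character
      case '%' => sb.append(".*")  // LIKE '%' matches any sequence of characters
      case c   => sb.append(Pattern.quote(c.toString))
    }
    i += 1
  }
  sb.toString
}
```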

## How was this patch tested?

Existing tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lins05/spark 
spark-19555-improve-escape-like-regex

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16893.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16893


commit bbcdeda98c14705b6de3efab70f2c58bc4539bb9
Author: Shuai Lin <linshuai2...@gmail.com>
Date:   2017-02-11T07:46:34Z

[SPARK-19555][SQL] Improve the performance of StringUtils.escapeLikeRegex




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16533: [SPARK-19160][PYTHON][SQL] Add udf decorator

2017-01-30 Thread lins05
Github user lins05 commented on the issue:

https://github.com/apache/spark/pull/16533
  
What about also supporting the type name without the parentheses, as syntactic 
sugar? e.g.:

```python
@udf(returnType=IntegerType)  # instead of IntegerType()
def f(...):
    ...
```

It can be achieved by checking if the returnType is a subclass of DataType:

```python
if isinstance(returnType, type) and DataType in returnType.__mro__:
    returnType = returnType()
else:
    ...
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16593: [SPARK-19153][SQL]DataFrameWriter.saveAsTable work with ...

2017-01-19 Thread lins05
Github user lins05 commented on the issue:

https://github.com/apache/spark/pull/16593
  
I just found that "create table using hive" (without "select ... from", i.e. 
the non-CTAS form) is handled by `CreateTableCommand` 
([source](https://github.com/apache/spark/blob/bcc510b021/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala#L411-L413)).
 But here the reordering of the columns is only handled for 
`CreateHiveTableAsSelectCommand`, which means the former would still suffer 
from the problem.

What about introducing a new analyzer rule to do the reordering of the 
columns when creating a partitioned hive table, so both cases could be covered?
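
For illustration, a minimal sketch of the core reordering such a rule would 
perform, i.e. pushing the partition columns to the end of the schema in the order 
given by `partitionColumnNames` (the helper name and signature are hypothetical, 
not from this PR):

```scala
import org.apache.spark.sql.types.StructType

def reorderSchema(schema: StructType, partitionColumnNames: Seq[String]): StructType = {
  // Split the schema into partition columns and data columns.
  val (partCols, dataCols) = schema.partition(f => partitionColumnNames.contains(f.name))
  // Keep the partition columns in the declared partitioning order.
  val orderedPartCols = partitionColumnNames.flatMap(name => partCols.find(_.name == name))
  // Data columns first, partition columns last.
  StructType(dataCols ++ orderedPartCols)
}
```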


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16593: [SPARK-19153][SQL]DataFrameWriter.saveAsTable wor...

2017-01-18 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16593#discussion_r96774234
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
 ---
@@ -87,8 +101,8 @@ case class CreateHiveTableAsSelectCommand(
   }
 } else {
   try {
-sparkSession.sessionState.executePlan(InsertIntoTable(
-  metastoreRelation, Map(), query, overwrite = true, ifNotExists = 
false)).toRdd
+
sparkSession.sessionState.executePlan(InsertIntoTable(metastoreRelation,
--- End diff --

IIUC the partition syntax doesn't contain column types, e.g. ```create table t2 
using hive partitioned by (c1, c2) as select * from t1```. If one specifies 
`partitioned by (c1 string, c2 int)`, the parser would raise an error, because we 
have this specified in the parser syntax:

```g4
createTableHeader ...
(PARTITIONED BY partitionColumnNames=identifierList)?
... #createTable

identifierList
: '(' identifierSeq ')'
;

identifierSeq
: identifier (',' identifier)*
;

```



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16593: [SPARK-19153][SQL]DataFrameWriter.saveAsTable wor...

2017-01-16 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16593#discussion_r96348515
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala 
---
@@ -1343,17 +1343,41 @@ class HiveDDLSuite
   sql("INSERT INTO t SELECT 2, 'b'")
   checkAnswer(spark.table("t"), Row(9, "x") :: Row(2, "b") :: Nil)
 
-  val e = intercept[AnalysisException] {
-Seq(1 -> "a").toDF("i", 
"j").write.format("hive").partitionBy("i").saveAsTable("t2")
-  }
-  assert(e.message.contains("A Create Table As Select (CTAS) statement 
is not allowed " +
-"to create a partitioned table using Hive"))
-
   val e2 = intercept[AnalysisException] {
 Seq(1 -> "a").toDF("i", "j").write.format("hive").bucketBy(4, 
"i").saveAsTable("t2")
   }
   assert(e2.message.contains("Creating bucketed Hive serde table is 
not supported yet"))
 
+  try {
+spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
--- End diff --

I think we can use `withSQLConf` instead of `try .. finally ..`.

```scala
withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
...
}
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16593: [SPARK-19153][SQL]DataFrameWriter.saveAsTable wor...

2017-01-16 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16593#discussion_r96348791
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
 ---
@@ -45,6 +46,25 @@ case class CreateHiveTableAsSelectCommand(
   override def innerChildren: Seq[LogicalPlan] = Seq(query)
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
+
+// relation should move partition columns to the last
+val (partOutputs, nonPartOutputs) = query.output.partition {
+  a =>
+tableDesc.partitionColumnNames.contains(a.name)
+}
+
+// the CTAS's SELECT partition-outputs order should be consistent with
+// tableDesc.partitionColumnNames
+val reorderPartOutputs = tableDesc.partitionColumnNames.map {
--- End diff --

nit: `reorderPartOutputs` -> `reorderedPartOutputs`. The former sounds like 
a verb while the latter sounds like a noun.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16593: [SPARK-19153][SQL]DataFrameWriter.saveAsTable wor...

2017-01-16 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16593#discussion_r96349044
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 ---
@@ -183,9 +183,15 @@ case class CatalogTable(
 
   import CatalogTable._
 
-  /** schema of this table's partition columns */
-  def partitionSchema: StructType = StructType(schema.filter {
-c => partitionColumnNames.contains(c.name)
+  /**
+   * schema of this table's partition columns
+   * keep the schema order with partitionColumnNames
--- End diff --

"keep the schema order with partitionColumnNames because we always 
concatenate the partition columns to the schema when reading the table 
information from hive  metastore."


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16593: [SPARK-19153][SQL]DataFrameWriter.saveAsTable wor...

2017-01-16 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16593#discussion_r96349144
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 ---
@@ -183,9 +183,15 @@ case class CatalogTable(
 
   import CatalogTable._
 
-  /** schema of this table's partition columns */
-  def partitionSchema: StructType = StructType(schema.filter {
-c => partitionColumnNames.contains(c.name)
+  /**
+   * schema of this table's partition columns
+   * keep the schema order with partitionColumnNames
+   */
+  def partitionSchema: StructType = StructType(partitionColumnNames.map {
+p => schema.find(_.name == p).getOrElse(
+  throw new AnalysisException(s"Partition column [$p] " +
+s"did not exist in schema ${schema.toString}")
--- End diff --

"did not exist" -> "does not exist"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16593: [SPARK-19153][SQL]DataFrameWriter.saveAsTable wor...

2017-01-16 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16593#discussion_r96348696
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
 ---
@@ -88,7 +108,9 @@ case class CreateHiveTableAsSelectCommand(
 } else {
   try {
 sparkSession.sessionState.executePlan(InsertIntoTable(
-  metastoreRelation, Map(), query, overwrite = true, ifNotExists = 
false)).toRdd
+metastoreRelation, Map(), reorderOutputQuery, overwrite = true
+  , ifNotExists = false))
--- End diff --

nit: The comma should be on the line above (after `overwrite = true`). 
Actually I think we can put all the args to `InsertIntoTable` on the same line.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16593: [SPARK-19153][SQL]DataFrameWriter.saveAsTable wor...

2017-01-16 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16593#discussion_r96348933
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala 
---
@@ -1343,17 +1343,41 @@ class HiveDDLSuite
   sql("INSERT INTO t SELECT 2, 'b'")
   checkAnswer(spark.table("t"), Row(9, "x") :: Row(2, "b") :: Nil)
 
-  val e = intercept[AnalysisException] {
-Seq(1 -> "a").toDF("i", 
"j").write.format("hive").partitionBy("i").saveAsTable("t2")
-  }
-  assert(e.message.contains("A Create Table As Select (CTAS) statement 
is not allowed " +
-"to create a partitioned table using Hive"))
-
   val e2 = intercept[AnalysisException] {
 Seq(1 -> "a").toDF("i", "j").write.format("hive").bucketBy(4, 
"i").saveAsTable("t2")
   }
   assert(e2.message.contains("Creating bucketed Hive serde table is 
not supported yet"))
 
+  try {
+spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
+Seq(10 -> "y").toDF("i", 
"j").write.format("hive").partitionBy("i").saveAsTable("t3")
+checkAnswer(spark.table("t3"), Row("y", 10) :: Nil)
+table = 
spark.sessionState.catalog.getTableMetadata(TableIdentifier("t3"))
+var partitionSchema = table.partitionSchema
+assert(partitionSchema.size == 1 && partitionSchema.fields(0).name 
== "i" &&
+  partitionSchema.fields(0).dataType == IntegerType)
+
+Seq(11 -> "z").toDF("i", 
"j").write.mode("overwrite").format("hive")
+  .partitionBy("j").saveAsTable("t3")
+checkAnswer(spark.table("t3"), Row(11, "z") :: Nil)
+table = 
spark.sessionState.catalog.getTableMetadata(TableIdentifier("t3"))
+partitionSchema = table.partitionSchema
+assert(partitionSchema.size == 1 && partitionSchema.fields(0).name 
== "j" &&
+  partitionSchema.fields(0).dataType == StringType)
+
+Seq((1, 2, 3)).toDF("i", "j", 
"k").write.mode("overwrite").format("hive")
+  .partitionBy("k", "j").saveAsTable("t3")
+table = 
spark.sessionState.catalog.getTableMetadata(TableIdentifier("t3"))
+checkAnswer(spark.table("t3"), Row(1, 3, 2) :: Nil)
+
+Seq((1, 2, 3)).toDF("i", "j", 
"k").write.mode("overwrite").format("hive")
+  .partitionBy("j", "k").saveAsTable("t3")
+table = 
spark.sessionState.catalog.getTableMetadata(TableIdentifier("t3"))
+checkAnswer(spark.table("t3"), Row(1, 2, 3) :: Nil)
+  } finally {
+spark.sql("set hive.exec.dynamic.partition.mode=strict")
+  }
+
--- End diff --

I think this test case is a bit fat; maybe we can split it into two or 
three smaller ones? e.g.:

```scala
  test("create hive serde table with DataFrameWriter.saveAsTable - basic") 
...
  test("create hive serde table with DataFrameWriter.saveAsTable - 
overwrite and append") ...
  test("create hive serde table with DataFrameWriter.saveAsTable - 
partitioned") ...
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16189: [SPARK-18761][CORE] Introduce "task reaper" to ov...

2016-12-11 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16189#discussion_r91856289
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -432,6 +458,78 @@ private[spark] class Executor(
   }
 
   /**
+   * Supervises the killing / cancellation of a task by sending the 
interrupted flag, optionally
+   * sending a Thread.interrupt(), and monitoring the task until it 
finishes.
+   */
+  private class TaskReaper(
+  taskRunner: TaskRunner,
+  val interruptThread: Boolean)
+extends Runnable {
+
+private[this] val taskId: Long = taskRunner.taskId
+
+private[this] val killPollingFrequencyMs: Long =
+  conf.getTimeAsMs("spark.task.killPollingFrequency", "10s")
+
+private[this] val killTimeoutMs: Long = 
conf.getTimeAsMs("spark.task.killTimeout", "2m")
+
+private[this] val takeThreadDump: Boolean =
+  conf.getBoolean("spark.task.threadDumpKilledTasks", true)
+
+override def run(): Unit = {
+  val startTimeMs = System.currentTimeMillis()
+  def elapsedTimeMs = System.currentTimeMillis() - startTimeMs
+  try {
+while (!taskRunner.isFinished && (elapsedTimeMs < killTimeoutMs || 
killTimeoutMs <= 0)) {
+  taskRunner.kill(interruptThread = interruptThread)
--- End diff --

> In the case where we do interrupt, however, the introduction of this 
polling loop means that we'll interrupt the same task multiple times

My 2c: if the application code doesn't respond to the first interrupt 
immediately, chances are very low that it would respond to the following 
interrupts (it may be stuck in an infinite loop, or in some blocking JNI call), so 
sending multiple interrupts may not be necessary.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16189: [SPARK-18761][CORE] Introduce "task reaper" to ov...

2016-12-11 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16189#discussion_r91855204
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -432,6 +458,78 @@ private[spark] class Executor(
   }
 
   /**
+   * Supervises the killing / cancellation of a task by sending the 
interrupted flag, optionally
+   * sending a Thread.interrupt(), and monitoring the task until it 
finishes.
+   */
+  private class TaskReaper(
+  taskRunner: TaskRunner,
+  val interruptThread: Boolean)
+extends Runnable {
+
+private[this] val taskId: Long = taskRunner.taskId
+
+private[this] val killPollingFrequencyMs: Long =
+  conf.getTimeAsMs("spark.task.killPollingFrequency", "10s")
--- End diff --

Maybe the name `killPollingInterval` would be slightly better? Also I think 
the poll interval should not be larger than `spark.task.killTimeout`.
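
For illustration, a hedged sketch of the clamping, assuming the field sits inside 
`TaskReaper` next to `killTimeoutMs` as in the diff above (the name follows the 
suggested rename; the config key is unchanged, and `killTimeoutMs <= 0` is treated 
as "no timeout"):

```scala
// Clamp the poll interval so a single wait never overshoots the kill timeout.
private[this] val killPollingIntervalMs: Long = {
  val requested = conf.getTimeAsMs("spark.task.killPollingFrequency", "10s")
  if (killTimeoutMs > 0) math.min(requested, killTimeoutMs) else requested
}
```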


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16189: [SPARK-18761][CORE] Introduce "task reaper" to ov...

2016-12-11 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16189#discussion_r91855053
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -432,6 +458,78 @@ private[spark] class Executor(
   }
 
   /**
+   * Supervises the killing / cancellation of a task by sending the 
interrupted flag, optionally
+   * sending a Thread.interrupt(), and monitoring the task until it 
finishes.
+   */
+  private class TaskReaper(
+  taskRunner: TaskRunner,
+  val interruptThread: Boolean)
+extends Runnable {
+
+private[this] val taskId: Long = taskRunner.taskId
+
+private[this] val killPollingFrequencyMs: Long =
+  conf.getTimeAsMs("spark.task.killPollingFrequency", "10s")
+
+private[this] val killTimeoutMs: Long = 
conf.getTimeAsMs("spark.task.killTimeout", "2m")
+
+private[this] val takeThreadDump: Boolean =
+  conf.getBoolean("spark.task.threadDumpKilledTasks", true)
+
+override def run(): Unit = {
+  val startTimeMs = System.currentTimeMillis()
+  def elapsedTimeMs = System.currentTimeMillis() - startTimeMs
+  try {
+while (!taskRunner.isFinished && (elapsedTimeMs < killTimeoutMs || 
killTimeoutMs <= 0)) {
+  taskRunner.kill(interruptThread = interruptThread)
+  taskRunner.synchronized {
+taskRunner.wait(killPollingFrequencyMs)
+  }
+  if (!taskRunner.isFinished) {
+logWarning(s"Killed task $taskId is still running after 
$elapsedTimeMs ms")
+if (takeThreadDump) {
+  try {
+
Utils.getThreadDumpForThread(taskRunner.getThreadId).foreach { thread =>
+  if (thread.threadName == taskRunner.threadName) {
+logWarning(s"Thread dump from task 
$taskId:\n${thread.stackTrace}")
+  }
+}
+  } catch {
+case NonFatal(e) =>
+  logWarning("Exception thrown while obtaining thread 
dump: ", e)
+  }
+}
+  }
+}
+
+if (!taskRunner.isFinished && killTimeoutMs > 0 && elapsedTimeMs > 
killTimeoutMs) {
--- End diff --

I see we also have `(elapsedTimeMs < killTimeoutMs || killTimeoutMs <= 0)` 
above; what about extracting `killTimeoutMs > 0 && elapsedTimeMs > 
killTimeoutMs` into a method, e.g. `def timeoutExceeded(): Boolean`?
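
A hedged sketch of the suggested helper, assuming it is a member of `TaskReaper` 
where `killTimeoutMs` is in scope (as in the diff above):

```scala
private def timeoutExceeded(elapsedTimeMs: Long): Boolean =
  killTimeoutMs > 0 && elapsedTimeMs > killTimeoutMs

// The post-loop check then reads:
//   if (!taskRunner.isFinished && timeoutExceeded(elapsedTimeMs)) { ... }
// Note: the while-loop guard uses a strict `<`, so rewriting it as
// `!timeoutExceeded(elapsedTimeMs)` would change behavior when
// elapsedTimeMs == killTimeoutMs.
```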


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16082: [SPARK-18652][PYTHON] Include the example data an...

2016-12-04 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16082#discussion_r90774898
  
--- Diff: python/setup.py ---
@@ -69,10 +69,13 @@
 
 EXAMPLES_PATH = os.path.join(SPARK_HOME, "examples/src/main/python")
 SCRIPTS_PATH = os.path.join(SPARK_HOME, "bin")
+DATA_PATH = os.path.join(SPARK_HOME, "data")
 SCRIPTS_TARGET = os.path.join(TEMP_PATH, "bin")
 JARS_TARGET = os.path.join(TEMP_PATH, "jars")
 EXAMPLES_TARGET = os.path.join(TEMP_PATH, "examples")
-
+DATA_TARGET = os.path.join(TEMP_PATH, "data")
+LICENSES_PATH = os.path.join(SPARK_HOME, "licenses")
--- End diff --

Done. Good catch!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16082: [SPARK-18652][PYTHON] Include the example data an...

2016-12-04 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16082#discussion_r90774685
  
--- Diff: python/MANIFEST.in ---
@@ -17,6 +17,8 @@
 global-exclude *.py[cod] __pycache__ .DS_Store
 recursive-include deps/jars *.jar
 graft deps/bin
+graft deps/data
--- End diff --

On second thought I think it would be better to keep all the packages 
consistent (specifying the patterns to include in both MANIFEST.in and 
setup.py), so I changed the `graft` to `recursive-include` with proper patterns 
as you suggested.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16082: [SPARK-18652][PYTHON] Include the example data an...

2016-12-04 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/16082#discussion_r90774620
  
--- Diff: python/setup.py ---
@@ -69,10 +69,15 @@
 
 EXAMPLES_PATH = os.path.join(SPARK_HOME, "examples/src/main/python")
 SCRIPTS_PATH = os.path.join(SPARK_HOME, "bin")
+DATA_PATH = os.path.join(SPARK_HOME, "data")
 SCRIPTS_TARGET = os.path.join(TEMP_PATH, "bin")
 JARS_TARGET = os.path.join(TEMP_PATH, "jars")
 EXAMPLES_TARGET = os.path.join(TEMP_PATH, "examples")
+DATA_TARGET = os.path.join(TEMP_PATH, "data")
+LICENSES_PATH = os.path.join(SPARK_HOME, "licenses")
+LICENSES_TARGET = os.path.join(TEMP_PATH, "licenses")
 
+data_files = glob.glob(os.path.join(LICENSES_PATH, "*"))
--- End diff --

Makes sense, I'll remove the data_files-related changes to make them consistent.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #16082: [SPARK-18652] Include the example data and third-party l...

2016-12-02 Thread lins05
Github user lins05 commented on the issue:

https://github.com/apache/spark/pull/16082
  
@holdenk @rxin Can we merge this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16082: [SPARK-18652] Include the data in pyspark package...

2016-11-30 Thread lins05
GitHub user lins05 opened a pull request:

https://github.com/apache/spark/pull/16082

[SPARK-18652] Include the data in pyspark package.

## What changes were proposed in this pull request?

Since we already include the python examples in the pyspark package, we 
should include the example data with it as well.

## How was this patch tested?

Manually tested
```sh
$ ./build/mvn -DskipTests -Phive -Phive-thriftserver -Pyarn -Pmesos clean 
package
$ cd python
$ python setup.py sdist
$ pip install  dist/pyspark-2.1.0.dev0.tar.gz 
$ ls -1 /usr/local/lib/python2.7/dist-packages/pyspark/data/
graphx
mllib
streaming
$ du -sh /usr/local/lib/python2.7/dist-packages/pyspark/data/
600K    /usr/local/lib/python2.7/dist-packages/pyspark/data/
```


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lins05/spark include-data-in-pyspark-dist

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16082.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16082


commit 43019db973eac2e89f77bfdbd5d15fdea3a3050a
Author: Shuai Lin <linshuai2...@gmail.com>
Date:   2016-11-30T17:09:24Z

[SPARK-18652] Include the data in pyspark package.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16049: [SPARK-16282][SQL] Follow-up: remove "percentile"...

2016-11-28 Thread lins05
GitHub user lins05 opened a pull request:

https://github.com/apache/spark/pull/16049

[SPARK-16282][SQL] Follow-up: remove "percentile" from temp function 
detection after implementing it natively

## What changes were proposed in this pull request?

In #15764 we added a mechanism to detect whether a function is temporary or not. 
Hive functions are treated as non-temporary. Of the three hive functions in that 
list, "percentile" has now been implemented natively and "hash" has been removed, 
so we should update the list.

## How was this patch tested?

Unit tests.

Please review http://spark.apache.org/contributing.html before opening a 
pull request.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lins05/spark 
update-temp-function-detect-hive-list

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16049.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16049


commit 1a8d949fc42487ac4b2f303fdcf683bc223b22ac
Author: Shuai Lin <linshuai2...@gmail.com>
Date:   2016-11-29T01:31:40Z

[SPARK-16282][SQL] Follow-up: remove "percentile" from temp function 
detection after implementing it natively




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14136: [SPARK-16282][SQL] Implement percentile SQL funct...

2016-11-27 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/14136#discussion_r89686604
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala
 ---
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.expressions.aggregate
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, 
DataInputStream, DataOutputStream}
+import java.util
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import 
org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, 
TypeCheckSuccess}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.types._
+import org.apache.spark.util.collection.OpenHashMap
+
+/**
+ * The Percentile aggregate function returns the exact percentile(s) of 
numeric column `expr` at
+ * the given percentage(s) with value range in [0.0, 1.0].
+ *
+ * The operator is bound to the slower sort based aggregation path because 
the number of elements
+ * and their partial order cannot be determined in advance. Therefore we 
have to store all the
+ * elements in memory, and that too many elements can cause GC paused and 
eventually OutOfMemory
+ * Errors.
+ *
+ * @param child child expression that produce numeric column value with 
`child.eval(inputRow)`
+ * @param percentageExpression Expression that represents a single 
percentage value or an array of
+ * percentage values. Each percentage value 
must be in the range
+ * [0.0, 1.0].
+ */
+@ExpressionDescription(
+  usage =
+"""
+  _FUNC_(col, percentage) - Returns the exact percentile value of 
numeric column `col` at the
+  given percentage. The value of percentage must be between 0.0 and 
1.0.
+
+  _FUNC_(col, array(percentage1 [, percentage2]...)) - Returns the 
exact percentile value array
+  of numeric column `col` at the given percentage(s). Each value of 
the percentage array must
+  be between 0.0 and 1.0.
+""")
+case class Percentile(
+  child: Expression,
+  percentageExpression: Expression,
+  mutableAggBufferOffset: Int = 0,
+  inputAggBufferOffset: Int = 0) extends 
TypedImperativeAggregate[OpenHashMap[Number, Long]] {
+
+  def this(child: Expression, percentageExpression: Expression) = {
+this(child, percentageExpression, 0, 0)
+  }
+
+  override def prettyName: String = "percentile"
+
+  override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: 
Int): Percentile =
+copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): 
Percentile =
+copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  // Mark as lazy so that percentageExpression is not evaluated during 
tree transformation.
+  @transient
+  private lazy val returnPercentileArray = 
percentageExpression.dataType.isInstanceOf[ArrayType]
+
+  @transient
+  private lazy val percentages = percentageExpression.eval() match {
+case p: Double => Seq(p)
+case a: ArrayData => a.toDoubleArray().toSeq
+  }
+
+  override def children: Seq[Expression] = child :: percentageExpression 
:: Nil
+
+  // Returns null for empty inputs
+  override def nullable: Boolean = true
+
+  override lazy val dataType: DataType = percentageExpression.dataType 
match {
+case _: ArrayType => ArrayType(DoubleType, false)
+case _ => DoubleType
+  }
+
+  override def inputTypes: Seq[AbstractDataType] = 
percentageExpression.dataType match {
+case _: ArrayType => Seq(NumericType, ArrayType(DoubleType, false))
   

[GitHub] spark pull request #15923: [SPARK-4105] retry the fetch or stage if shuffle ...

2016-11-24 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/15923#discussion_r89476391
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -17,19 +17,22 @@
 
 package org.apache.spark.storage
 
-import java.io.InputStream
+import java.io.{InputStream, IOException}
+import java.nio.ByteBuffer
 import java.util.concurrent.LinkedBlockingQueue
 import javax.annotation.concurrent.GuardedBy
 
+import scala.collection.mutable
 import scala.collection.mutable.{ArrayBuffer, HashSet, Queue}
 import scala.util.control.NonFatal
 
 import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.internal.Logging
-import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, 
ManagedBuffer}
 import org.apache.spark.network.shuffle.{BlockFetchingListener, 
ShuffleClient}
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.util.Utils
+import org.apache.spark.util.io.{ChunkedByteBufferInputStream, 
ChunkedByteBufferOutputStream}
--- End diff --

seems `ChunkedByteBufferInputStream` is not used here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15923: [SPARK-4105] retry the fetch or stage if shuffle ...

2016-11-24 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/15923#discussion_r89487599
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -305,40 +312,82 @@ final class ShuffleBlockFetcherIterator(
*/
   override def next(): (BlockId, InputStream) = {
 numBlocksProcessed += 1
-val startFetchWait = System.currentTimeMillis()
-currentResult = results.take()
-val result = currentResult
-val stopFetchWait = System.currentTimeMillis()
-shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait)
-
-result match {
-  case SuccessFetchResult(_, address, size, buf, isNetworkReqDone) =>
-if (address != blockManager.blockManagerId) {
-  shuffleMetrics.incRemoteBytesRead(buf.size)
-  shuffleMetrics.incRemoteBlocksFetched(1)
-}
-bytesInFlight -= size
-if (isNetworkReqDone) {
-  reqsInFlight -= 1
-  logDebug("Number of requests in flight " + reqsInFlight)
-}
-  case _ =>
-}
-// Send fetch requests up to maxBytesInFlight
-fetchUpToMaxBytes()
 
-result match {
-  case FailureFetchResult(blockId, address, e) =>
-throwFetchFailedException(blockId, address, e)
+var result: FetchResult = null
+var input: InputStream = null
+// Take the next fetched result and try to decompress it to detect 
data corruption,
+// then fetch it one more time if it's corrupt, throw 
FailureFetchResult if the second fetch
+// is also corrupt, so the previous stage could be retried.
+// For local shuffle block, throw FailureFetchResult for the first 
IOException.
+while (result == null) {
+  val startFetchWait = System.currentTimeMillis()
+  result = results.take()
+  val stopFetchWait = System.currentTimeMillis()
+  shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait)
 
-  case SuccessFetchResult(blockId, address, _, buf, _) =>
-try {
-  (result.blockId, new 
BufferReleasingInputStream(buf.createInputStream(), this))
-} catch {
-  case NonFatal(t) =>
-throwFetchFailedException(blockId, address, t)
-}
+  result match {
+case r @ SuccessFetchResult(blockId, address, size, buf, 
isNetworkReqDone) =>
+  if (address != blockManager.blockManagerId) {
+shuffleMetrics.incRemoteBytesRead(buf.size)
+shuffleMetrics.incRemoteBlocksFetched(1)
+  }
+  bytesInFlight -= size
+  if (isNetworkReqDone) {
+reqsInFlight -= 1
+logDebug("Number of requests in flight " + reqsInFlight)
+  }
+
+  val in = try {
+buf.createInputStream()
+  } catch {
+// The exception could only be throwed by local shuffle block
+case e: IOException =>
+  assert(buf.isInstanceOf[FileSegmentManagedBuffer])
+  logError("Failed to create input stream from local block", e)
+  buf.release()
+  throwFetchFailedException(blockId, address, e)
+  }
+
+  input = streamWrapper(blockId, in)
+  // Only copy the stream if it's wrapped by compression or 
encryption, also the size of
+  // block is small (the decompressed block is smaller than 
maxBytesInFlight)
+  if (detectCorrupt && !input.eq(in) && size < maxBytesInFlight / 
3) {
+val out = new ChunkedByteBufferOutputStream(64 * 1024, 
ByteBuffer.allocate)
+try {
+  // Decompress the whole block at once to detect any 
corruption, which could increase
+  // the memory usage tne potential increase the chance of OOM.
+  // TODO: manage the memory used here, and spill it into disk 
in case of OOM.
+  Utils.copyStream(input, out)
+  out.close()
+  input = out.toChunkedByteBuffer.toInputStream(true)
+} catch {
+  case e: IOException =>
+buf.release()
+if (buf.isInstanceOf[FileSegmentManagedBuffer]
+  || corruptedBlocks.contains(blockId)) {
+  throwFetchFailedException(blockId, address, e)
+} else {
+  logWarning(s"got an corrupted block $blockId from 
$address, fetch again")
--- End diff --

I think the IOException would be set as the cause of the 
`FetchFailedException`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request #15923: [SPARK-4105] retry the fetch or stage if shuffle ...

2016-11-24 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/15923#discussion_r89486838
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala 
---
@@ -108,6 +113,9 @@ final class ShuffleBlockFetcherIterator(
   /** Current number of requests in flight */
   private[this] var reqsInFlight = 0
 
+  /** The blocks that can't be decompressed successfully */
--- End diff --

What about adding more explanation, for example:

```java
/**
 * The blocks that can't be decompressed successfully.
 * It is used to guarantee that we retry at most once for those corrupted blocks.
 */
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15923: [SPARK-4105] retry the fetch or stage if shuffle ...

2016-11-24 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/15923#discussion_r89475841
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala ---
@@ -42,24 +42,21 @@ private[spark] class BlockStoreShuffleReader[K, C](
 
   /** Read the combined key-values for this reduce task */
   override def read(): Iterator[Product2[K, C]] = {
-val blockFetcherItr = new ShuffleBlockFetcherIterator(
+val wrappedStreams = new ShuffleBlockFetcherIterator(
   context,
   blockManager.shuffleClient,
   blockManager,
   mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, 
startPartition, endPartition),
+  serializerManager.wrapStream,
   // Note: we use getSizeAsMb when no suffix is provided for backwards 
compatibility
   SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", 
"48m") * 1024 * 1024,
-  SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", 
Int.MaxValue))
-
-// Wrap the streams for compression and encryption based on 
configuration
-val wrappedStreams = blockFetcherItr.map { case (blockId, inputStream) 
=>
-  serializerManager.wrapStream(blockId, inputStream)
-}
+  SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", 
Int.MaxValue),
+  SparkEnv.get.conf.getBoolean("spark.shuffle.tryDecompress", true))
--- End diff --

nit: maybe `detectCorrupt` is slightly better than `tryDecompress`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15684: [SPARK-18171][MESOS] Show correct framework address in m...

2016-11-06 Thread lins05
Github user lins05 commented on the issue:

https://github.com/apache/spark/pull/15684
  
@zsxwing @mgummelt could you help review this PR? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-11-01 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r85924864
  
--- Diff: docs/configuration.md ---
@@ -1350,6 +1350,20 @@ Apart from these, the following properties are also 
available, and may be useful
 Should be greater than or equal to 1. Number of allowed retries = this 
value - 1.
   
 
+
+  spark.scheduler.taskAssigner
+  roundrobin
+  
+The strategy of how to allocate tasks among workers with free cores. 
Three task
+assigners (roundrobin, packed, and balanced) are supported currently. 
By default, roundrobin
--- End diff --

Nit: I suggest double-quoting the keywords "roundrobin", "packed", and 
"balanced" in this paragraph. E.g. `the "balanced" task assigner` sounds better 
to me than `the balanced task assigner`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15541: [SPARK-17637][Scheduler]Packed scheduling for Spa...

2016-11-01 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/15541#discussion_r85956857
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -305,12 +307,8 @@ private[spark] class TaskSchedulerImpl(
 hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
   }
 }
+taskAssigner.construct(offers)
--- End diff --

The comment on the `resourceOffers` method should be updated. It still says 
`We fill each node with tasks in a round-robin manner so that tasks are 
balanced across the cluster.`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15684: [SPARK-18171][MESOS] Show correct framework address in m...

2016-10-30 Thread lins05
Github user lins05 commented on the issue:

https://github.com/apache/spark/pull/15684
  
/cc @zsxwing who worked on 
[SPARK-4563](https://issues.apache.org/jira/browse/SPARK-4563) and @mgummelt .


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15684: [SPARK-18171][MESOS] Show correct framework addre...

2016-10-30 Thread lins05
GitHub user lins05 opened a pull request:

https://github.com/apache/spark/pull/15684

[SPARK-18171][MESOS] Show correct framework address in mesos master web ui 
when the advertised address is used

## What changes were proposed in this pull request?

In [SPARK-4563](https://issues.apache.org/jira/browse/SPARK-4563) we added 
support for the driver to advertise a different hostname/ip 
(`spark.driver.host`) to the executors than the hostname/ip the driver 
actually binds to (`spark.driver.bindAddress`). But in the mesos webui's 
frameworks page, it still shows the driver's bind hostname/ip (though the web 
ui link is correct). We should fix it to make them consistent. 

Before:


![mesos-spark](https://cloud.githubusercontent.com/assets/717363/19835148/4dffc6d4-9eb8-11e6-8999-f80f10e4c3f7.png)

After:


![mesos-spark2](https://cloud.githubusercontent.com/assets/717363/19835149/54596ae4-9eb8-11e6-896c-230426acd1a1.png)


This PR doesn't affect the behavior on the spark side, only makes the 
display on the mesos master side more consistent.

## How was this patch tested?

Manual test. 

* Build the package and build a docker image (spark:2.1-test)

```sh
./dev/make-distribution.sh -Phadoop-2.6 -Phive -Phive-thriftserver -Pyarn 
-Pmesos
```

*  Then run the spark driver inside a docker container.

```sh
docker run --rm -it \
  --name=spark \
  -p 3-30010:3-30010 \
  -e LIBPROCESS_ADVERTISE_IP=127.0.0.1 \
  -e LIBPROCESS_PORT=3 \
  -e MESOS_NATIVE_LIBRARY=/usr/local/lib/libmesos-1.0.0.so \
  -e MESOS_NATIVE_JAVA_LIBRARY=/usr/local/lib/libmesos-1.0.0.so \
  -e SPARK_HOME=/opt/dist \
  spark:2.1-test
```
* Inside the container, launch the spark driver, making use of the 
advertised address:

```sh
/opt/dist/bin/spark-shell \
  --master mesos://zk://172.17.42.1:2181/mesos \
  --conf spark.driver.host=172.17.42.1 \
  --conf spark.driver.bindAddress=172.17.0.1 \
  --conf spark.driver.port=30001 \
  --conf spark.driver.blockManager.port=30002 \
  --conf spark.ui.port=30003 \
  --conf spark.mesos.coarse=true \
  --conf spark.cores.max=2 \
  --conf spark.executor.cores=1 \
  --conf spark.executor.memory=1g \
  --conf spark.mesos.executor.docker.image=spark:2.1-test
```

* Run several spark jobs to ensure everything is running fine.

```scala
val rdd = sc.textFile("file:///opt/dist/README.md")
rdd.cache().count
```

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lins05/spark 
spark-18171-show-correct-host-name-in-mesos-master-web-ui

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15684.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15684


commit 19a4261b2aa3a7ef3c53c4a82d5adc2c3c65cba1
Author: Shuai Lin <linshuai2...@gmail.com>
Date:   2016-10-30T07:44:51Z

[SPARK-18171][MESOS] Show correct framework address in mesos master web ui 
when the advertised address is used




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15487: [SPARK-17940][SQL] Fixed a typo in LAST function ...

2016-10-27 Thread lins05
Github user lins05 closed the pull request at:

https://github.com/apache/spark/pull/15487


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15487: [SPARK-17940][SQL] Fixed a typo in LAST function and imp...

2016-10-27 Thread lins05
Github user lins05 commented on the issue:

https://github.com/apache/spark/pull/15487
  
@HyukjinKwon OK, please fix all these in your PR. I'll close this small one.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15377: [SPARK-17802] Improved caller context logging.

2016-10-25 Thread lins05
Github user lins05 commented on the issue:

https://github.com/apache/spark/pull/15377
  
@srowen Could we get this merged since the tests are now green? Not sure 
why it failed previously; it just turned green without me doing anything.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15377: [SPARK-17802] Improved caller context logging.

2016-10-19 Thread lins05
Github user lins05 commented on the issue:

https://github.com/apache/spark/pull/15377
  
@weiqingy Emm, then we would also have to add the logic of checking 
"hadoop.caller.context.enabled" in the test code, which would make the test code 
simply duplicate the code path of `CallerContext.callerContextSupported`, and 
IMHO this doesn't make much sense...

On the other hand I do agree we should test the code path, but it doesn't seem 
easy to do.

@srowen @jerryshao What do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15377: [SPARK-17802] Improved caller context logging.

2016-10-19 Thread lins05
Github user lins05 commented on the issue:

https://github.com/apache/spark/pull/15377
  
@weiqingy I agree that's a problem. But I don't see how to unit test the 
`callerContextSupported` method without repeating the same logic in the test 
code. Do you have any suggestions?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15487: [SPARK-17940][SQL] Fixed a typo in LAST function and imp...

2016-10-18 Thread lins05
Github user lins05 commented on the issue:

https://github.com/apache/spark/pull/15487
  
@HyukjinKwon I've updated the usage string. Now it looks like this:
```
spark-sql> describe function first;
Function: first
Class: org.apache.spark.sql.catalyst.expressions.aggregate.First
Usage: 
  first(expr) - Returns the first value of `child` for a group of rows.

  first(expr, isIgnoreNull=false) - Returns the first value of `child` 
for a group of rows.
  If isIgnoreNull is true, returns only non-null values.

  Note that in most cases the result is nondeterministic.

spark-sql> describe function last;
Function: last
Class: org.apache.spark.sql.catalyst.expressions.aggregate.Last
Usage: 
  last(expr) - Returns the last value of `child` for a group of rows.

  last(expr, isIgnoreNull) - Returns the last value of `child` for a 
group of rows.
  If isIgnoreNull is true, returns only non-null values.

  Note that in most cases the result is nondeterministic.
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15377: [SPARK-17802] Improved caller context logging.

2016-10-18 Thread lins05
Github user lins05 commented on the issue:

https://github.com/apache/spark/pull/15377
  
Thanks @jerryshao @srowen. I've updated the code as you suggested.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15487: [SPARK-17940][SQL] Fixed a typo in LAST function and imp...

2016-10-17 Thread lins05
Github user lins05 commented on the issue:

https://github.com/apache/spark/pull/15487
  
@HyukjinKwon thanks, I'll update the PR accordingly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15377: [SPARK-17802] Improved caller context logging.

2016-10-16 Thread lins05
Github user lins05 commented on the issue:

https://github.com/apache/spark/pull/15377
  
@srowen done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15377: [SPARK-17802] Improved caller context logging.

2016-10-16 Thread lins05
Github user lins05 commented on the issue:

https://github.com/apache/spark/pull/15377
  
@weiqingy @srowen I see. So do you suggest avoiding `Utils.classForName` to get 
this one merged, or rather waiting for SPARK-17714?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15487: [SPARK-17940][SQL] Fixed a typo in LAST function ...

2016-10-16 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/15487#discussion_r83551685
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala
 ---
@@ -29,15 +29,18 @@ import org.apache.spark.sql.types._
  * a single partition, and we use a single reducer to do the aggregation.).
  */
 @ExpressionDescription(
-  usage = "_FUNC_(expr,isIgnoreNull) - Returns the last value of `child` 
for a group of rows.")
+  usage = """_FUNC_(expr,isIgnoreNull) - Returns the last value of `child` 
for a group of rows.
+_FUNC_(expr,isIgnoreNull=false) - Returns the last value of `child` 
for a group of rows.
--- End diff --

@hvanhovell @HyukjinKwon Thanks for the review. I'm simply following the 
usage strings of other functions, e.g.:

```
spark-sql> describe function first;
Function: first
Class: org.apache.spark.sql.catalyst.expressions.aggregate.First
Usage: first(expr) - Returns the first value of `child` for a group of rows.
first(expr,isIgnoreNull=false) - Returns the first value of `child` for 
a group of rows.
  If isIgnoreNull is true, returns only non-null values.
   
spark-sql> describe function approx_count_distinct;
Function: approx_count_distinct
Class: 
org.apache.spark.sql.catalyst.expressions.aggregate.HyperLogLogPlusPlus
Usage: approx_count_distinct(expr) - Returns the estimated cardinality by 
HyperLogLog++.
approx_count_distinct(expr, relativeSD=0.05) - Returns the estimated 
cardinality by HyperLogLog++
  with relativeSD, the maximum estimation error allowed.
```

So it seems the current convention is: the first line is a short one-line 
description, followed by a more detailed description. Do we have an explicit 
"usage string style" to follow?

@hvanhovell I'll add the note about nondeterministic.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15487: [SPARK-17940][SQL] Fixed a typo in LAST function ...

2016-10-14 Thread lins05
GitHub user lins05 opened a pull request:

https://github.com/apache/spark/pull/15487

[SPARK-17940][SQL] Fixed a typo in LAST function and improved its usage 
string

## What changes were proposed in this pull request?

* Fixed a typo in the LAST function error message
* Also improved its usage string to match the FIRST function

## How was this patch tested?

Existing tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lins05/spark spark-17940-typo-in-last-func

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15487.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15487


commit 33988d3179edf0f179da11f1b8dfc4d28d9c5d08
Author: Shuai Lin <linshuai2...@gmail.com>
Date:   2016-10-14T15:46:07Z

[SPARK-17940][SQL] Fixed a typo in LAST function and improved its usage 
string.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15377: [SPARK-17802] Improved caller context logging.

2016-10-13 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/15377#discussion_r83206341
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -2479,20 +2483,35 @@ private[spark] class CallerContext(
* [[org.apache.hadoop.ipc.CallerContext]], which was added in hadoop 
2.8.
*/
   def setCurrentContext(): Boolean = {
-var succeed = false
-try {
-  // scalastyle:off classforname
-  val callerContext = 
Class.forName("org.apache.hadoop.ipc.CallerContext")
-  val Builder = 
Class.forName("org.apache.hadoop.ipc.CallerContext$Builder")
-  // scalastyle:on classforname
-  val builderInst = 
Builder.getConstructor(classOf[String]).newInstance(context)
-  val hdfsContext = Builder.getMethod("build").invoke(builderInst)
-  callerContext.getMethod("setCurrent", callerContext).invoke(null, 
hdfsContext)
-  succeed = true
-} catch {
-  case NonFatal(e) => logInfo("Fail to set Spark caller context", e)
+if (!CallerContext.callerContextSupported) {
+  false
+} else {
+  if 
(!SparkHadoopUtil.get.conf.getBoolean("hadoop.caller.context.enabled", false)) {
+logInfo("Hadoop caller context is not enabled")
+CallerContext.callerContextSupported = false
+false
+  } else {
+try {
+  // scalastyle:off classforname
--- End diff --

Good catch, done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15377: [SPARK-17802] Improved caller context logging.

2016-10-13 Thread lins05
Github user lins05 commented on the issue:

https://github.com/apache/spark/pull/15377
  
> Another thing, do you verify it locally? Since there's no unit test to 
cover it.

@jerryshao Yeah, I did test it locally to ensure the error is only logged 
once.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15377: [SPARK-17802] Improved caller context logging.

2016-10-13 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/15377#discussion_r83179622
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -2474,25 +2478,42 @@ private[spark] class CallerContext(
val context = "SPARK_" + from + appIdStr + appAttemptIdStr +
  jobIdStr + stageIdStr + stageAttemptIdStr + taskIdStr + 
taskAttemptNumberStr
 
+  lazy val conf = new Configuration
+
   /**
* Set up the caller context [[context]] by invoking Hadoop 
CallerContext API of
* [[org.apache.hadoop.ipc.CallerContext]], which was added in hadoop 
2.8.
*/
   def setCurrentContext(): Boolean = {
-var succeed = false
-try {
-  // scalastyle:off classforname
-  val callerContext = 
Class.forName("org.apache.hadoop.ipc.CallerContext")
-  val Builder = 
Class.forName("org.apache.hadoop.ipc.CallerContext$Builder")
-  // scalastyle:on classforname
-  val builderInst = 
Builder.getConstructor(classOf[String]).newInstance(context)
-  val hdfsContext = Builder.getMethod("build").invoke(builderInst)
-  callerContext.getMethod("setCurrent", callerContext).invoke(null, 
hdfsContext)
-  succeed = true
-} catch {
-  case NonFatal(e) => logInfo("Fail to set Spark caller context", e)
+if (!CallerContext.callerContextSupported) {
+  false
+} else {
+  if (!conf.getBoolean("hadoop.caller.context.enabled", false)) {
+logInfo("Hadoop caller context is not enabled")
+CallerContext.callerContextSupported = false
+false
+  } else {
+try {
+// scalastyle:off classforname
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15377: [SPARK-17802] Improved caller context logging.

2016-10-13 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/15377#discussion_r83179598
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -2474,25 +2478,42 @@ private[spark] class CallerContext(
val context = "SPARK_" + from + appIdStr + appAttemptIdStr +
  jobIdStr + stageIdStr + stageAttemptIdStr + taskIdStr + 
taskAttemptNumberStr
 
+  lazy val conf = new Configuration
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15377: [SPARK-17802] Improved caller context logging.

2016-10-13 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/15377#discussion_r83179592
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -2474,25 +2478,42 @@ private[spark] class CallerContext(
val context = "SPARK_" + from + appIdStr + appAttemptIdStr +
  jobIdStr + stageIdStr + stageAttemptIdStr + taskIdStr + 
taskAttemptNumberStr
 
+  lazy val conf = new Configuration
+
   /**
* Set up the caller context [[context]] by invoking Hadoop 
CallerContext API of
* [[org.apache.hadoop.ipc.CallerContext]], which was added in hadoop 
2.8.
*/
   def setCurrentContext(): Boolean = {
-var succeed = false
-try {
-  // scalastyle:off classforname
-  val callerContext = 
Class.forName("org.apache.hadoop.ipc.CallerContext")
-  val Builder = 
Class.forName("org.apache.hadoop.ipc.CallerContext$Builder")
-  // scalastyle:on classforname
-  val builderInst = 
Builder.getConstructor(classOf[String]).newInstance(context)
-  val hdfsContext = Builder.getMethod("build").invoke(builderInst)
-  callerContext.getMethod("setCurrent", callerContext).invoke(null, 
hdfsContext)
-  succeed = true
-} catch {
-  case NonFatal(e) => logInfo("Fail to set Spark caller context", e)
+if (!CallerContext.callerContextSupported) {
+  false
+} else {
+  if (!conf.getBoolean("hadoop.caller.context.enabled", false)) {
+logInfo("Hadoop caller context is not enabled")
+CallerContext.callerContextSupported = false
+false
+  } else {
+try {
+// scalastyle:off classforname
+val callerContext = 
Class.forName("org.apache.hadoop.ipc.CallerContext")
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15377: [SPARK-17802] Improved caller context logging.

2016-10-13 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/15377#discussion_r83178194
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -2432,6 +2432,10 @@ private[spark] object Utils extends Logging {
   }
 }
 
+private[util] object CallerContext {
+  var callerContextSupported: Boolean = true
--- End diff --

It's used below, and you have commented on it :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15319: [SPARK-17733][SQL] InferFiltersFromConstraints ru...

2016-10-08 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/15319#discussion_r82503911
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala 
---
@@ -74,14 +74,26 @@ abstract class QueryPlan[PlanType <: 
QueryPlan[PlanType]] extends TreeNode[PlanT
* additional constraint of the form `b = 5`
*/
   private def inferAdditionalConstraints(constraints: Set[Expression]): 
Set[Expression] = {
+// Collect alias from expressions to avoid producing non-converging 
set of constraints
+// for recursive functions.
+//
+// Don't apply transform on constraints if the attribute used to 
replace is an alias,
+// because then both `QueryPlan.inferAdditionalConstraints` and
+// `UnaryNode.getAliasedConstraints` applies and may produce a 
non-converging set of
+// constraints.
+// For more details, infer 
https://issues.apache.org/jira/browse/SPARK-17733
+val aliasMap = AttributeMap((expressions ++ 
children.flatMap(_.expressions)).collect {
--- End diff --

Why not using `AttributeSet`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15319: [SPARK-17733][SQL] InferFiltersFromConstraints ru...

2016-10-08 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/15319#discussion_r82503891
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala 
---
@@ -74,14 +74,26 @@ abstract class QueryPlan[PlanType <: 
QueryPlan[PlanType]] extends TreeNode[PlanT
* additional constraint of the form `b = 5`
*/
   private def inferAdditionalConstraints(constraints: Set[Expression]): 
Set[Expression] = {
+// Collect alias from expressions to avoid producing non-converging 
set of constraints
+// for recursive functions.
+//
+// Don't apply transform on constraints if the attribute used to 
replace is an alias,
+// because then both `QueryPlan.inferAdditionalConstraints` and
+// `UnaryNode.getAliasedConstraints` applies and may produce a 
non-converging set of
+// constraints.
+// For more details, infer 
https://issues.apache.org/jira/browse/SPARK-17733
--- End diff --

typo "infer" -> "refer" (to)?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15377: [SPARK-17802] Improved caller context logging.

2016-10-07 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/15377#discussion_r82492566
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -2474,25 +2474,36 @@ private[spark] class CallerContext(
val context = "SPARK_" + from + appIdStr + appAttemptIdStr +
  jobIdStr + stageIdStr + stageAttemptIdStr + taskIdStr + 
taskAttemptNumberStr
 
+   private var callerContextSupported: Boolean = true
+
   /**
* Set up the caller context [[context]] by invoking Hadoop 
CallerContext API of
* [[org.apache.hadoop.ipc.CallerContext]], which was added in hadoop 
2.8.
*/
   def setCurrentContext(): Boolean = {
-var succeed = false
-try {
-  // scalastyle:off classforname
-  val callerContext = 
Class.forName("org.apache.hadoop.ipc.CallerContext")
-  val Builder = 
Class.forName("org.apache.hadoop.ipc.CallerContext$Builder")
-  // scalastyle:on classforname
-  val builderInst = 
Builder.getConstructor(classOf[String]).newInstance(context)
-  val hdfsContext = Builder.getMethod("build").invoke(builderInst)
-  callerContext.getMethod("setCurrent", callerContext).invoke(null, 
hdfsContext)
-  succeed = true
-} catch {
-  case NonFatal(e) => logInfo("Fail to set Spark caller context", e)
+if (!callerContextSupported) {
+  false
+} else {
+  try {
+// scalastyle:off classforname
+val callerContext = 
Class.forName("org.apache.hadoop.ipc.CallerContext")
+val builder = 
Class.forName("org.apache.hadoop.ipc.CallerContext$Builder")
+// scalastyle:on classforname
+val builderInst = 
builder.getConstructor(classOf[String]).newInstance(context)
+val hdfsContext = builder.getMethod("build").invoke(builderInst)
+callerContext.getMethod("setCurrent", callerContext).invoke(null, 
hdfsContext)
+true
+  } catch {
+case e: ClassNotFoundException =>
+  logInfo(
+s"Fail to set Spark caller context: requires Hadoop 2.8 or 
later: ${e.getMessage}")
+  callerContextSupported = false
+  false
+case NonFatal(e) =>
+  logWarning("Fail to set Spark caller context", e)
--- End diff --

Thanks for the tip about `hadoop.caller.context.enabled` and the 
suggestion, I'll do that.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15377: [SPARK-17802] Improved caller context logging.

2016-10-07 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/15377#discussion_r82492524
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -2432,6 +2432,10 @@ private[spark] object Utils extends Logging {
   }
 }
 
+private[spark] object CallerContext {
+  var callerContextSupported: Boolean = true
--- End diff --

Makes sense, done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15377: [SPARK-17802] Improved caller context logging.

2016-10-06 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/15377#discussion_r82224216
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -2474,25 +2474,36 @@ private[spark] class CallerContext(
val context = "SPARK_" + from + appIdStr + appAttemptIdStr +
  jobIdStr + stageIdStr + stageAttemptIdStr + taskIdStr + 
taskAttemptNumberStr
 
+   private var callerContextSupported: Boolean = true
+
   /**
* Set up the caller context [[context]] by invoking Hadoop 
CallerContext API of
* [[org.apache.hadoop.ipc.CallerContext]], which was added in hadoop 
2.8.
*/
   def setCurrentContext(): Boolean = {
-var succeed = false
-try {
-  // scalastyle:off classforname
-  val callerContext = 
Class.forName("org.apache.hadoop.ipc.CallerContext")
-  val Builder = 
Class.forName("org.apache.hadoop.ipc.CallerContext$Builder")
-  // scalastyle:on classforname
-  val builderInst = 
Builder.getConstructor(classOf[String]).newInstance(context)
-  val hdfsContext = Builder.getMethod("build").invoke(builderInst)
-  callerContext.getMethod("setCurrent", callerContext).invoke(null, 
hdfsContext)
-  succeed = true
-} catch {
-  case NonFatal(e) => logInfo("Fail to set Spark caller context", e)
+if (!callerContextSupported) {
+  false
+} else {
+  try {
+// scalastyle:off classforname
+val callerContext = 
Class.forName("org.apache.hadoop.ipc.CallerContext")
+val builder = 
Class.forName("org.apache.hadoop.ipc.CallerContext$Builder")
+// scalastyle:on classforname
+val builderInst = 
builder.getConstructor(classOf[String]).newInstance(context)
+val hdfsContext = builder.getMethod("build").invoke(builderInst)
+callerContext.getMethod("setCurrent", callerContext).invoke(null, 
hdfsContext)
+true
+  } catch {
+case e: ClassNotFoundException =>
+  logInfo(
+s"Fail to set Spark caller context: requires Hadoop 2.8 or 
later: ${e.getMessage}")
+  callerContextSupported = false
+  false
+case NonFatal(e) =>
+  logWarning("Fail to set Spark caller context", e)
--- End diff --

Emm, I prefer not to suppress the non-ClassNotFound errors, because they are 
real errors; the ClassNotFound error, by contrast, merely reflects a *conditional 
feature* that depends on the hadoop environment.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15377: [SPARK-17802] Improved caller context logging.

2016-10-06 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/15377#discussion_r82139291
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -2474,25 +2474,36 @@ private[spark] class CallerContext(
val context = "SPARK_" + from + appIdStr + appAttemptIdStr +
  jobIdStr + stageIdStr + stageAttemptIdStr + taskIdStr + 
taskAttemptNumberStr
 
+   var callerContextSupported : Option[Boolean] = _
+
   /**
* Set up the caller context [[context]] by invoking Hadoop 
CallerContext API of
* [[org.apache.hadoop.ipc.CallerContext]], which was added in hadoop 
2.8.
*/
   def setCurrentContext(): Boolean = {
-var succeed = false
-try {
-  // scalastyle:off classforname
-  val callerContext = 
Class.forName("org.apache.hadoop.ipc.CallerContext")
-  val Builder = 
Class.forName("org.apache.hadoop.ipc.CallerContext$Builder")
-  // scalastyle:on classforname
-  val builderInst = 
Builder.getConstructor(classOf[String]).newInstance(context)
-  val hdfsContext = Builder.getMethod("build").invoke(builderInst)
-  callerContext.getMethod("setCurrent", callerContext).invoke(null, 
hdfsContext)
-  succeed = true
-} catch {
-  case NonFatal(e) => logInfo("Fail to set Spark caller context", e)
+callerContextSupported match {
+  case Some(false) =>
+false
+  case _ =>
+var succeed = false
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15377: [SPARK-17802] Improved caller context logging.

2016-10-06 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/15377#discussion_r82139283
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -2474,25 +2474,36 @@ private[spark] class CallerContext(
val context = "SPARK_" + from + appIdStr + appAttemptIdStr +
  jobIdStr + stageIdStr + stageAttemptIdStr + taskIdStr + 
taskAttemptNumberStr
 
+   var callerContextSupported : Option[Boolean] = _
--- End diff --

Makes sense. I have simplified the logic.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15377: [SPARK-17802] Improved caller context logging.

2016-10-06 Thread lins05
Github user lins05 commented on the issue:

https://github.com/apache/spark/pull/15377
  
cc @weiqingy (who worked on 
[SPARK-16757](https://issues.apache.org/jira/browse/SPARK-16757).)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15377: [SPARK-17802] Improved caller context logging.

2016-10-06 Thread lins05
GitHub user lins05 opened a pull request:

https://github.com/apache/spark/pull/15377

[SPARK-17802] Improved caller context logging.

## What changes were proposed in this pull request?

[SPARK-16757](https://issues.apache.org/jira/browse/SPARK-16757) sets the 
hadoop `CallerContext` when calling hadoop/hdfs apis to make spark applications 
more diagnosable in hadoop/hdfs logs. However, the 
`org.apache.hadoop.ipc.CallerContext` class is only added since [hadoop 
2.8](https://issues.apache.org/jira/browse/HDFS-9184). So each time 
`utils.CallerContext.setCurrentContext()` is called (e.g [when a task is 
created](https://github.com/apache/spark/blob/b678e46/core/src/main/scala/org/apache/spark/scheduler/Task.scala#L95-L96)),
 a "java.lang.ClassNotFoundException: org.apache.hadoop.ipc.CallerContext"
error is logged, which pollutes the spark logs when there are lots of tasks.

This patch improves this behaviour by only logging the 
`ClassNotFoundException` once.
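
A minimal, self-contained sketch of the idea (not the actual patch, which lives in 
`core/src/main/scala/org/apache/spark/util/Utils.scala` and also uses Spark's 
`Logging` trait plus the `hadoop.caller.context.enabled` check added during review): 
remember the first `ClassNotFoundException` and short-circuit on later calls, so the 
message is emitted only once.

```scala
import scala.util.control.NonFatal

// Sketch only: report the missing Hadoop class once, then skip the reflection.
object CallerContextSketch {
  // Flipped to false the first time org.apache.hadoop.ipc.CallerContext is missing.
  @volatile private var supported = true

  def setCurrentContext(context: String): Boolean = {
    if (!supported) {
      false
    } else {
      try {
        val callerContext = Class.forName("org.apache.hadoop.ipc.CallerContext")
        val builder = Class.forName("org.apache.hadoop.ipc.CallerContext$Builder")
        val builderInst = builder.getConstructor(classOf[String]).newInstance(context)
        val hdfsContext = builder.getMethod("build").invoke(builderInst)
        callerContext.getMethod("setCurrent", callerContext).invoke(null, hdfsContext)
        true
      } catch {
        case e: ClassNotFoundException =>
          // Printed only once; subsequent calls return early via the flag above.
          println(s"Caller context requires Hadoop 2.8 or later: ${e.getMessage}")
          supported = false
          false
        case NonFatal(e) =>
          println(s"Failed to set Spark caller context: $e")
          false
      }
    }
  }
}
```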

## How was this patch tested?

Existing tests.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lins05/spark 
spark-17802-improve-callercontext-logging

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15377.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15377


commit b8621ab361966c6a63015ae08b0f63985db25176
Author: Shuai Lin <linshuai2...@gmail.com>
Date:   2016-10-06T03:18:20Z

[SPARK-17802] Improved caller context logging.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...

2016-09-29 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/15089#discussion_r81208953
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala
 ---
@@ -17,18 +17,21 @@
 
 package org.apache.spark.sql.execution.python
 
+import java.io.File
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import net.razorvine.pickle.{Pickler, Unpickler}
 
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.api.python.{ChainedPythonFunctions, PythonRunner}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.util.{CompletionIterator, Utils}
--- End diff --

Seems the import `CompletionIterator` is not used.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15089: [SPARK-15621] [SQL] Support spilling for Python U...

2016-09-29 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/15089#discussion_r81212877
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/python/RowQueue.scala ---
@@ -0,0 +1,276 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.python
+
+import java.io._
+
+import com.google.common.io.Closeables
+
+import org.apache.spark.SparkException
+import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager}
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.unsafe.memory.MemoryBlock
+
+/**
+ * A RowQueue is an FIFO queue for UnsafeRow.
+ *
+ * This RowQueue is ONLY designed and used for Python UDF, which has only 
one writer and only one
+ * reader, the reader ALWAYS ran behind the writer.
+ */
+private[python] trait RowQueue {
+
+  /**
+   * Add a row to the end of it, returns true iff the row has added into 
it.
+   */
+  def add(row: UnsafeRow): Boolean
+
+  /**
+   * Retrieve and remove the first row, returns null if it's empty.
+   *
+   * It can only be called after add is called.
+   */
+  def remove(): UnsafeRow
+
+  /**
+   * Cleanup all the resources.
+   */
+  def close(): Unit
+}
+
+/**
+ * A RowQueue that is based on in-memory page. UnsafeRows are appended 
into it until it's full.
+ * Another thread could read from it at the same time (behind the writer).
+ *
+ * The format of UnsafeRow in page:
+ * [4 bytes to hold length of record (N)] [N bytes to hold record] [...]
+ */
+private[python] abstract class InMemoryRowQueue(val page: MemoryBlock, 
numFields: Int)
+  extends RowQueue {
+  private val base: AnyRef = page.getBaseObject
+  private val endOfPage: Long = page.getBaseOffset + page.size
+  // the first location where a new row would be written
+  private var writeOffset = page.getBaseOffset
+  // points to the start of the next row to read
+  private var readOffset = page.getBaseOffset
+  private val resultRow = new UnsafeRow(numFields)
+
+  def add(row: UnsafeRow): Boolean = {
+val size = row.getSizeInBytes
+if (writeOffset + 4 + size > endOfPage) {
+  // if there is not enough space in this page to hold the new record
+  if (writeOffset + 4 <= endOfPage) {
+// if there's extra space at the end of the page, store a special 
"end-of-page" length (-1)
+Platform.putInt(base, writeOffset, -1)
+  }
+  false
+} else {
+  Platform.putInt(base, writeOffset, size)
+  Platform.copyMemory(row.getBaseObject, row.getBaseOffset, base, 
writeOffset + 4, size)
+  writeOffset += 4 + size
+  true
+}
+  }
+
+  def remove(): UnsafeRow = {
+if (readOffset + 4 > endOfPage || Platform.getInt(base, readOffset) < 
0) {
+  null
+} else {
+  val size = Platform.getInt(base, readOffset)
+  resultRow.pointTo(base, readOffset + 4, size)
+  readOffset += 4 + size
+  resultRow
+}
+  }
+}
+
+/**
+ * A RowQueue that is backed by a file on disk. This queue will stop 
accepting new rows once any
+ * reader has begun reading from the queue.
+ */
+private[python] case class DiskRowQueue(file: File, fields: Int) extends 
RowQueue {
+  private var out = new DataOutputStream(
+new BufferedOutputStream(new FileOutputStream(file.toString)))
+  private var unreadBytes = 0L
+
+  private var in: DataInputStream = _
+  private val resultRow = new UnsafeRow(fields)
+
+  def add(row: UnsafeRow): Boolean = synchronized {
+if (out == null) {
+  // Another thread is reading, stop writing this one
+  return false
+}
+out.writeInt(row.getSizeInBytes)
+out.write(row.getBytes)

[GitHub] spark issue #15254: [SPARK-17679] [PYSPARK] remove unnecessary Py4J ListConv...

2016-09-26 Thread lins05
Github user lins05 commented on the issue:

https://github.com/apache/spark/pull/15254
  
I guess we can also remove another workaround 
[here](https://github.com/apache/spark/blob/v2.0.0/python/pyspark/rdd.py#L2320-L2328)?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15236: [SPARK-17017][ML][MLLIB][ML][DOC] Updated the ml/mllib f...

2016-09-26 Thread lins05
Github user lins05 commented on the issue:

https://github.com/apache/spark/pull/15236
  
@srowen I saw there was a proposal to change `setAlpha` to `setFpr` in 
#15214, but it was not changed when that PR was merged. So I think this PR is up 
to date.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15236: [SPARK-17017][ML][MLLIB][ML][DOC] Updated the ml/mllib f...

2016-09-25 Thread lins05
Github user lins05 commented on the issue:

https://github.com/apache/spark/pull/15236
  
Just found #15214 and #15212; I think this one needs to wait until those are 
merged.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #15236: [SPARK-17017][ML][MLLIB][ML][DOC] Updated the ml ...

2016-09-25 Thread lins05
GitHub user lins05 opened a pull request:

https://github.com/apache/spark/pull/15236

[SPARK-17017][ML][MLLIB][ML][DOC] Updated the ml feature selection doc for 
ChiSqSelector

## What changes were proposed in this pull request?

A follow up for #14597 to update feature selection docs about ChiSqSelector.

## How was this patch tested?

Generated markdown docs. It can be previewed at:

* ml: http://sparkdocs.lins05.pw/spark-17017/ml-features.html#chisqselector
* mllib: 
http://sparkdocs.lins05.pw/spark-17017/mllib-feature-extraction.html#chisqselector

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lins05/spark 
spark-17017-update-docs-for-chisq-selector-fpr

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15236.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15236


commit b272dee4bf6591d047dd3963ceaaeed458eb1662
Author: Shuai Lin <linshuai2...@gmail.com>
Date:   2016-09-25T16:13:53Z

[SPARK-17017][ML][MLLIB][DOC] Updated the ml feature selection doc for 
ChiSqSelector.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #15043: [SPARK-17491] Close serialization stream to fix wrong an...

2016-09-10 Thread lins05
Github user lins05 commented on the issue:

https://github.com/apache/spark/pull/15043
  
Did a simple test and it does fix the bug. One interesting thing is that while 
records.count() returns a smaller number than the actual count, the Spark UI 
still shows the correct record count; in my test case it's 2999808 vs. 
30.

![screen shot 2016-09-10 at 5 57 38 
pm](https://cloud.githubusercontent.com/assets/717363/18409696/70a407e0-7780-11e6-9f22-7b55c24b0595.png)

![screen shot 2016-09-10 at 5 58 26 
pm](https://cloud.githubusercontent.com/assets/717363/18409697/75f5254e-7780-11e6-98fd-5cae496f7c22.png)



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #14628: [SPARK-17033][Follow-up][ML][MLLib] Improve kmean aggreg...

2016-08-13 Thread lins05
Github user lins05 commented on the issue:

https://github.com/apache/spark/pull/14628
  
A grep shows there is also a call to `RDD.aggregate` in `LDAModel`; should we 
fix it here as well?

```sh
mllib/src/main/scala/org/apache/spark/mllib/clustering]$ git grep -E 
'\<aggregate\>' |grep -v // 
KMeans.scala:417:.aggregate(new Array[Double](runs))(
LDAModel.scala:756:graph.vertices.aggregate(0.0)(seqOp, _ + _)
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11157: [SPARK-11714][Mesos] Make Spark on Mesos honor po...

2016-08-02 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r73214595
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
@@ -357,4 +375,191 @@ private[mesos] trait MesosSchedulerUtils extends 
Logging {
 
sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", 
"120s")
   }
 
+  /**
+   * Checks executor ports if they are within some range of the offered 
list of ports ranges,
+   *
+   * @param conf the Spark Config
+   * @param ports the list of ports to check
+   * @return true if ports are within range false otherwise
+   */
+  protected def checkPorts(conf: SparkConf, ports: List[(Long, Long)]): 
Boolean = {
+
+def checkIfInRange(port: Long, ps: List[(Long, Long)]): Boolean = {
+  ps.exists(r => r._1 <= port & r._2 >= port)
+}
+
+val portsToCheck = nonZeroPortValuesFromConfig(conf)
+val withinRange = portsToCheck.forall(p => checkIfInRange(p, ports))
+// make sure we have enough ports to allocate per offer
+ports.map(r => r._2 - r._1 + 1).sum >= portsToCheck.size && withinRange
+  }
+
+  /**
+   * Partitions port resources.
+   *
+   * @param portsToAssign non-zero ports to assign
+   * @param offeredResources the ports offered
+   * @return resources left, port resources to be used.
+   */
+  def partitionPortResources(portsToAssign: List[Long], offeredResources: 
List[Resource])
+: (List[Resource], List[Resource]) = {
+if (portsToAssign.isEmpty) {
+  (offeredResources, List[Resource]())
+}
+// partition port offers
+val (resourcesWithoutPorts, portResources) = 
filterPortResources(offeredResources)
+val offeredPortRanges = 
getRangeResourceWithRoleInfo(portResources.asJava, "ports")
+// reserve non-zero ports
+val nonZeroResources = reservePorts(offeredPortRanges, portsToAssign)
+
+createResourcesFromAssignedPorts(nonZeroResources)
+  }
+
+  /**
+   * Returns known port name used by the executor process.
+   * @return the port name
+   */
+  def managedPortNames() : List[String] = List("spark.executor.port", 
"spark.blockManager.port")
+
+  /**
+   * The values of the non-zero ports to be used by the executor process.
+   * @param conf the spark config to use
+   * @return the ono-zero values of the ports
+   */
+  def nonZeroPortValuesFromConfig(conf: SparkConf): List[Long] = {
+managedPortNames().map(conf.getLong(_, 0)).filter( _ != 0)
+  }
+
+  /**
+   * It gets a tuple for the non-zero port assigned resources.
+   * First member of the tuple represents resources left while the second
+   * resources used. A tuple is returned with final resources left (fist 
member)
+   * and the resources used (second member).
+   */
+  private def createResourcesFromAssignedPorts(
+  nonZero: (List[PortRangeResourceInfo], List[PortRangeResourceInfo]))
+: (List[Resource], List[Resource]) = {
+(nonZero._1.flatMap{port => createMesosPortResource(port.range, 
Some(port.role))},
+  nonZero._2.flatMap{port => createMesosPortResource(port.range, 
Some(port.role))})
+  }
+
+  private case class PortRangeResourceInfo(role: String, range: 
List[(Long, Long)])
+
+  /**
+   * A resource can have multiple values in the offer since it can either 
be from
+   * a specific role or wildcard.
+   * Extract role info and port range for every port resource in the offer
+   */
+  private def getRangeResourceWithRoleInfo(resources: JList[Resource], 
name: String)
+: List[PortRangeResourceInfo] = {
+resources.asScala.filter(_.getName == name).
+  map{resource =>
+PortRangeResourceInfo(resource.getRole, 
resource.getRanges.getRangeList.asScala
+.map(r => (r.getBegin, r.getEnd)).toList)
+  }.toList
+  }
+
+  /** Helper method to get a pair of assigned and remaining ports along 
with role info */
+  private def reservePorts(
+  offeredPortRanges: List[PortRangeResourceInfo],
+  requestedPorts: List[Long])
+: (List[PortRangeResourceInfo], List[PortRangeResourceInfo]) = {
+var tmpLeft = offeredPortRanges
+val tmpRanges = for {port <- requestedPorts}
+  yield {
+val ret = findPortAndSplitRange(port, tmpLeft)
+val rangeToRemove = ret._1
+val diffRanges = tmpLeft.filterNot{r => r == rangeToRemove}
+val newRangesLeft = diffRanges ++ List(ret._2).flatMap(p => p)
+tmpLeft = newRangesLeft
+ret
+  }
 

[GitHub] spark pull request #11157: [SPARK-11714][Mesos] Make Spark on Mesos honor po...

2016-08-02 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r73214231
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
@@ -357,4 +375,191 @@ private[mesos] trait MesosSchedulerUtils extends 
Logging {
 
sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", 
"120s")
   }
 
+  /**
+   * Checks executor ports if they are within some range of the offered 
list of ports ranges,
+   *
+   * @param conf the Spark Config
+   * @param ports the list of ports to check
+   * @return true if ports are within range false otherwise
+   */
+  protected def checkPorts(conf: SparkConf, ports: List[(Long, Long)]): 
Boolean = {
+
+def checkIfInRange(port: Long, ps: List[(Long, Long)]): Boolean = {
+  ps.exists(r => r._1 <= port & r._2 >= port)
+}
+
+val portsToCheck = nonZeroPortValuesFromConfig(conf)
+val withinRange = portsToCheck.forall(p => checkIfInRange(p, ports))
+// make sure we have enough ports to allocate per offer
+ports.map(r => r._2 - r._1 + 1).sum >= portsToCheck.size && withinRange
+  }
+
+  /**
+   * Partitions port resources.
+   *
+   * @param portsToAssign non-zero ports to assign
+   * @param offeredResources the ports offered
+   * @return resources left, port resources to be used.
+   */
+  def partitionPortResources(portsToAssign: List[Long], offeredResources: 
List[Resource])
+: (List[Resource], List[Resource]) = {
+if (portsToAssign.isEmpty) {
+  (offeredResources, List[Resource]())
+}
+// partition port offers
--- End diff --

Then you need to use `return` explicitly, or wrap the remaining code in an 
`else` block; otherwise execution would continue to the code after the `if` block.
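
A small standalone illustration (not code from the PR) of why the early-exit value 
is silently dropped:

```scala
// Sketch only: only the last expression of a method body is its return value, so
// the tuple built inside the `if` (which has no `else`) is discarded and execution
// falls through to the code below it.
def partition(ports: List[Long]): (List[Long], List[Long]) = {
  if (ports.isEmpty) {
    (Nil, Nil) // computed, but thrown away: this is NOT the return value
  }
  // ...still reached even when ports is empty...
  (ports, Nil)
}

// Fixes: `if (ports.isEmpty) return (Nil, Nil)` or wrap the rest in an `else` branch.
```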


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11157: [SPARK-11714][Mesos] Make Spark on Mesos honor po...

2016-08-02 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r73211731
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
@@ -357,4 +375,191 @@ private[mesos] trait MesosSchedulerUtils extends 
Logging {
 
sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", 
"120s")
   }
 
+  /**
+   * Checks executor ports if they are within some range of the offered 
list of ports ranges,
+   *
+   * @param conf the Spark Config
+   * @param ports the list of ports to check
+   * @return true if ports are within range false otherwise
+   */
+  protected def checkPorts(conf: SparkConf, ports: List[(Long, Long)]): 
Boolean = {
+
+def checkIfInRange(port: Long, ps: List[(Long, Long)]): Boolean = {
+  ps.exists(r => r._1 <= port & r._2 >= port)
+}
+
+val portsToCheck = nonZeroPortValuesFromConfig(conf)
+val withinRange = portsToCheck.forall(p => checkIfInRange(p, ports))
+// make sure we have enough ports to allocate per offer
+ports.map(r => r._2 - r._1 + 1).sum >= portsToCheck.size && withinRange
--- End diff --

I'm a bit puzzled by the `(..).sum >= portsToCheck.size` part; can you 
explain it a bit? Also I guess it would read better as `withinRange && 
ports.map(..) >= ..`
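
For the record, a small standalone example (not from the PR, with made-up port 
numbers) of what that expression computes as I read it: the total number of ports 
covered by the offered ranges must be at least the number of non-zero ports Spark 
needs, in addition to every needed port falling inside some offered range.

```scala
// Sketch only: capacity of the offered ranges plus membership of the requested ports.
val offered = List((31000L, 31005L), (32000L, 32000L)) // hypothetical offer: 7 ports total
val needed  = List(31002L, 32000L)                     // e.g. executor + block manager ports

val withinRange  = needed.forall(p => offered.exists { case (lo, hi) => lo <= p && p <= hi })
val totalOffered = offered.map { case (lo, hi) => hi - lo + 1 }.sum

val ok = totalOffered >= needed.size && withinRange // true for these values
```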


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #11157: [SPARK-11714][Mesos] Make Spark on Mesos honor po...

2016-08-02 Thread lins05
Github user lins05 commented on a diff in the pull request:

https://github.com/apache/spark/pull/11157#discussion_r73208305
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
 ---
@@ -357,4 +375,191 @@ private[mesos] trait MesosSchedulerUtils extends 
Logging {
 
sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", 
"120s")
   }
 
+  /**
+   * Checks executor ports if they are within some range of the offered 
list of ports ranges,
+   *
+   * @param conf the Spark Config
+   * @param ports the list of ports to check
+   * @return true if ports are within range false otherwise
+   */
+  protected def checkPorts(conf: SparkConf, ports: List[(Long, Long)]): 
Boolean = {
+
+def checkIfInRange(port: Long, ps: List[(Long, Long)]): Boolean = {
+  ps.exists(r => r._1 <= port & r._2 >= port)
+}
+
+val portsToCheck = nonZeroPortValuesFromConfig(conf)
+val withinRange = portsToCheck.forall(p => checkIfInRange(p, ports))
+// make sure we have enough ports to allocate per offer
+ports.map(r => r._2 - r._1 + 1).sum >= portsToCheck.size && withinRange
+  }
+
+  /**
+   * Partitions port resources.
+   *
+   * @param portsToAssign non-zero ports to assign
+   * @param offeredResources the ports offered
+   * @return resources left, port resources to be used.
+   */
+  def partitionPortResources(portsToAssign: List[Long], offeredResources: 
List[Resource])
+: (List[Resource], List[Resource]) = {
+if (portsToAssign.isEmpty) {
+  (offeredResources, List[Resource]())
+}
+// partition port offers
+val (resourcesWithoutPorts, portResources) = 
filterPortResources(offeredResources)
+val offeredPortRanges = 
getRangeResourceWithRoleInfo(portResources.asJava, "ports")
+// reserve non-zero ports
+val nonZeroResources = reservePorts(offeredPortRanges, portsToAssign)
+
+createResourcesFromAssignedPorts(nonZeroResources)
+  }
+
+  /**
+   * Returns known port name used by the executor process.
+   * @return the port name
+   */
+  def managedPortNames() : List[String] = List("spark.executor.port", 
"spark.blockManager.port")
+
+  /**
+   * The values of the non-zero ports to be used by the executor process.
+   * @param conf the spark config to use
+   * @return the ono-zero values of the ports
+   */
+  def nonZeroPortValuesFromConfig(conf: SparkConf): List[Long] = {
+managedPortNames().map(conf.getLong(_, 0)).filter( _ != 0)
+  }
+
+  /**
+   * It gets a tuple for the non-zero port assigned resources.
+   * First member of the tuple represents resources left while the second
+   * resources used. A tuple is returned with final resources left (fist 
member)
+   * and the resources used (second member).
+   */
+  private def createResourcesFromAssignedPorts(
+  nonZero: (List[PortRangeResourceInfo], List[PortRangeResourceInfo]))
+: (List[Resource], List[Resource]) = {
+(nonZero._1.flatMap{port => createMesosPortResource(port.range, 
Some(port.role))},
+  nonZero._2.flatMap{port => createMesosPortResource(port.range, 
Some(port.role))})
+  }
+
+  private case class PortRangeResourceInfo(role: String, range: 
List[(Long, Long)])
+
+  /**
+   * A resource can have multiple values in the offer since it can either 
be from
+   * a specific role or wildcard.
+   * Extract role info and port range for every port resource in the offer
+   */
+  private def getRangeResourceWithRoleInfo(resources: JList[Resource], 
name: String)
+: List[PortRangeResourceInfo] = {
+resources.asScala.filter(_.getName == name).
+  map{resource =>
+PortRangeResourceInfo(resource.getRole, 
resource.getRanges.getRangeList.asScala
+.map(r => (r.getBegin, r.getEnd)).toList)
+  }.toList
+  }
+
+  /** Helper method to get a pair of assigned and remaining ports along 
with role info */
+  private def reservePorts(
+  offeredPortRanges: List[PortRangeResourceInfo],
+  requestedPorts: List[Long])
+: (List[PortRangeResourceInfo], List[PortRangeResourceInfo]) = {
+var tmpLeft = offeredPortRanges
+val tmpRanges = for {port <- requestedPorts}
+  yield {
+val ret = findPortAndSplitRange(port, tmpLeft)
+val rangeToRemove = ret._1
+val diffRanges = tmpLeft.filterNot{r => r == rangeToRemove}
+val newRangesLeft = diffRanges ++ List(ret._2).flatMap(p => p)
+tmpLeft = newRangesLeft
+ret
+  }
 
