[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...

2014-07-11 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/1165#discussion_r14808169
  
--- Diff: core/src/main/scala/org/apache/spark/CacheManager.scala ---
@@ -140,14 +144,39 @@ private[spark] class CacheManager(blockManager: 
BlockManager) extends Logging {
   throw new BlockException(key, s"Block manager failed to return cached value for $key!")
   }
 } else {
-  /* This RDD is to be cached in memory. In this case we cannot pass 
the computed values
+  /*
+   * This RDD is to be cached in memory. In this case we cannot pass 
the computed values
* to the BlockManager as an iterator and expect to read it back 
later. This is because
* we may end up dropping a partition from memory store before 
getting it back, e.g.
-   * when the entirety of the RDD does not fit in memory. */
-  val elements = new ArrayBuffer[Any]
-  elements ++= values
-  updatedBlocks ++= blockManager.put(key, elements, storageLevel, 
tellMaster = true)
-  elements.iterator.asInstanceOf[Iterator[T]]
+   * when the entirety of the RDD does not fit in memory.
+   *
+   * In addition, we must be careful to not unfold the entire 
partition in memory at once.
+   * Otherwise, we may cause an OOM exception if the JVM does not have 
enough space for this
+   * single partition. Instead, we unfold the values cautiously, 
potentially aborting and
+   * dropping the partition to disk if applicable.
+   */
+  blockManager.memoryStore.unfoldSafely(key, values, updatedBlocks) 
match {
+case Left(arrayValues) =>
+  // We have successfully unfolded the entire partition, so cache 
it in memory
+  updatedBlocks ++= blockManager.put(key, arrayValues, 
storageLevel, tellMaster = true)
+  arrayValues.iterator.asInstanceOf[Iterator[T]]
--- End diff --

This cast is the one that is scary to me; we should try to restructure it 
so we are casting to a more general type.
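
One possible direction, shown here only as a self-contained sketch and not as anything proposed in the PR: if the unfold step itself carried the element type, the `Left` branch could hand back an `Array[T]` and the `asInstanceOf` at the call site would disappear. The names and the size accounting below are simplified stand-ins.

```
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

// Toy, type-preserving "unfold safely": buffer elements until a size budget is exceeded,
// returning either the fully buffered Array[T] or an iterator over the buffered prefix
// followed by the untouched remainder.
def unfoldSafely[T: ClassTag](
    values: Iterator[T],
    sizeLimit: Long,
    sizeOf: T => Long): Either[Array[T], Iterator[T]] = {
  val buffer = new ArrayBuffer[T]
  var used = 0L
  while (values.hasNext && used <= sizeLimit) {
    val v = values.next()
    buffer += v
    used += sizeOf(v)
  }
  if (!values.hasNext) Left(buffer.toArray)  // typed result, no cast needed by the caller
  else Right(buffer.iterator ++ values)      // hand back what was buffered plus the rest
}
```

For example, `unfoldSafely(Iterator.fill(10)(new Array[Byte](1024)), 4096L, (b: Array[Byte]) => b.length.toLong)` stops buffering after roughly 4 KB and returns a `Right`.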


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [WIP][SPARK-2054][SQL] Code Generation for Exp...

2014-07-11 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/993#issuecomment-48697796
  
QA tests have started for PR 993. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16557/consoleFull




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-11 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14808250
  
--- Diff: python/pyspark/join.py ---
@@ -1,35 +1,19 @@
-
-Copyright (c) 2011, Douban Inc. http://www.douban.com/
-All rights reserved.
-
-Redistribution and use in source and binary forms, with or without
-modification, are permitted provided that the following conditions are
-met:
-
-* Redistributions of source code must retain the above copyright
-notice, this list of conditions and the following disclaimer.
-
-* Redistributions in binary form must reproduce the above
-copyright notice, this list of conditions and the following disclaimer
-in the documentation and/or other materials provided with the
-distribution.
-
-* Neither the name of the Douban Inc. nor the names of its
-contributors may be used to endorse or promote products derived from
-this software without specific prior written permission.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-AS IS AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
-OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
-LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
-DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
-THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
-(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
--- End diff --

This was discussed in https://github.com/apache/spark/pull/592 , and the 
conclusion was to keep the Douban BSD license. Also, this is irrelevant to this 
PR. So please remove this change.




[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...

2014-07-11 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/1165#discussion_r14808269
  
--- Diff: core/src/main/scala/org/apache/spark/CacheManager.scala ---
@@ -140,14 +144,39 @@ private[spark] class CacheManager(blockManager: 
BlockManager) extends Logging {
   throw new BlockException(key, s"Block manager failed to return cached value for $key!")
   }
 } else {
-  /* This RDD is to be cached in memory. In this case we cannot pass 
the computed values
+  /*
+   * This RDD is to be cached in memory. In this case we cannot pass 
the computed values
* to the BlockManager as an iterator and expect to read it back 
later. This is because
* we may end up dropping a partition from memory store before 
getting it back, e.g.
-   * when the entirety of the RDD does not fit in memory. */
-  val elements = new ArrayBuffer[Any]
-  elements ++= values
-  updatedBlocks ++= blockManager.put(key, elements, storageLevel, 
tellMaster = true)
-  elements.iterator.asInstanceOf[Iterator[T]]
+   * when the entirety of the RDD does not fit in memory.
+   *
+   * In addition, we must be careful to not unfold the entire 
partition in memory at once.
+   * Otherwise, we may cause an OOM exception if the JVM does not have 
enough space for this
+   * single partition. Instead, we unfold the values cautiously, 
potentially aborting and
+   * dropping the partition to disk if applicable.
+   */
+  blockManager.memoryStore.unfoldSafely(key, values, updatedBlocks) 
match {
+case Left(arrayValues) =>
+  // We have successfully unfolded the entire partition, so cache 
it in memory
+  updatedBlocks ++= blockManager.put(key, arrayValues, 
storageLevel, tellMaster = true)
+  arrayValues.iterator.asInstanceOf[Iterator[T]]
+case Right(iteratorValues) =>
+  // There is not enough space to cache this partition in memory
+  logWarning(s"Not enough space to cache $key in memory! " +
+    s"Free memory is ${blockManager.memoryStore.freeMemory}B.")
+  var returnValues = iteratorValues.asInstanceOf[Iterator[T]]
+  if (storageLevel.useDisk) {
+logWarning(s"Persisting $key to disk instead.")
+val newLevel = StorageLevel(
--- End diff --

Could this create a sort of strange situation where I persist an RDD with 
storage level MEMORY_AND_DISK but then I get back listener updates where 
individual blocks come back with StorageLevel MEMORY_ONLY? This seems like it 
could be really confusing for downstream consumers of the StorageLevel of a 
particular RDD... do we do this anywhere else?




[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...

2014-07-11 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/1165#discussion_r14808344
  
--- Diff: core/src/main/scala/org/apache/spark/SparkEnv.scala ---
@@ -67,10 +67,14 @@ class SparkEnv (
 val metricsSystem: MetricsSystem,
 val conf: SparkConf) extends Logging {
 
-  // A mapping of thread ID to amount of memory used for shuffle in bytes
+  // A mapping of thread ID to amount of memory, in bytes, used for shuffle
   // All accesses should be manually synchronized
   val shuffleMemoryMap = mutable.HashMap[Long, Long]()
 
+  // A mapping of thread ID to amount of memory, in bytes, used for 
unrolling an RDD partition
+  // All accesses should be manually synchronized
+  val cacheMemoryMap = mutable.HashMap[Long, Long]()
--- End diff --

Could we call this `partitionMemoryMap`? Otherwise it could seem like this 
actually represents the home of the cached data.




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-11 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14808675
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala 
---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat
+
+import org.apache.spark.mllib.linalg.{Matrix, Vector}
+import org.apache.spark.mllib.stat.correlation.Correlations
+import org.apache.spark.rdd.RDD
+
+object Statistics {
+
+  /**
+   * Compute the Pearson correlation matrix for the input RDD of Vectors.
+   */
+  def corr(X: RDD[Vector]): Matrix = Correlations.corrMatrix(X)
+
+  /**
+   * Compute the correlation matrix for the input RDD of Vectors using the 
specified method.
+   *
+   * Methods currently supported: pearson (default), spearman
--- End diff --

Follow ScalaDoc for methods. It is worth noting the cost for spearman.
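
A minimal sketch of the kind of ScalaDoc being asked for, written against the method from the diff; the wording about Spearman's cost is illustrative, not taken from the PR:

```
import org.apache.spark.mllib.linalg.{Matrix, Vector}
import org.apache.spark.mllib.stat.correlation.Correlations
import org.apache.spark.rdd.RDD

object Statistics {

  /**
   * Compute the correlation matrix for the input RDD of Vectors using the specified method.
   *
   * Note: "spearman" is more expensive than "pearson" because it must rank every column
   * (extra sorting and passes over the data) before the correlations can be computed.
   *
   * @param X the RDD[Vector] whose column correlations are computed
   * @param method the name of the correlation method: "pearson" (default) or "spearman"
   * @return correlation matrix comparing the columns of X
   */
  def corr(X: RDD[Vector], method: String): Matrix = Correlations.corrMatrix(X, method)
}
```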




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-11 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14808684
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
--- End diff --

Trait for correlation algorithms.




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-11 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14808671
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala 
---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat
+
+import org.apache.spark.mllib.linalg.{Matrix, Vector}
+import org.apache.spark.mllib.stat.correlation.Correlations
+import org.apache.spark.rdd.RDD
+
+object Statistics {
--- End diff --

Need doc for public classes. Please mark it as @Experimental for the 
initial version.
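
A rough sketch of what that might look like, assuming the `org.apache.spark.annotation.Experimental` annotation and the `:: Experimental ::` doc marker used elsewhere in Spark; the doc text itself is illustrative:

```
import org.apache.spark.annotation.Experimental
import org.apache.spark.mllib.linalg.{Matrix, Vector}
import org.apache.spark.mllib.stat.correlation.Correlations
import org.apache.spark.rdd.RDD

/**
 * :: Experimental ::
 * API for statistical functions in MLlib.
 */
@Experimental
object Statistics {

  /** Compute the Pearson correlation matrix for the input RDD of Vectors. */
  def corr(X: RDD[Vector]): Matrix = Correlations.corrMatrix(X)
}
```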




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-11 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14808679
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala 
---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat
+
+import org.apache.spark.mllib.linalg.{Matrix, Vector}
+import org.apache.spark.mllib.stat.correlation.Correlations
+import org.apache.spark.rdd.RDD
+
+object Statistics {
+
+  /**
+   * Compute the Pearson correlation matrix for the input RDD of Vectors.
+   */
+  def corr(X: RDD[Vector]): Matrix = Correlations.corrMatrix(X)
+
+  /**
+   * Compute the correlation matrix for the input RDD of Vectors using the 
specified method.
+   *
+   * Methods currently supported: pearson (default), spearman
+   */
+  def corr(X: RDD[Vector], method: String): Matrix = 
Correlations.corrMatrix(X, method)
+
+  /**
+   * Compute the Pearson correlation for the input RDDs.
+   */
+  def corr(x: RDD[Double], y: RDD[Double]): Double = Correlations.corr(x, 
y)
+
+  /**
+   * Compute the correlation for the input RDDs using the specified method.
+   *
+   * Methods currently supported: pearson (default), spearman
+   */
+  def corr(x: RDD[Double], y: RDD[Double], method: String): Double = 
Correlations.corr(x, y, method)
+
--- End diff --

remove empty lines




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-11 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14808688
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
+ */
+trait Correlation {
--- End diff --

`private[stat]`?




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-11 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14808707
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
+ */
+trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S, for the input matrix, where S(i, j) 
is the correlation
+   * between column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute 
the correlation using the
+   * correlation implementation for RDD[Vector]
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): 
Double = {
+val mat: RDD[Vector] = x.zip(y).mapPartitions({ iter =>
+  iter.map {case(xi, yi) => new DenseVector(Array(xi, yi))}
--- End diff --

spaces around `case` and add one space before `}`
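
For reference, a stand-alone snippet with the suggested spacing applied (only the formatting differs from the diff):

```
import org.apache.spark.mllib.linalg.DenseVector

val pairs = Iterator((1.0, 2.0), (3.0, 4.0))
// Spaces around `case` and a space before the closing brace:
val vectors = pairs.map { case (xi, yi) => new DenseVector(Array(xi, yi)) }
```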




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-11 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14808705
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
+ */
+trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S, for the input matrix, where S(i, j) 
is the correlation
+   * between column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute 
the correlation using the
+   * correlation implementation for RDD[Vector]
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): 
Double = {
+val mat: RDD[Vector] = x.zip(y).mapPartitions({ iter =>
--- End diff --

`({` -> ` {`




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-11 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14808714
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
+ */
+trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S, for the input matrix, where S(i, j) 
is the correlation
+   * between column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute 
the correlation using the
+   * correlation implementation for RDD[Vector]
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): 
Double = {
+val mat: RDD[Vector] = x.zip(y).mapPartitions({ iter =>
+  iter.map {case(xi, yi) => new DenseVector(Array(xi, yi))}
+}, preservesPartitioning = true)
+computeCorrelationMatrix(mat)(0, 1)
+  }
+
+}
+
+/**
+ * Delegates computation to the specific correlation object based on the 
input method name
+ *
+ * Currently supported correlations: pearson, spearman.
+ * After new correlation algorithms are added, please update the 
documentation here and in
+ * Statistics.scala for the correlation APIs.
+ *
+ * Cases are ignored when doing method matching. We also allow initials, 
e.g. P for pearson, as
+ * long as initials are unique in the supported set of correlation 
algorithms. In addition, a
+ * supported method name has to be a substring of the input method name 
for it to be matched (e.g.
+ * spearmansrho will be matched to spearman)
+ *
+ * Maintains the default correlation type, pearson
+ */
+object Correlations {
--- End diff --

Change to `Correlation` and mark it `private[stat]`. We used `Vectors` 
because `scala.Vector` is imported by default.




[GitHub] spark pull request: hijack hash to make hash of None consistant cr...

2014-07-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1371#issuecomment-48699285
  
Can one of the admins verify this patch?




[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...

2014-07-11 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/1165#discussion_r14808775
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -463,16 +463,16 @@ private[spark] class BlockManager(
   val values = dataDeserialize(blockId, bytes)
   if (level.deserialized) {
 // Cache the values before returning them
-// TODO: Consider creating a putValues that also takes in 
a iterator?
-val valuesBuffer = new ArrayBuffer[Any]
-valuesBuffer ++= values
-memoryStore.putValues(blockId, valuesBuffer, level, 
returnValues = true).data
-  match {
-case Left(values2) =>
-  return Some(new BlockResult(values2, DataReadMethod.Disk, info.size))
-case _ =>
-  throw new SparkException("Memory store did not return back an iterator")
-  }
+val putResult = memoryStore.putValues(blockId, values, 
level, returnValues = true)
+putResult.data match {
+  case Left(it) =>
+return Some(new BlockResult(it, DataReadMethod.Disk, info.size))
+  case Right(b) =>
+return Some(new BlockResult(
+  dataDeserialize(blockId, b),
--- End diff --

We should talk more about what's going on here tomorrow. I read this 
function for 15 minutes and couldn't figure out what was going on. For one 
thing though, AFAIK we only reach this branch if we've read a block from disk. 
Will we just over-write the existing on-disk block again in this scenario? 
Also, is there any reason to return `dataDeserialize(blockId, b)` here instead 
of just returning `values`?




[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...

2014-07-11 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/1165#discussion_r14808809
  
--- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---
@@ -87,9 +97,32 @@ private class MemoryStore(blockManager: BlockManager, 
maxMemory: Long)
   values: Iterator[Any],
   level: StorageLevel,
   returnValues: Boolean): PutResult = {
-val valueEntries = new ArrayBuffer[Any]()
-valueEntries ++= values
-putValues(blockId, valueEntries, level, returnValues)
+val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+val unfoldedValues = unfoldSafely(blockId, values, droppedBlocks)
+unfoldedValues match {
+  case Left(arrayValues) =>
+// Values are fully unfolded in memory, so store them as an array
+val result = putValues(blockId, arrayValues, level, returnValues)
+droppedBlocks ++= result.droppedBlocks
+PutResult(result.size, result.data, droppedBlocks)
+  case Right(iteratorValues) =>
+// Not enough space to unfold this block; drop to disk if 
applicable
+logWarning(s"Not enough space to store $blockId in memory! Free memory is ${freeMemory}B.")
+if (level.useDisk) {
+  logWarning(s"Persisting $blockId to disk instead.")
+  val newLevel = StorageLevel(
+useDisk = true,
+useMemory = false,
+useOffHeap = false,
+deserialized = false,
+level.replication)
+  val result = blockManager.diskStore.putValues(
--- End diff --

Should we check if the diskStore already contains the value here?




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-11 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14808804
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
+ */
+trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S, for the input matrix, where S(i, j) 
is the correlation
+   * between column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute 
the correlation using the
+   * correlation implementation for RDD[Vector]
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): 
Double = {
+val mat: RDD[Vector] = x.zip(y).mapPartitions({ iter =>
+  iter.map {case(xi, yi) => new DenseVector(Array(xi, yi))}
+}, preservesPartitioning = true)
+computeCorrelationMatrix(mat)(0, 1)
+  }
+
+}
+
+/**
+ * Delegates computation to the specific correlation object based on the 
input method name
+ *
+ * Currently supported correlations: pearson, spearman.
+ * After new correlation algorithms are added, please update the 
documentation here and in
+ * Statistics.scala for the correlation APIs.
+ *
+ * Cases are ignored when doing method matching. We also allow initials, 
e.g. P for pearson, as
+ * long as initials are unique in the supported set of correlation 
algorithms. In addition, a
--- End diff --

This feature actually makes it error-prone to add new correlations. For 
example, I use `corr(x, y, "p")` in my code, and in v1.2 someone adds another 
correlation starting with "p". Then it breaks existing code. I saw that R implements 
this feature, but it simply limits the methods to pearson, spearman, and 
kendall. If they plan to add a new correlation with a prefix collision, they will 
face the same problem.
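
A self-contained sketch of the stricter alternative described here: exact, case-insensitive names only, with a hard failure on anything unrecognized (the type and object names are stand-ins, not the PR's actual classes):

```
sealed trait CorrelationMethod
case object Pearson extends CorrelationMethod
case object Spearman extends CorrelationMethod

// Exact, case-insensitive matching: abbreviations like "p" are rejected up front,
// so adding a new method in a later release cannot silently change what an existing
// call resolves to.
def methodFromName(name: String): CorrelationMethod = name.toLowerCase match {
  case "pearson"  => Pearson
  case "spearman" => Spearman
  case other => throw new IllegalArgumentException(s"Unrecognized correlation method: $other")
}
```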




[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...

2014-07-11 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/1165#discussion_r14808829
  
--- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---
@@ -141,6 +174,86 @@ private class MemoryStore(blockManager: BlockManager, 
maxMemory: Long)
   }
 
   /**
+   * Unfold the given block in memory safely.
+   *
+   * The safety of this operation refers to avoiding potential OOM 
exceptions caused by
+   * unfolding the entirety of the block in memory at once. This is 
achieved by periodically
+   * checking whether the memory restrictions for unfolding blocks are 
still satisfied,
+   * stopping immediately if not. This check is a safeguard against the 
scenario in which
+   * there is not enough free memory to accommodate the entirety of a 
single block.
+   *
+   * This method returns either a fully unfolded array or a partially 
unfolded iterator.
+   */
+  def unfoldSafely(
+  blockId: BlockId,
+  values: Iterator[Any],
+  droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)])
+: Either[Array[Any], Iterator[Any]] = {
+
+var count = 0   // The number of elements unfolded so 
far
+var enoughMemory = true // Whether there is enough memory to 
unfold this block
+var previousSize = 0L   // Previous estimate of the size of 
our buffer
+val memoryRequestPeriod = 1000  // How frequently we request for more 
memory for our buffer
--- End diff --

Maybe this should be `memoryRequestInterval`? To me period implies time 
and frequency implies 1/time, but this is neither.




[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...

2014-07-11 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/1165#discussion_r14808850
  
--- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---
@@ -141,6 +174,86 @@ private class MemoryStore(blockManager: BlockManager, 
maxMemory: Long)
   }
 
   /**
+   * Unfold the given block in memory safely.
+   *
+   * The safety of this operation refers to avoiding potential OOM 
exceptions caused by
+   * unfolding the entirety of the block in memory at once. This is 
achieved by periodically
+   * checking whether the memory restrictions for unfolding blocks are 
still satisfied,
+   * stopping immediately if not. This check is a safeguard against the 
scenario in which
+   * there is not enough free memory to accommodate the entirety of a 
single block.
+   *
+   * This method returns either a fully unfolded array or a partially 
unfolded iterator.
+   */
+  def unfoldSafely(
+  blockId: BlockId,
+  values: Iterator[Any],
+  droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)])
+: Either[Array[Any], Iterator[Any]] = {
+
+var count = 0   // The number of elements unfolded so 
far
+var enoughMemory = true // Whether there is enough memory to 
unfold this block
+var previousSize = 0L   // Previous estimate of the size of 
our buffer
+val memoryRequestPeriod = 1000  // How frequently we request for more 
memory for our buffer
+
+val threadId = Thread.currentThread().getId
+val cacheMemoryMap = SparkEnv.get.cacheMemoryMap
+var buffer = new SizeTrackingAppendOnlyBuffer[Any]
+
+try {
+  while (values.hasNext && enoughMemory) {
+buffer += values.next()
+count += 1
+if (count % memoryRequestPeriod == 1) {
+  // Calculate the amount of memory to request from the global 
memory pool
+  val currentSize = buffer.estimateSize()
+  val delta = if (previousSize > 0) math.max(currentSize - 
previousSize, 0) else 0
+  val memoryToRequest = currentSize + delta
+  previousSize = currentSize
+
+  // Atomically check whether there is sufficient memory in the 
global pool to continue
+  cacheMemoryMap.synchronized {
+val previouslyOccupiedMemory = 
cacheMemoryMap.get(threadId).getOrElse(0L)
+val otherThreadsMemory = cacheMemoryMap.values.sum - 
previouslyOccupiedMemory
+
+// Request for memory for the local buffer, and return whether 
request is granted
--- End diff --

Think it's proper not to use a comma here




[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-11 Thread mengxr
Github user mengxr commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14808860
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
+ */
+trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S, for the input matrix, where S(i, j) 
is the correlation
+   * between column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute 
the correlation using the
+   * correlation implementation for RDD[Vector]
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): 
Double = {
+val mat: RDD[Vector] = x.zip(y).mapPartitions({ iter =>
+  iter.map {case(xi, yi) => new DenseVector(Array(xi, yi))}
+}, preservesPartitioning = true)
+computeCorrelationMatrix(mat)(0, 1)
+  }
+
+}
+
+/**
+ * Delegates computation to the specific correlation object based on the 
input method name
+ *
+ * Currently supported correlations: pearson, spearman.
+ * After new correlation algorithms are added, please update the 
documentation here and in
+ * Statistics.scala for the correlation APIs.
+ *
+ * Cases are ignored when doing method matching. We also allow initials, 
e.g. P for pearson, as
+ * long as initials are unique in the supported set of correlation 
algorithms. In addition, a
+ * supported method name has to be a substring of the input method name 
for it to be matched (e.g.
+ * spearmansrho will be matched to spearman)
--- End diff --

This feature is not necessary.




[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...

2014-07-11 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/1165#discussion_r14808921
  
--- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---
@@ -141,6 +174,86 @@ private class MemoryStore(blockManager: BlockManager, 
maxMemory: Long)
   }
 
   /**
+   * Unfold the given block in memory safely.
+   *
+   * The safety of this operation refers to avoiding potential OOM 
exceptions caused by
+   * unfolding the entirety of the block in memory at once. This is 
achieved by periodically
+   * checking whether the memory restrictions for unfolding blocks are 
still satisfied,
+   * stopping immediately if not. This check is a safeguard against the 
scenario in which
+   * there is not enough free memory to accommodate the entirety of a 
single block.
+   *
+   * This method returns either a fully unfolded array or a partially 
unfolded iterator.
+   */
+  def unfoldSafely(
+  blockId: BlockId,
+  values: Iterator[Any],
+  droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)])
+: Either[Array[Any], Iterator[Any]] = {
+
+var count = 0   // The number of elements unfolded so 
far
+var enoughMemory = true // Whether there is enough memory to 
unfold this block
+var previousSize = 0L   // Previous estimate of the size of 
our buffer
+val memoryRequestPeriod = 1000  // How frequently we request for more 
memory for our buffer
+
+val threadId = Thread.currentThread().getId
+val cacheMemoryMap = SparkEnv.get.cacheMemoryMap
+var buffer = new SizeTrackingAppendOnlyBuffer[Any]
+
+try {
+  while (values.hasNext && enoughMemory) {
+buffer += values.next()
+count += 1
+if (count % memoryRequestPeriod == 1) {
+  // Calculate the amount of memory to request from the global 
memory pool
+  val currentSize = buffer.estimateSize()
+  val delta = if (previousSize > 0) math.max(currentSize - 
previousSize, 0) else 0
+  val memoryToRequest = currentSize + delta
+  previousSize = currentSize
+
+  // Atomically check whether there is sufficient memory in the 
global pool to continue
+  cacheMemoryMap.synchronized {
+val previouslyOccupiedMemory = 
cacheMemoryMap.get(threadId).getOrElse(0L)
+val otherThreadsMemory = cacheMemoryMap.values.sum - 
previouslyOccupiedMemory
+
+// Request for memory for the local buffer, and return whether 
request is granted
+def requestForMemory(): Boolean = {
--- End diff --

Why is this its own function? Why not just do this:

```
val availableMemory = freeMemory - otherThreadsMemory
val granted = availableMemory > memoryToRequest

if (granted) {
 cacheMemoryMap(threadId) = memoryToRequest
} else {
 val result = ensureFreeSpace(blockId, globalBufferMemory)
 droppedBlocks ++= result.droppedBlocks
 enoughMemory = requestForMemory()
}
```




[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...

2014-07-11 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/1165#discussion_r14808944
  
--- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---
@@ -141,6 +174,86 @@ private class MemoryStore(blockManager: BlockManager, 
maxMemory: Long)
   }
 
   /**
+   * Unfold the given block in memory safely.
+   *
+   * The safety of this operation refers to avoiding potential OOM 
exceptions caused by
+   * unfolding the entirety of the block in memory at once. This is 
achieved by periodically
+   * checking whether the memory restrictions for unfolding blocks are 
still satisfied,
+   * stopping immediately if not. This check is a safeguard against the 
scenario in which
+   * there is not enough free memory to accommodate the entirety of a 
single block.
+   *
+   * This method returns either a fully unfolded array or a partially 
unfolded iterator.
+   */
+  def unfoldSafely(
+  blockId: BlockId,
+  values: Iterator[Any],
+  droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)])
+: Either[Array[Any], Iterator[Any]] = {
+
+var count = 0   // The number of elements unfolded so 
far
+var enoughMemory = true // Whether there is enough memory to 
unfold this block
+var previousSize = 0L   // Previous estimate of the size of 
our buffer
+val memoryRequestPeriod = 1000  // How frequently we request for more 
memory for our buffer
+
+val threadId = Thread.currentThread().getId
+val cacheMemoryMap = SparkEnv.get.cacheMemoryMap
+var buffer = new SizeTrackingAppendOnlyBuffer[Any]
+
+try {
+  while (values.hasNext && enoughMemory) {
+buffer += values.next()
+count += 1
+if (count % memoryRequestPeriod == 1) {
+  // Calculate the amount of memory to request from the global 
memory pool
+  val currentSize = buffer.estimateSize()
+  val delta = if (previousSize > 0) math.max(currentSize - 
previousSize, 0) else 0
+  val memoryToRequest = currentSize + delta
+  previousSize = currentSize
+
+  // Atomically check whether there is sufficient memory in the 
global pool to continue
+  cacheMemoryMap.synchronized {
+val previouslyOccupiedMemory = 
cacheMemoryMap.get(threadId).getOrElse(0L)
+val otherThreadsMemory = cacheMemoryMap.values.sum - 
previouslyOccupiedMemory
+
+// Request for memory for the local buffer, and return whether 
request is granted
+def requestForMemory(): Boolean = {
--- End diff --

Might be sufficient to just name this `requestMemory`




[GitHub] spark pull request: Made rdd.py pep8 complaint by using Autopep8 a...

2014-07-11 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/1354#discussion_r14809046
  
--- Diff: python/pyspark/rdd.py ---
@@ -509,7 +522,8 @@ def sortByKey(self, ascending=True, numPartitions=None, 
keyfunc = lambda x: x):
 >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
 >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
 >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
-[('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), 
('Mary', 1), ('was', 8), ('white', 9), ('whose', 6)]
+[('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), 
('Mary', 1), \
+('was', 8), ('white', 9), ('whose', 6)]
--- End diff --

is this supposed to be like this?




[GitHub] spark pull request: [SPARK-2119][SQL] Improved Parquet performance...

2014-07-11 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1370#issuecomment-48700541
  
QA results for PR 1370:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16555/consoleFull




[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...

2014-07-11 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/1165#discussion_r14809203
  
--- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---
@@ -141,6 +174,86 @@ private class MemoryStore(blockManager: BlockManager, 
maxMemory: Long)
   }
 
   /**
+   * Unfold the given block in memory safely.
+   *
+   * The safety of this operation refers to avoiding potential OOM 
exceptions caused by
+   * unfolding the entirety of the block in memory at once. This is 
achieved by periodically
+   * checking whether the memory restrictions for unfolding blocks are 
still satisfied,
+   * stopping immediately if not. This check is a safeguard against the 
scenario in which
+   * there is not enough free memory to accommodate the entirety of a 
single block.
+   *
+   * This method returns either a fully unfolded array or a partially 
unfolded iterator.
+   */
+  def unfoldSafely(
+  blockId: BlockId,
+  values: Iterator[Any],
+  droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)])
+: Either[Array[Any], Iterator[Any]] = {
+
+var count = 0   // The number of elements unfolded so 
far
+var enoughMemory = true // Whether there is enough memory to 
unfold this block
+var previousSize = 0L   // Previous estimate of the size of 
our buffer
+val memoryRequestPeriod = 1000  // How frequently we request for more 
memory for our buffer
--- End diff --

One case I'm concerned about is the need to spill if someone has very large 
individual objects (e.g. they do a groupByKey(X).cache() with a big key). It 
might make sense to have some bi-modal thing where we sample every element for 
the first 100 elements and then we smooth it out later.
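
A hedged sketch of that bi-modal idea; the helper name and thresholds below are made up for illustration:

```
// Hypothetical helper: decide whether to re-estimate the buffer size after the count-th element.
// Dense checks early catch very large individual objects; sparse checks afterwards keep the
// cost of size estimation low for long partitions of small objects.
def shouldCheckMemory(count: Int, denseLimit: Int = 100, checkInterval: Int = 1000): Boolean =
  count <= denseLimit || count % checkInterval == 0

// e.g. shouldCheckMemory(3) == true, shouldCheckMemory(850) == false, shouldCheckMemory(2000) == true
```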




[GitHub] spark pull request: [SPARK-1969][MLlib] Public available online su...

2014-07-11 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/955#issuecomment-48701090
  
QA tests have started for PR 955. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16558/consoleFull




[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...

2014-07-11 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/1165#issuecomment-48701135
  
Did an initial pass with some feedback - not totally done yet but it should 
be enough to get some work done.




[GitHub] spark pull request: Use the scala-logging wrapper instead of the d...

2014-07-11 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1369#issuecomment-48701944
  
QA results for PR 1369:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16556/consoleFull




[GitHub] spark pull request: [SPARK-1969][MLlib] Public available online su...

2014-07-11 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/955#issuecomment-48702465
  
QA tests have started for PR 955. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16561/consoleFull




[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...

2014-07-11 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/1165#discussion_r14809771
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -463,16 +463,16 @@ private[spark] class BlockManager(
           val values = dataDeserialize(blockId, bytes)
           if (level.deserialized) {
             // Cache the values before returning them
-            // TODO: Consider creating a putValues that also takes in a iterator?
-            val valuesBuffer = new ArrayBuffer[Any]
-            valuesBuffer ++= values
-            memoryStore.putValues(blockId, valuesBuffer, level, returnValues = true).data
-              match {
-                case Left(values2) =>
-                  return Some(new BlockResult(values2, DataReadMethod.Disk, info.size))
-                case _ =>
-                  throw new SparkException("Memory store did not return back an iterator")
-              }
+            val putResult = memoryStore.putValues(blockId, values, level, returnValues = true)
+            putResult.data match {
+              case Left(it) =>
+                return Some(new BlockResult(it, DataReadMethod.Disk, info.size))
+              case Right(b) =>
+                return Some(new BlockResult(
+                  dataDeserialize(blockId, b),
--- End diff --

You can't just return `values` because `memoryStore.putValues` already 
exhausted the iterator. This is one of those places where we `put` the values 
as an iterator somewhere and read them back in order to return them.
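
A minimal, self-contained sketch (plain Scala, not the Spark classes above) of the point
being made here: once the put has consumed the iterator there is nothing left to hand
back, so the values have to be read back out of the store.

    object IteratorExhaustionSketch {
      def main(args: Array[String]): Unit = {
        val values: Iterator[Int] = Iterator(1, 2, 3)
        val stored = values.toArray              // the "put" consumes the iterator here
        println(values.hasNext)                  // false: nothing left to return to the caller
        println(stored.iterator.mkString(","))   // 1,2,3: read the values back from the store
      }
    }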


---


[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...

2014-07-11 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/1165#discussion_r14809809
  
--- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---
@@ -141,6 +174,86 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   }
 
   /**
+   * Unfold the given block in memory safely.
+   *
+   * The safety of this operation refers to avoiding potential OOM exceptions caused by
+   * unfolding the entirety of the block in memory at once. This is achieved by periodically
+   * checking whether the memory restrictions for unfolding blocks are still satisfied,
+   * stopping immediately if not. This check is a safeguard against the scenario in which
+   * there is not enough free memory to accommodate the entirety of a single block.
+   *
+   * This method returns either a fully unfolded array or a partially unfolded iterator.
+   */
+  def unfoldSafely(
+      blockId: BlockId,
+      values: Iterator[Any],
+      droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)])
+    : Either[Array[Any], Iterator[Any]] = {
+
+    var count = 0           // The number of elements unfolded so far
+    var enoughMemory = true // Whether there is enough memory to unfold this block
--- End diff --

Ok. I had it named exactly that earlier, but it feels like if `outOfMemory` 
is true then it is already too late... I can change it back.


---


[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...

2014-07-11 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/1165#discussion_r14809836
  
--- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---
@@ -87,9 +97,32 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       values: Iterator[Any],
       level: StorageLevel,
       returnValues: Boolean): PutResult = {
-    val valueEntries = new ArrayBuffer[Any]()
-    valueEntries ++= values
-    putValues(blockId, valueEntries, level, returnValues)
+    val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+    val unfoldedValues = unfoldSafely(blockId, values, droppedBlocks)
+    unfoldedValues match {
+      case Left(arrayValues) =>
+        // Values are fully unfolded in memory, so store them as an array
+        val result = putValues(blockId, arrayValues, level, returnValues)
+        droppedBlocks ++= result.droppedBlocks
+        PutResult(result.size, result.data, droppedBlocks)
+      case Right(iteratorValues) =>
+        // Not enough space to unfold this block; drop to disk if applicable
+        logWarning(s"Not enough space to store $blockId in memory! Free memory is ${freeMemory}B.")
+        if (level.useDisk) {
+          logWarning(s"Persisting $blockId to disk instead.")
+          val newLevel = StorageLevel(
+            useDisk = true,
+            useMemory = false,
+            useOffHeap = false,
+            deserialized = false,
+            level.replication)
+          val result = blockManager.diskStore.putValues(
--- End diff --

If there is, do we kick out the old one?


---


[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...

2014-07-11 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/1165#discussion_r14809878
  
--- Diff: core/src/main/scala/org/apache/spark/CacheManager.scala ---
@@ -140,14 +144,39 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
           throw new BlockException(key, s"Block manager failed to return cached value for $key!")
       }
     } else {
-      /* This RDD is to be cached in memory. In this case we cannot pass the computed values
+      /*
+       * This RDD is to be cached in memory. In this case we cannot pass the computed values
        * to the BlockManager as an iterator and expect to read it back later. This is because
        * we may end up dropping a partition from memory store before getting it back, e.g.
-       * when the entirety of the RDD does not fit in memory. */
-      val elements = new ArrayBuffer[Any]
-      elements ++= values
-      updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true)
-      elements.iterator.asInstanceOf[Iterator[T]]
+       * when the entirety of the RDD does not fit in memory.
+       *
+       * In addition, we must be careful to not unfold the entire partition in memory at once.
+       * Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
+       * single partition. Instead, we unfold the values cautiously, potentially aborting and
+       * dropping the partition to disk if applicable.
+       */
+      blockManager.memoryStore.unfoldSafely(key, values, updatedBlocks) match {
+        case Left(arrayValues) =>
+          // We have successfully unfolded the entire partition, so cache it in memory
+          updatedBlocks ++= blockManager.put(key, arrayValues, storageLevel, tellMaster = true)
+          arrayValues.iterator.asInstanceOf[Iterator[T]]
+        case Right(iteratorValues) =>
+          // There is not enough space to cache this partition in memory
+          logWarning(s"Not enough space to cache $key in memory! " +
+            s"Free memory is ${blockManager.memoryStore.freeMemory}B.")
+          var returnValues = iteratorValues.asInstanceOf[Iterator[T]]
+          if (storageLevel.useDisk) {
+            logWarning(s"Persisting $key to disk instead.")
+            val newLevel = StorageLevel(
--- End diff --

I see. Looks like we do this all over the place in the storage page of the 
UI, but not in BlockManager itself.


---


[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...

2014-07-11 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/1165#discussion_r14809944
  
--- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---
@@ -141,6 +174,86 @@ private class MemoryStore(blockManager: BlockManager, 
maxMemory: Long)
   }
 
   /**
+   * Unfold the given block in memory safely.
+   *
+   * The safety of this operation refers to avoiding potential OOM 
exceptions caused by
+   * unfolding the entirety of the block in memory at once. This is 
achieved by periodically
+   * checking whether the memory restrictions for unfolding blocks are 
still satisfied,
+   * stopping immediately if not. This check is a safeguard against the 
scenario in which
+   * there is not enough free memory to accommodate the entirety of a 
single block.
+   *
+   * This method returns either a fully unfolded array or a partially 
unfolded iterator.
+   */
+  def unfoldSafely(
--- End diff --

Ok. I find it scary too


---


[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...

2014-07-11 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/1165#discussion_r14809936
  
--- Diff: core/src/main/scala/org/apache/spark/CacheManager.scala ---
@@ -140,14 +144,39 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
           throw new BlockException(key, s"Block manager failed to return cached value for $key!")
       }
     } else {
-      /* This RDD is to be cached in memory. In this case we cannot pass the computed values
+      /*
+       * This RDD is to be cached in memory. In this case we cannot pass the computed values
        * to the BlockManager as an iterator and expect to read it back later. This is because
        * we may end up dropping a partition from memory store before getting it back, e.g.
-       * when the entirety of the RDD does not fit in memory. */
-      val elements = new ArrayBuffer[Any]
-      elements ++= values
-      updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true)
-      elements.iterator.asInstanceOf[Iterator[T]]
+       * when the entirety of the RDD does not fit in memory.
+       *
+       * In addition, we must be careful to not unfold the entire partition in memory at once.
+       * Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
+       * single partition. Instead, we unfold the values cautiously, potentially aborting and
+       * dropping the partition to disk if applicable.
+       */
+      blockManager.memoryStore.unfoldSafely(key, values, updatedBlocks) match {
+        case Left(arrayValues) =>
+          // We have successfully unfolded the entire partition, so cache it in memory
+          updatedBlocks ++= blockManager.put(key, arrayValues, storageLevel, tellMaster = true)
+          arrayValues.iterator.asInstanceOf[Iterator[T]]
--- End diff --

I agree. We could add a type parameter to `unfoldSafely`, since any one 
block must have values of the same type. Note that even in the existing code we 
already cast it to `Iterator[T]` all over the place.
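
A hypothetical, self-contained sketch (not the PR's code; BlockId and the actual memory
accounting are omitted) of how a type parameter would remove the cast, so the caller gets
back Array[T] or Iterator[T] directly:

    import scala.collection.mutable.ArrayBuffer
    import scala.reflect.ClassTag

    object TypedUnfoldSketch {
      // Hypothetical signature: with a type parameter, no asInstanceOf is needed at the call site.
      def unfoldSafely[T: ClassTag](values: Iterator[T], maxInMemory: Int): Either[Array[T], Iterator[T]] = {
        val buffer = new ArrayBuffer[T]
        while (values.hasNext) {
          buffer += values.next()
          if (buffer.size > maxInMemory) {
            // Partially unfolded: give back what was buffered, followed by the rest of the iterator
            return Right(buffer.iterator ++ values)
          }
        }
        Left(buffer.toArray)
      }

      def main(args: Array[String]): Unit = {
        println(unfoldSafely(Iterator(1, 2, 3), maxInMemory = 10).left.map(_.mkString(","))) // Left(1,2,3)
        println(unfoldSafely((1 to 100).iterator, maxInMemory = 10).isRight)                 // true
      }
    }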


---


[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...

2014-07-11 Thread andrewor14
Github user andrewor14 commented on a diff in the pull request:

https://github.com/apache/spark/pull/1165#discussion_r14810031
  
--- Diff: 
core/src/main/scala/org/apache/spark/util/collection/SizeTracker.scala ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.util.SizeEstimator
+
+/**
+ * A general interface for collections to keep track of their estimated 
sizes in bytes.
+ * We sample with a slow exponential back-off using the SizeEstimator to 
amortize the time,
+ * as each call to SizeEstimator is somewhat expensive (order of a few 
milliseconds).
+ */
+private[spark] trait SizeTracker {
+
+  import SizeTracker._
+
+  /**
+   * Controls the base of the exponential which governs the rate of 
sampling.
+   * E.g., a value of 2 would mean we sample at 1, 2, 4, 8, ... elements.
+   */
+  private val SAMPLE_GROWTH_RATE = 1.1
+
+  /** Samples taken since last resetSamples(). Only the last two are kept 
for extrapolation. */
+  private val samples = new ArrayBuffer[Sample]
+
+  /** The average number of bytes per update between our last two samples. 
*/
+  private var bytesPerUpdate: Double = _
+
+  /** Total number of insertions and updates into the map since the last 
resetSamples(). */
+  private var numUpdates: Long = _
+
+  /** The value of 'numUpdates' at which we will take our next sample. */
+  private var nextSampleNum: Long = _
+
+  resetSamples()
+
+  /**
+   * Reset samples collected so far.
+   * This should be called after the collection undergoes a dramatic 
change in size.
+   */
+  protected def resetSamples(): Unit = {
+numUpdates = 1
+nextSampleNum = 1
+samples.clear()
+takeSample()
+  }
+
+  /**
+   * Callback to be invoked after every update.
+   */
+  protected def afterUpdate(): Unit = {
+numUpdates += 1
+if (nextSampleNum == numUpdates) {
+  takeSample()
+}
+  }
+
+  /**
+   * Take a new sample of the current collection's size.
+   */
+  private def takeSample(): Unit = {
+samples += Sample(SizeEstimator.estimate(this), numUpdates)
+// Only use the last two samples to extrapolate
+    if (samples.size > 2) {
+  samples.remove(0)
+}
+    val bytesDelta = samples.toSeq.reverse match {
+      case latest :: previous :: tail =>
+        (latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates)
+      // If fewer than 2 samples, assume no change
+      case _ => 0
+}
+bytesPerUpdate = math.max(0, bytesDelta)
+nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong
+  }
+
+  /**
+   * Estimate the current size of the collection in bytes. O(1) time.
+   */
+  def estimateSize(): Long = {
+assert(samples.nonEmpty)
--- End diff --

First, because `samples` can never be empty (`resetSamples` always takes at 
least one sample). Second, having 0 samples does not mean we have no data in 
the underlying collection; it just means we don't have any information about 
the current size.
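
For illustration, a rough sketch of the kind of extrapolation such a tracker does between
samples; the estimateSize body is cut off in the diff above, so the exact formula here is
an assumption rather than the PR's code.

    object SizeExtrapolationSketch {
      final case class Sample(size: Long, numUpdates: Long)

      // Assumed formula: last measured size plus the per-update growth rate times
      // the number of updates seen since that measurement.
      def estimate(last: Sample, bytesPerUpdate: Double, numUpdates: Long): Long =
        (last.size + bytesPerUpdate * (numUpdates - last.numUpdates)).toLong

      def main(args: Array[String]): Unit = {
        // Last real sample: 10240 bytes after 1000 updates, growing ~32 bytes per update.
        println(estimate(Sample(10240L, 1000L), bytesPerUpdate = 32.0, numUpdates = 1500L)) // 26240
      }
    }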


---


[GitHub] spark pull request: Fix JIRA-983 and support exteranl sort for sor...

2014-07-11 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/931#issuecomment-48703798
  
Jenkins, test this please.


---


[GitHub] spark pull request: Use the Executor's ClassLoader in sc.objectFil...

2014-07-11 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/181#issuecomment-48704142
  
Cool - thanks for contributing the test! Jenkins, test this please. LGTM 
pending tests.


---


[GitHub] spark pull request: Use the Executor's ClassLoader in sc.objectFil...

2014-07-11 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/181#issuecomment-48704291
  
QA tests have started for PR 181. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16563/consoleFull


---


[GitHub] spark pull request: Made rdd.py pep8 complaint by using Autopep8 a...

2014-07-11 Thread ScrapCodes
Github user ScrapCodes commented on a diff in the pull request:

https://github.com/apache/spark/pull/1354#discussion_r14810364
  
--- Diff: python/pyspark/rdd.py ---
@@ -509,7 +522,8 @@ def sortByKey(self, ascending=True, numPartitions=None, keyfunc = lambda x: x):
         >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
         >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
         >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
-        [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), ('Mary', 1), ('was', 8), ('white', 9), ('whose', 6)]
+        [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), ('Mary', 1), \
+        ('was', 8), ('white', 9), ('whose', 6)]
--- End diff --

Pep8 does not like very long lines. The other option was to change the test
case. I am not sure if we can ask it to ignore one line in comments.
On Jul 11, 2014 12:35 PM, Reynold Xin notificati...@github.com wrote:

 In python/pyspark/rdd.py:

  @@ -509,7 +522,8 @@ def sortByKey(self, ascending=True, 
numPartitions=None, keyfunc = lambda x: x):
tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), 
('lamb', 5)]
tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), 
('white', 9)])
sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: 
k.lower()).collect()
  -[('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 
4), ('Mary', 1), ('was', 8), ('white', 9), ('whose', 6)]
  +[('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 
4), ('Mary', 1), \
  +('was', 8), ('white', 9), ('whose', 6)]

 is this supposed to be like this?

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/1354/files#r14809046.



---


[GitHub] spark pull request: mesos executor ids now consist of the slave id...

2014-07-11 Thread drexin
Github user drexin commented on the pull request:

https://github.com/apache/spark/pull/1358#issuecomment-48706534
  
Hi Patrick,

the problem is described in [this mailing list 
entry](http://mail-archives.apache.org/mod_mbox/mesos-user/201407.mbox/%3c53b66e6d.7090...@uninett.no%3e)

If I understand the [documentation on run 
modes](http://spark.apache.org/docs/latest/running-on-mesos.html) and the code 
correctly, in fine grained mode it starts a separate instance of 
`MesosExecutorBackend` for each spark task. If this is correct, then as soon as 
2 tasks run concurrently on the same machine we should run into this problem.

On [this 
line](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala#L329)
 in the `BlockManagerMasterActor`, there is a check on the `BlockManagerId`, 
which will always be different per `Executor` instance, because the port in 
there is randomly assigned. The `executorId` however is always set to the mesos 
`slaveId`. This means that we are running into [this 
case](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala#L331-L335)
 as soon as we start two `Executor` instances on the same slave. This PR fixes 
this by adding the counter to the `executorId`. Please tell me if I overlooked 
something.
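
A standalone sketch of the collision being described (simplified stand-ins, not Spark's
actual BlockManagerId or registration logic): two executors on the same slave reuse the
slave id as their executor id but come in from different random ports, so the second
registration looks like a conflicting one.

    object ExecutorIdCollisionSketch {
      final case class BlockManagerId(executorId: String, host: String, port: Int)

      def main(args: Array[String]): Unit = {
        val registered = scala.collection.mutable.Map[String, BlockManagerId]()

        def register(id: BlockManagerId): Unit = registered.get(id.executorId) match {
          case Some(existing) if existing != id =>
            println(s"conflict: $id clashes with already-registered $existing")
          case _ =>
            registered(id.executorId) = id
        }

        // Both executors run on the same slave and reuse the slave id as executor id.
        register(BlockManagerId("slave-1", "host-a", 50123))
        register(BlockManagerId("slave-1", "host-a", 50999))
      }
    }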


---


[GitHub] spark pull request: Use the scala-logging wrapper instead of the d...

2014-07-11 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1369#issuecomment-48706567
  
QA tests have started for PR 1369. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16564/consoleFull


---


[GitHub] spark pull request: [SPARK-2236][SQL]SparkSQL add SkewJoin

2014-07-11 Thread YanjieGao
Github user YanjieGao commented on the pull request:

https://github.com/apache/spark/pull/1134#issuecomment-48707013
  
Thanks Michael,
(1) We could make it a user hint, like Hive does:
    set hive.optimize.skewjoin = true;
    set hive.skewjoin.key = skew_key_threshold (default = 10)
We could use:
    set sparksql.optimize.skewjoin = true
    set sparksql.skewjoin.key = skew_key_threshold
(2) We could use a sample to find the relative frequency of each key, and the user-set
skew_key_threshold decides which keys are over the threshold.
(3) toString will generate many objects; I will optimize the code in the next step.



---


[GitHub] spark pull request: Use the scala-logging wrapper instead of the d...

2014-07-11 Thread srowen
Github user srowen commented on the pull request:

https://github.com/apache/spark/pull/1369#issuecomment-48707227
  
Wasn't this already decided against in 
https://github.com/apache/spark/pull/332 and again 
https://github.com/apache/spark/pull/1208 ? or is this not another PR for 
https://issues.apache.org/jira/browse/SPARK-1470 ?


---


[GitHub] spark pull request: mesos executor ids now consist of the slave id...

2014-07-11 Thread drexin
Github user drexin commented on the pull request:

https://github.com/apache/spark/pull/1358#issuecomment-48707209
  
Created a JIRA issue here: https://issues.apache.org/jira/browse/SPARK-2445


---


[GitHub] spark pull request: [SPARK-2236][SQL]SparkSQL add SkewJoin

2014-07-11 Thread YanjieGao
Github user YanjieGao commented on the pull request:

https://github.com/apache/spark/pull/1134#issuecomment-48707289
  
Hi, I also made a left semi join. I am not sure whether this join counts as an
optimization of the left semi join or as a separate join algorithm. I think PR 1127 also
has some optimization work left to do. Do you think PR 1127 is worth merging? Thanks a lot.
https://github.com/apache/spark/pull/1127


---


[GitHub] spark pull request: [SPARK-2236][SQL]SparkSQL add SkewJoin

2014-07-11 Thread YanjieGao
Github user YanjieGao commented on a diff in the pull request:

https://github.com/apache/spark/pull/1134#discussion_r14811559
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
@@ -400,3 +401,73 @@ case class BroadcastNestedLoopJoin(
   streamedPlusMatches.flatMap(_._1), 
sqlContext.sparkContext.makeRDD(rightOuterMatches))
   }
 }
+
+/**
+ * :: DeveloperApi ::
+ * In some cases data skew happens. SkewJoin samples the left table RDD to find the largest key,
+ * then splits the left RDD into a skewed-table RDD containing only the rows with the largest key
+ * and a remainder RDD without that key. Each of the two RDDs is joined with the right table,
+ * and finally the two results are unioned.
+ */
+@DeveloperApi
+case class SkewJoinCartesianProduct(
+left: SparkPlan,
+right: SparkPlan,
+condition: Option[Expression])(@transient sc: SparkContext) extends 
BinaryNode {
+  override def output = left.output ++ right.output
+
+  @transient lazy val boundCondition =
+    InterpretedPredicate(
+      condition
+        .map(c => BindReferences.bindReference(c, left.output ++ right.output))
+        .getOrElse(Literal(true)))
+
+  def execute() = {
+
+val skewedTable = left.execute()
+//This will later write as configuration
+val sample = skewedTable.sample(false, 0.3, 9).collect()
+val sortedSample = sample.sortWith((row1, row2) = row1.hashCode()  
row2.hashCode())
--- End diff --

I want to sort the rows by key, so I think I need a better way to obtain the key. Do you
have a better way to fetch it?
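
One possible direction, sketched with plain Scala collections rather than the SparkPlan
and Row types in the diff (so this is only an assumption about the shape of the fix):
count the sampled keys and take the most frequent one, instead of sorting rows by hashCode.

    object DominantKeySketch {
      def main(args: Array[String]): Unit = {
        val sampledKeys = Seq("a", "b", "a", "c", "a", "b", "a")
        // Group the sampled keys, count each group, and keep the key with the largest count.
        val (largestKey, count) =
          sampledKeys.groupBy(identity).map { case (k, v) => (k, v.size) }.maxBy(_._2)
        println(s"largest key = $largestKey, sampled count = $count") // largest key = a, sampled count = 4
      }
    }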


---


[GitHub] spark pull request: [SPARK-1969][MLlib] Online summarizer APIs for...

2014-07-11 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/955#issuecomment-48708106
  
QA results for PR 955:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with Serializable {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16558/consoleFull


---


[GitHub] spark pull request: Use the scala-logging wrapper instead of the d...

2014-07-11 Thread witgo
Github user witgo commented on the pull request:

https://github.com/apache/spark/pull/1369#issuecomment-48708268
  
#332 can't be tested automatically.
#1208 got messed up and I do not know how to fix it. :sweat:


---


[GitHub] spark pull request: replace println to log4j

2014-07-11 Thread fireflyc
GitHub user fireflyc opened a pull request:

https://github.com/apache/spark/pull/1372

replace println to log4j

Our program needs to receive a large amount of data and run for a long time.
We set the log level to WARN, but messages such as "Storing iterator" and
"received single" are still written to the log file (over YARN).
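
A self-contained sketch of the kind of change being described (plain SLF4J here, which
log4j can back; the message text and block id below are made up for illustration): route
the message through a logger so that a WARN log level actually suppresses it.

    import org.slf4j.LoggerFactory

    object ReceiverLoggingSketch {
      private val log = LoggerFactory.getLogger(getClass)

      def storeBlock(blockId: String): Unit = {
        // println("Storing iterator for block " + blockId)  // printed regardless of log level
        log.debug(s"Storing iterator for block $blockId")     // suppressed when the level is WARN
      }

      def main(args: Array[String]): Unit = storeBlock("input-0-1405061843000")
    }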

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fireflyc/spark fix-replace-stdout-log

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/1372.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1372


commit fa22a3879a9d4887cd229966b6523dd65b2f6003
Author: fireflyc firef...@126.com
Date:   2014-07-11T08:03:20Z

replace println to log4j

Our program needs to receive a large amount of data and run for a long time.
We set the log level to WARN, but messages such as "Storing iterator" and
"received single" are still written to the log file (over YARN).




---


[GitHub] spark pull request: [SPARK-1470] Use the scala-logging wrapper ins...

2014-07-11 Thread witgo
Github user witgo commented on the pull request:

https://github.com/apache/spark/pull/332#issuecomment-48708675
  
It can't be tested automatically. I submitted a new PR, #1369.


---


[GitHub] spark pull request: replace println to log4j

2014-07-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1372#issuecomment-48708875
  
Can one of the admins verify this patch?


---


[GitHub] spark pull request: [SPARK-1969][MLlib] Online summarizer APIs for...

2014-07-11 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/955#issuecomment-48709108
  
QA results for PR 955:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with Serializable {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16560/consoleFull


---


[GitHub] spark pull request: [SPARK-1969][MLlib] Online summarizer APIs for...

2014-07-11 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/955#issuecomment-48709925
  
QA results for PR 955:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with Serializable {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16561/consoleFull


---


[GitHub] spark pull request: [SPARK-2446][SQL] Add BinaryType support to Pa...

2014-07-11 Thread ueshin
GitHub user ueshin opened a pull request:

https://github.com/apache/spark/pull/1373

[SPARK-2446][SQL] Add BinaryType support to Parquet I/O.

To support `BinaryType`, the following changes are needed:
- Make `StringType` use `OriginalType.UTF8`
- Add `BinaryType` using `PrimitiveTypeName.BINARY` without `OriginalType`
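
A rough, simplified sketch of the mapping those two bullet points describe (this is not
the PR's Parquet converter code; the tuple of primitive-type name and optional
original-type annotation is only for illustration):

    sealed trait SqlType
    case object StringType extends SqlType
    case object BinaryType extends SqlType

    object ParquetTypeSketch {
      def toParquet(t: SqlType): (String, Option[String]) = t match {
        case StringType => ("BINARY", Some("UTF8")) // primitive type plus original-type annotation
        case BinaryType => ("BINARY", None)         // plain BINARY, no original-type annotation
      }

      def main(args: Array[String]): Unit = {
        println(toParquet(StringType)) // (BINARY,Some(UTF8))
        println(toParquet(BinaryType)) // (BINARY,None)
      }
    }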

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ueshin/apache-spark issues/SPARK-2446

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/1373.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1373


commit 616e04a989edce9f5109a3b2ad11b78e1e0f1a77
Author: Takuya UESHIN ues...@happy-camper.st
Date:   2014-07-09T15:25:08Z

Make StringType use OriginalType.UTF8.

commit ecacb925c4e1f2dbf863b604ad2600cc8c9663d8
Author: Takuya UESHIN ues...@happy-camper.st
Date:   2014-07-11T09:13:46Z

Add BinaryType support to Parquet I/O.




---


[GitHub] spark pull request: [SPARK-2446][SQL] Add BinaryType support to Pa...

2014-07-11 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1373#issuecomment-48710936
  
QA tests have started for PR 1373. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16565/consoleFull


---


[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-07-11 Thread li-zhihui
Github user li-zhihui commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14813843
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -46,9 +46,19 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
 {
   // Use an atomic variable to track total number of cores in the cluster 
for simplicity and speed
   var totalCoreCount = new AtomicInteger(0)
+  var totalExpectedExecutors = new AtomicInteger(0)
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
+  // Submit tasks only after (registered executors / total expected 
executors) 
+  // is equal to at least this value, that is double between 0 and 1.
+  var minRegisteredRatio = 
conf.getDouble(spark.scheduler.minRegisteredExecutorsRatio, 0)
--- End diff --

@tgravescs Done.


---


[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-07-11 Thread li-zhihui
Github user li-zhihui commented on the pull request:

https://github.com/apache/spark/pull/900#issuecomment-48714143
  
Thanks @tgravescs 
I will file a new jira for handling mesos and follow it after the PR merged.



---


[GitHub] spark pull request: [MLLIB] [SPARK-2222] Add multiclass evaluation...

2014-07-11 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1155#issuecomment-48714186
  
QA tests have started for PR 1155. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16566/consoleFull


---


[GitHub] spark pull request: [SPARK-1470,SPARK-1842] Use the scala-logging ...

2014-07-11 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1369#issuecomment-48714364
  
QA results for PR 1369:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16564/consoleFull


---


[GitHub] spark pull request: [MLLIB] [SPARK-2222] Add multiclass evaluation...

2014-07-11 Thread avulanov
Github user avulanov commented on a diff in the pull request:

https://github.com/apache/spark/pull/1155#discussion_r14814439
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/evaluation/MulticlassMetrics.scala 
---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.evaluation
+
+import org.apache.spark.Logging
+import org.apache.spark.SparkContext._
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.rdd.RDD
+
+import scala.collection.Map
--- End diff --

RDD functions return scala.collection.Map from various methods that I use (countByValue,
collectAsMap, etc.). scala.collection.Map is not the same as the Scala Predef Map, which
is collection.immutable.Map, so the compiler reports a type mismatch error if I remove
this import.
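
A small illustration of the mismatch described above (stand-in method, not the MLlib code):
the general scala.collection.Map does not conform to Predef.Map without the broader type
or an explicit conversion.

    object MapVarianceSketch {
      // Mirrors the shape of countByValue-style APIs, which return scala.collection.Map.
      def fromRddLikeApi(): scala.collection.Map[String, Long] =
        scala.collection.mutable.Map("a" -> 1L)

      // def needsImmutable(m: Map[String, Long]): Unit = ()   // Predef.Map, i.e. immutable.Map
      // needsImmutable(fromRddLikeApi())                       // would not compile: type mismatch

      def main(args: Array[String]): Unit = {
        val counts: scala.collection.Map[String, Long] = fromRddLikeApi() // fine with the general type
        println(counts)
      }
    }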


---


[GitHub] spark pull request: [MLLIB] [SPARK-2222] Add multiclass evaluation...

2014-07-11 Thread avulanov
Github user avulanov commented on the pull request:

https://github.com/apache/spark/pull/1155#issuecomment-48715008
  
@mengxr 
I've addressed your comments, except the one about the import, which I commented on above.

I've posted a question about the feature selection interface. Could you take a look at it?

http://apache-spark-developers-list.1001551.n3.nabble.com/Feature-selection-interface-td7258.html


---


[GitHub] spark pull request: [SPARK-2446][SQL] Add BinaryType support to Pa...

2014-07-11 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1373#issuecomment-48718560
  
QA results for PR 1373:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16565/consoleFull


---


[GitHub] spark pull request: [MLLIB] [SPARK-2222] Add multiclass evaluation...

2014-07-11 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1155#issuecomment-48720849
  
QA results for PR 1155:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class MulticlassMetrics(predictionAndLabels: RDD[(Double, Double)]) {
* (equals to precision for multiclass classifier

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16566/consoleFull


---


[GitHub] spark pull request: [SPARK-2437] Rename MAVEN_PROFILES to SBT_MAVE...

2014-07-11 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1374#issuecomment-48722841
  
QA results for PR 1374:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16567/consoleFull


---


[GitHub] spark pull request: Made rdd.py pep8 complaint by using Autopep8 a...

2014-07-11 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1354#issuecomment-48725049
  
QA results for PR 1354:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16568/consoleFull


---


[GitHub] spark pull request: [SPARK-2437] Rename MAVEN_PROFILES to SBT_MAVE...

2014-07-11 Thread ScrapCodes
Github user ScrapCodes commented on the pull request:

https://github.com/apache/spark/pull/1374#issuecomment-48725337
  
Jenkins, retest this please.


---


[GitHub] spark pull request: [SPARK-2437] Rename MAVEN_PROFILES to SBT_MAVE...

2014-07-11 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1374#issuecomment-48725500
  
QA tests have started for PR 1374. This patch merges cleanly.
View progress:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16569/consoleFull


---


[GitHub] spark pull request: [SPARK-2165] spark on yarn: add support for se...

2014-07-11 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/1279#discussion_r14821748
  
--- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala ---
@@ -124,6 +124,14 @@ class SparkConf(loadDefaults: Boolean) extends 
Cloneable with Logging {
 set("spark.home", home)
   }
 
+  /**
+   * Set the max number of submission retries the Spark client will attempt
+   * before giving up
+   */
--- End diff --

We haven't been adding specific routines to set the configs.  The user can 
just set it using the existing SparkConf.set routines so I think we should 
remove this.


---


[GitHub] spark pull request: [SPARK-2165] spark on yarn: add support for se...

2014-07-11 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/1279#discussion_r14822032
  
--- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala ---
@@ -167,6 +175,8 @@ class SparkConf(loadDefaults: Boolean) extends 
Cloneable with Logging {
 getOption(key).map(_.toInt).getOrElse(defaultValue)
   }
 
+  def getIntOption(key: String): Option[Int] = getOption(key).map(_.toInt)
--- End diff --

To keep things consistent (these APIs are public) I don't think we should add getIntOption
without adding the other routines like getLongOption, etc. For now, can you just use
getOption and then convert it to an Int?
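
A small sketch of the suggested alternative (SparkConf is stubbed here to keep the
sketch self-contained; the config key is the one used in the diff): read the raw value
with getOption and convert it at the call site.

    object MaxAppAttemptsSketch {
      // Stand-in for SparkConf.getOption, just to make the sketch runnable.
      class ConfStub(settings: Map[String, String]) {
        def getOption(key: String): Option[String] = settings.get(key)
      }

      def main(args: Array[String]): Unit = {
        val sparkConf = new ConfStub(Map("spark.maxappattempts" -> "2"))
        val maxAppAttempts: Option[Int] = sparkConf.getOption("spark.maxappattempts").map(_.toInt)
        println(maxAppAttempts) // Some(2)
      }
    }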


---


[GitHub] spark pull request: [SPARK-2165] spark on yarn: add support for se...

2014-07-11 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/1279#discussion_r14822125
  
--- Diff: 
yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -108,6 +108,10 @@ class Client(clientArgs: ClientArguments, hadoopConf: 
Configuration, spConf: Spa
 val appContext = 
Records.newRecord(classOf[ApplicationSubmissionContext])
 appContext.setApplicationId(appId)
 appContext.setApplicationName(args.appName)
+sparkConf.getIntOption(spark.maxappattempts) match {
+  case Some(v) = appContext.setMaxAppAttempts(v)
--- End diff --

hadoop 0.23 (yarn alpha) doesn't have a setMaxAppAttempts routine.  Just 
remove this and only do it in the yarn stable version.


---


[GitHub] spark pull request: [SPARK-2165] spark on yarn: add support for se...

2014-07-11 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/1279#discussion_r14822197
  
--- Diff: 
yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -81,6 +81,10 @@ class Client(clientArgs: ClientArguments, hadoopConf: 
Configuration, spConf: Spa
 appContext.setQueue(args.amQueue)
 appContext.setAMContainerSpec(amContainer)
 appContext.setApplicationType(SPARK)
+sparkConf.getIntOption(spark.maxappattempts) match {
+  case Some(v) = appContext.setMaxAppAttempts(v)
+  case None = logDebug(Not setting max app attempts.)
--- End diff --

Can you add something like "cluster default setting will be used" to the log statement?


---


[GitHub] spark pull request: [SPARK-2437] Rename MAVEN_PROFILES to SBT_MAVE...

2014-07-11 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1374#issuecomment-48736229
  
QA results for PR 1374:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16569/consoleFull


---


[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-07-11 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14823260
  
--- Diff: docs/configuration.md ---
@@ -699,6 +699,22 @@ Apart from these, the following properties are also available, and may be useful
     (in milliseconds)
   </td>
 </tr>
+</tr>
+  <td><code>spark.scheduler.minRegisteredExecutorsRatio</code></td>
+  <td>0</td>
+  <td>
+    Submit tasks only after (registered executors / total expected executors)
+    is equal to at least this value, which is double between 0 and 1.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.scheduler.maxRegisteredExecutorsWaitingTime</code></td>
+  <td>3</td>
+  <td>
+    Whatever (registered executors / total expected executors) is reached
--- End diff --

I think we should clarify both of these a bit, because scheduling really starts when
either one is hit, so adding a reference to maxRegisteredExecutorsWaitingTime in the
description of minRegisteredExecutorsRatio would be good.

How about something like below? Note I'm not a doc writer, so I'm fine with changing it.

For spark.scheduler.minRegisteredExecutorsRatio:
The minimum ratio of registered executors (registered executors / total expected
executors) to wait for before scheduling begins. Specified as a double between 0 and 1.
Regardless of whether the minimum ratio of executors has been reached, the maximum amount
of time it will wait before scheduling begins is controlled by the config
<code>spark.scheduler.maxRegisteredExecutorsWaitingTime</code>.

Then for spark.scheduler.maxRegisteredExecutorsWaitingTime:
Maximum amount of time to wait for executors to register before scheduling begins
(in milliseconds).
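
For reference, a usage sketch with the config names proposed above (the values are
illustrative only and are not part of the proposed doc text):

    import org.apache.spark.SparkConf

    object MinRegisteredExecutorsSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("min-registered-executors-sketch")
          .set("spark.scheduler.minRegisteredExecutorsRatio", "0.8")         // wait for 80% of executors
          .set("spark.scheduler.maxRegisteredExecutorsWaitingTime", "30000") // but at most 30 seconds
        println(conf.toDebugString)
      }
    }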


---


[GitHub] spark pull request: replace println to log4j

2014-07-11 Thread fireflyc
Github user fireflyc commented on the pull request:

https://github.com/apache/spark/pull/1372#issuecomment-48739929
  
I have verified it; the log level is set to INFO, right?


---


[GitHub] spark pull request: Made rdd.py pep8 complaint by using Autopep8 a...

2014-07-11 Thread ScrapCodes
Github user ScrapCodes commented on the pull request:

https://github.com/apache/spark/pull/1354#issuecomment-48740596
  
Found a way actually.
On Jul 11, 2014 6:07 PM, Apache Spark QA notificati...@github.com wrote:

 QA results for PR 1354:
 - This patch PASSES unit tests.
 - This patch merges cleanly
 - This patch adds no public classes

 For more information see test output:

 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16568/consoleFull

 —
 Reply to this email directly or view it on GitHub
 https://github.com/apache/spark/pull/1354#issuecomment-48725049.



---


[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...

2014-07-11 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/900#discussion_r14825405
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -244,6 +257,17 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, actorSystem: A
 throw new SparkException(Error notifying standalone scheduler's 
driver actor, e)
 }
   }
+
+  override def isReady(): Boolean = {
+if (ready) {
+  return true
+}
+if ((System.currentTimeMillis() - createTime) = 
maxRegisteredWaitingTime) {
--- End diff --

It might be nice to have a log statement here saying the max time was hit, so we know
when scheduling began when debugging a job.
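
A self-contained sketch of the suggestion (the fields and logInfo are stubbed; the names
follow the diff above): log when the max waiting time, rather than the ratio, is what
unblocked scheduling.

    object IsReadySketch {
      var ready = false
      val createTime: Long = System.currentTimeMillis() - 40000 // pretend the backend started 40 s ago
      val maxRegisteredWaitingTime: Long = 30000

      def logInfo(msg: String): Unit = println(s"INFO $msg")

      def isReady(): Boolean = {
        if (ready) return true
        if (System.currentTimeMillis() - createTime >= maxRegisteredWaitingTime) {
          logInfo(s"Max waiting time of $maxRegisteredWaitingTime ms for executor registration " +
            "reached; starting scheduling now.")
          ready = true
          return true
        }
        false
      }

      def main(args: Array[String]): Unit = println(isReady()) // logs the message, then prints true
    }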


---


[GitHub] spark pull request: SPARK-1719: spark.*.extraLibraryPath isn't app...

2014-07-11 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1022#issuecomment-48750411
  
QA tests have started for PR 1022. This patch merges cleanly. View 
progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16570/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2236][SQL]SparkSQL add SkewJoin

2014-07-11 Thread YanjieGao
Github user YanjieGao commented on the pull request:

https://github.com/apache/spark/pull/1134#issuecomment-48752406
  
Hi, I rewrote the code and resolved some of the earlier problems.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2441][SQL] Add more efficient distinct ...

2014-07-11 Thread marmbrus
Github user marmbrus commented on the pull request:

https://github.com/apache/spark/pull/1366#issuecomment-48759522
  
@aarondav, you are totally right.  However, the `Aggregate` operator that 
this is replacing made the same assumption and this approach will use strictly 
less memory. 

For the 1.2 release I'd like to focus on external algorithms and hopefully 
start using more of Spark's built-in data structures for this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2446][SQL] Add BinaryType support to Pa...

2014-07-11 Thread marmbrus
Github user marmbrus commented on the pull request:

https://github.com/apache/spark/pull/1373#issuecomment-48760249
  
Thanks for the patch!  One quick question: will this change the behavior 
when loading in string data that was saved with previous versions of Spark SQL?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-1719: spark.*.extraLibraryPath isn't app...

2014-07-11 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1022#issuecomment-48761837
  
QA results for PR 1022:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds no public classes.

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16570/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1177] Allow SPARK_JAR to be set program...

2014-07-11 Thread dbtsai
Github user dbtsai commented on the pull request:

https://github.com/apache/spark/pull/987#issuecomment-48762832
  
#560 is merged. Closing this PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1177] Allow SPARK_JAR to be set program...

2014-07-11 Thread dbtsai
Github user dbtsai closed the pull request at:

https://github.com/apache/spark/pull/987


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-2425 Don't kill a still-running Applicat...

2014-07-11 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/1360#issuecomment-48762929
  
QA tests have started for PR 1360. This patch merges cleanly. View 
progress: 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16571/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-1536: multiclass classification support ...

2014-07-11 Thread etrain
Github user etrain commented on a diff in the pull request:

https://github.com/apache/spark/pull/886#discussion_r14836561
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala ---
@@ -768,104 +973,157 @@ object DecisionTree extends Serializable with 
Logging {
 /**
  * Extracts left and right split aggregates.
  * @param binData Array[Double] of size 2*numFeatures*numSplits
- * @return (leftNodeAgg, rightNodeAgg) tuple of type (Array[Double],
- * Array[Double]) where each array is of 
size(numFeature,2*(numSplits-1))
+ * @return (leftNodeAgg, rightNodeAgg) tuple of type 
(Array[Array[Array[Double\]\]\],
+ * Array[Array[Array[Double\]\]\]) where each array is of 
size(numFeature,
+ * (numBins - 1), numClasses)
  */
 def extractLeftRightNodeAggregates(
-binData: Array[Double]): (Array[Array[Double]], 
Array[Array[Double]]) = {
+binData: Array[Double]): (Array[Array[Array[Double]]], 
Array[Array[Array[Double]]]) = {
+
+
+  def findAggForOrderedFeatureClassification(
+  leftNodeAgg: Array[Array[Array[Double]]],
+  rightNodeAgg: Array[Array[Array[Double]]],
+  featureIndex: Int) {
+
+// shift for this featureIndex
+val shift = numClasses * featureIndex * numBins
+
+var classIndex = 0
+while (classIndex < numClasses) {
+  // left node aggregate for the lowest split
+  leftNodeAgg(featureIndex)(0)(classIndex) = binData(shift + 
classIndex)
+  // right node aggregate for the highest split
+  rightNodeAgg(featureIndex)(numBins - 2)(classIndex)
+= binData(shift + (numClasses * (numBins - 1)) + classIndex)
+  classIndex += 1
+}
+
+// Iterate over all splits.
+var splitIndex = 1
+while (splitIndex < numBins - 1) {
+  // calculating left node aggregate for a split as a sum of left 
node aggregate of a
+  // lower split and the left bin aggregate of a bin where the 
split is a high split
+  var innerClassIndex = 0
+  while (innerClassIndex < numClasses) {
+leftNodeAgg(featureIndex)(splitIndex)(innerClassIndex)
+  = binData(shift + numClasses * splitIndex + innerClassIndex) +
+leftNodeAgg(featureIndex)(splitIndex - 1)(innerClassIndex)
+rightNodeAgg(featureIndex)(numBins - 2 - 
splitIndex)(innerClassIndex) =
+  binData(shift + (numClasses * (numBins - 1 - splitIndex) + 
innerClassIndex)) +
+rightNodeAgg(featureIndex)(numBins - 1 - 
splitIndex)(innerClassIndex)
+innerClassIndex += 1
+  }
+  splitIndex += 1
+}
+  }
+
+  def findAggForUnorderedFeatureClassification(
+  leftNodeAgg: Array[Array[Array[Double]]],
+  rightNodeAgg: Array[Array[Array[Double]]],
+  featureIndex: Int) {
+
+val rightChildShift = numClasses * numBins * numFeatures
+var splitIndex = 0
+while (splitIndex < numBins - 1) {
+  var classIndex = 0
+  while (classIndex < numClasses) {
+// shift for this featureIndex
+val shift = numClasses * featureIndex * numBins + splitIndex * 
numClasses
+val leftBinValue = binData(shift + classIndex)
+val rightBinValue = binData(rightChildShift + shift + 
classIndex)
+leftNodeAgg(featureIndex)(splitIndex)(classIndex) = 
leftBinValue
+rightNodeAgg(featureIndex)(splitIndex)(classIndex) = 
rightBinValue
+classIndex += 1
+  }
+  splitIndex += 1
+}
+  }
+
+  def findAggForRegression(
+  leftNodeAgg: Array[Array[Array[Double]]],
+  rightNodeAgg: Array[Array[Array[Double]]],
+  featureIndex: Int) {
+
+// shift for this featureIndex
+val shift = 3 * featureIndex * numBins
+// left node aggregate for the lowest split
+leftNodeAgg(featureIndex)(0)(0) = binData(shift + 0)
+leftNodeAgg(featureIndex)(0)(1) = binData(shift + 1)
+leftNodeAgg(featureIndex)(0)(2) = binData(shift + 2)
+
+// right node aggregate for the highest split
+rightNodeAgg(featureIndex)(numBins - 2)(0) =
+  binData(shift + (3 * (numBins - 1)))
+rightNodeAgg(featureIndex)(numBins - 2)(1) =
+  binData(shift + (3 * (numBins - 1)) + 1)
+rightNodeAgg(featureIndex)(numBins - 2)(2) =
+  binData(shift + (3 * (numBins - 1)) + 2)
+
+// Iterate over all splits.
+var splitIndex 

[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-11 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14836617
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
+ */
+trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S, for the input matrix, where S(i, j) 
is the correlation
+   * between column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute 
the correlation using the
+   * correlation implementation for RDD[Vector]
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): 
Double = {
+val mat: RDD[Vector] = x.zip(y).mapPartitions({ iter =>
+  iter.map { case (xi, yi) => new DenseVector(Array(xi, yi)) }
+}, preservesPartitioning = true)
+computeCorrelationMatrix(mat)(0, 1)
+  }
+
+}
+
+/**
+ * Delegates computation to the specific correlation object based on the 
input method name
+ *
+ * Currently supported correlations: pearson, spearman.
+ * After new correlation algorithms are added, please update the 
documentation here and in
+ * Statistics.scala for the correlation APIs.
+ *
+ * Cases are ignored when doing method matching. We also allow initials, 
e.g. P for pearson, as
+ * long as initials are unique in the supported set of correlation 
algorithms. In addition, a
--- End diff --

So the fact that R has supported this feature for many years (and still 
hasn't deprecated it) is good precedent that, among the well-known 
correlations, there aren't really any name collisions. Since we require that 
only well-known and widely adopted algorithms be added to MLlib, the 
likelihood of a collision is even smaller. (If someone is adding something in 
their own version of Spark, presumably they have already looked closely at 
the docs and been warned.)
Because we decided to go with strings for method specification instead of 
enums, for cross-language uniformity, the method is only checked (and 
possibly invalidated) at run time. Therefore, we do want to provide some 
tolerance for small variations in how the method is specified. To address a 
related comment, `spearman` and `spearmans` are both really common ways of 
referring to Spearman's correlation (imagine the heartbreak of having your 
program fail because of an extra "s" after hours of data manipulation and 
computation!)
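
As a rough, hypothetical illustration of the matching rules described in the 
quoted doc comment (case-insensitive matching, unique initial/prefix 
shortcuts, and substring matching), consider the sketch below; it is not the 
actual MLlib code and the names are made up:

```
object CorrelationNameMatcher {
  // Supported correlation names, per the doc comment quoted above.
  private val supported = Seq("pearson", "spearman")

  /** Resolve a user-supplied method name to a supported correlation name. */
  def resolve(method: String): String = {
    val m = method.toLowerCase
    supported.find(name => name.startsWith(m) || m.contains(name)).getOrElse(
      throw new IllegalArgumentException(s"Unrecognized correlation method: $method"))
  }
}

// e.g. resolve("P") == "pearson"; resolve("spearmansrho") == "spearman"
```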


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-11 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14836715
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
+ */
+trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S, for the input matrix, where S(i, j) 
is the correlation
+   * between column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute 
the correlation using the
+   * correlation implementation for RDD[Vector]
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): 
Double = {
+val mat: RDD[Vector] = x.zip(y).mapPartitions({ iter =>
+  iter.map { case (xi, yi) => new DenseVector(Array(xi, yi)) }
+}, preservesPartitioning = true)
+computeCorrelationMatrix(mat)(0, 1)
+  }
+
+}
+
+/**
+ * Delegates computation to the specific correlation object based on the 
input method name
+ *
+ * Currently supported correlations: pearson, spearman.
+ * After new correlation algorithms are added, please update the 
documentation here and in
+ * Statistics.scala for the correlation APIs.
+ *
+ * Cases are ignored when doing method matching. We also allow initials, 
e.g. P for pearson, as
+ * long as initials are unique in the supported set of correlation 
algorithms. In addition, a
+ * supported method name has to be a substring of the input method name 
for it to be matched (e.g.
+ * spearmansrho will be matched to spearman)
+ *
+ * Maintains the default correlation type, pearson
+ */
+object Correlations {
--- End diff --

The reason I named it Correlations isn't to copy Vectors but to signify that 
this is more of a delegator object that maintains all supported correlations.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-1536: multiclass classification support ...

2014-07-11 Thread etrain
Github user etrain commented on the pull request:

https://github.com/apache/spark/pull/886#issuecomment-48767401
  
I've gone through this in some depth, and aside from a couple of minor 
style nits, the logic looks good to me. Manish, have you compared the output 
against scikit-learn for multiclass datasets and verified that things look at 
least reasonably similar?

Really awesome work!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations

2014-07-11 Thread dorx
Github user dorx commented on a diff in the pull request:

https://github.com/apache/spark/pull/1367#discussion_r14836922
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala 
---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.correlation
+
+import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
+import org.apache.spark.rdd.RDD
+
+/**
+ * New correlation algorithms should implement this trait
+ */
+trait Correlation {
+
+  /**
+   * Compute correlation for two datasets.
+   */
+  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
+
+  /**
+   * Compute the correlation matrix S, for the input matrix, where S(i, j) 
is the correlation
+   * between column i and j.
+   */
+  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
+
+  /**
+   * Combine the two input RDD[Double]s into an RDD[Vector] and compute 
the correlation using the
+   * correlation implementation for RDD[Vector]
+   */
+  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): 
Double = {
--- End diff --

I made it its own method on purpose, to force future developers to think 
about whether this is a good option or whether custom code for vectors would 
be more performant.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [WIP] SPARK-2450: Add YARN executor log links ...

2014-07-11 Thread kbzod
GitHub user kbzod opened a pull request:

https://github.com/apache/spark/pull/1375

[WIP] SPARK-2450: Add YARN executor log links to UI executors page

This adds a new column in the Executors page of the Spark UI called Logs, 
visible only when running under YARN. After getting a container for an 
executor, the YarnAllocationHandler informs the UI of where its logs reside, so 
that a link can be presented in the Logs column for it.

Current issues / limitations:
* This only works when running in yarn-cluster mode, not yarn-client.
* The mechanism for getting the log URL through to the UI is different from 
the usual way information gets there (via SparkListeners). There may be a 
better, more decoupled way to do it.

![screen shot 2014-07-10 at 4 36 28 pm](https://cloud.githubusercontent.com/assets/1541206/3557128/006f6ea2-092c-11e4-8125-5c2c3a158068.png)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kbzod/spark SPARK-2450

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/1375.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1375


commit f3d42ecf3309e6c3ddd7ffc2fce990776059b414
Author: Bill Havanki bhava...@cloudera.com
Date:   2014-07-10T18:28:46Z

SPARK-2450 Initial prototype.

commit 60cc1121d27f94bbf695763d585b29a8be4023aa
Author: Bill Havanki bhava...@cloudera.com
Date:   2014-07-10T19:32:08Z

SPARK-2450 Better abstraction

commit 57dca648aa30a15525d03d3bbecb85f4340829eb
Author: Bill Havanki bhava...@cloudera.com
Date:   2014-07-11T17:59:26Z

SPARK-2450 Add Javadoc.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [WIP] SPARK-2450: Add YARN executor log links ...

2014-07-11 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/1375#issuecomment-48768609
  
Can one of the admins verify this patch?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2437] Rename MAVEN_PROFILES to SBT_MAVE...

2014-07-11 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/1374#issuecomment-48768736
  
Thanks, looks good. I tested this with:

```
SBT_MAVEN_PROFILES=yarn SBT_MAVEN_PROPERTIES=hadoop.version=2.2.0 sbt/sbt 
package
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2411] Add a history-not-found page to s...

2014-07-11 Thread pwendell
Github user pwendell commented on the pull request:

https://github.com/apache/spark/pull/1336#issuecomment-48769071
  
LGTM with one small comment.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2437] Rename MAVEN_PROFILES to SBT_MAVE...

2014-07-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/1374


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-2411] Add a history-not-found page to s...

2014-07-11 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/1336#discussion_r14837627
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/master/ui/HistoryNotFoundPage.scala 
---
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master.ui
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.ui.{UIUtils, WebUIPage}
+
+private[spark] class HistoryNotFoundPage(parent: MasterWebUI)
+  extends WebUIPage(history/not-found) {
+
+  def render(request: HttpServletRequest): Seq[Node] = {
+val content =
+  <div class="row-fluid">
+<div class="span12" style="font-size:14px;font-weight:bold">
+  No event logs were found for this application. To enable event 
logging, please set
--- End diff --

Should you also mention the setting for the event log directory? Otherwise 
they might think they can just set `enabled` and it will auto-magically work.
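
For reference, a hypothetical sketch of the configuration such a message 
would point users at (these are the standard Spark event-log properties; the 
directory path is purely illustrative):

```
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")
  // Enabling logging alone may not "auto-magically" make history show up;
  // the log directory typically also needs to point somewhere the master /
  // history server can read.
  .set("spark.eventLog.dir", "hdfs:///user/spark/applicationHistory")
```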


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...

2014-07-11 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/1165#discussion_r14837730
  
--- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---
@@ -141,6 +174,86 @@ private class MemoryStore(blockManager: BlockManager, 
maxMemory: Long)
   }
 
   /**
+   * Unfold the given block in memory safely.
+   *
+   * The safety of this operation refers to avoiding potential OOM 
exceptions caused by
+   * unfolding the entirety of the block in memory at once. This is 
achieved by periodically
+   * checking whether the memory restrictions for unfolding blocks are 
still satisfied,
+   * stopping immediately if not. This check is a safeguard against the 
scenario in which
+   * there is not enough free memory to accommodate the entirety of a 
single block.
+   *
+   * This method returns either a fully unfolded array or a partially 
unfolded iterator.
+   */
+  def unfoldSafely(
+  blockId: BlockId,
+  values: Iterator[Any],
+  droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)])
+: Either[Array[Any], Iterator[Any]] = {
+
+var count = 0   // The number of elements unfolded so 
far
+var enoughMemory = true // Whether there is enough memory to 
unfold this block
--- End diff --

okay then - `atMemoryLimit` or something


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...

2014-07-11 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/1165#discussion_r14837775
  
--- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---
@@ -141,6 +174,86 @@ private class MemoryStore(blockManager: BlockManager, 
maxMemory: Long)
   }
 
   /**
+   * Unfold the given block in memory safely.
+   *
+   * The safety of this operation refers to avoiding potential OOM 
exceptions caused by
+   * unfolding the entirety of the block in memory at once. This is 
achieved by periodically
+   * checking whether the memory restrictions for unfolding blocks are 
still satisfied,
+   * stopping immediately if not. This check is a safeguard against the 
scenario in which
+   * there is not enough free memory to accommodate the entirety of a 
single block.
+   *
+   * This method returns either a fully unfolded array or a partially 
unfolded iterator.
+   */
+  def unfoldSafely(
+  blockId: BlockId,
+  values: Iterator[Any],
+  droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)])
+: Either[Array[Any], Iterator[Any]] = {
+
+var count = 0   // The number of elements unfolded so 
far
+var enoughMemory = true // Whether there is enough memory to 
unfold this block
+var previousSize = 0L   // Previous estimate of the size of 
our buffer
+val memoryRequestPeriod = 1000  // How frequently we request for more 
memory for our buffer
--- End diff --

What about something simple: basically, do the check if `count < 100` or if 
you are at the interval.
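
A minimal sketch of that policy, reusing `count` and `memoryRequestPeriod` 
from the diff above (the 100-element threshold is just the example from this 
comment, not a tuned value):

```
// Check memory on every one of the first 100 elements, then only once per
// memoryRequestPeriod elements.
val shouldRequestMemory = count < 100 || count % memoryRequestPeriod == 0
if (shouldRequestMemory) {
  // re-estimate the buffer size and request more unroll memory if needed
}
```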


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...

2014-07-11 Thread pwendell
Github user pwendell commented on a diff in the pull request:

https://github.com/apache/spark/pull/1165#discussion_r14837855
  
--- Diff: core/src/main/scala/org/apache/spark/SparkEnv.scala ---
@@ -67,10 +67,14 @@ class SparkEnv (
 val metricsSystem: MetricsSystem,
 val conf: SparkConf) extends Logging {
 
-  // A mapping of thread ID to amount of memory used for shuffle in bytes
+  // A mapping of thread ID to amount of memory, in bytes, used for shuffle
   // All accesses should be manually synchronized
   val shuffleMemoryMap = mutable.HashMap[Long, Long]()
 
+  // A mapping of thread ID to amount of memory, in bytes, used for 
unrolling an RDD partition
+  // All accesses should be manually synchronized
+  val cacheMemoryMap = mutable.HashMap[Long, Long]()
--- End diff --

Wait, did this map exist before your patch at all?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] spark pull request: SPARK-1576 (Allow JAVA_OPTS to be passed as a ...

2014-07-11 Thread nishkamravi2
Github user nishkamravi2 commented on the pull request:

https://github.com/apache/spark/pull/492#issuecomment-48769703
  
@tgravescs  Sorry, missed this one somehow. Yes, spark-submit is now fully 
tested. We could keep this one open for 0.92 potentially.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

