[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1165#discussion_r14808169

    --- Diff: core/src/main/scala/org/apache/spark/CacheManager.scala ---
    @@ -140,14 +144,39 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
                 throw new BlockException(key, s"Block manager failed to return cached value for $key!")
               }
             } else {
    -          /* This RDD is to be cached in memory. In this case we cannot pass the computed values
    +          /*
    +           * This RDD is to be cached in memory. In this case we cannot pass the computed values
                * to the BlockManager as an iterator and expect to read it back later. This is because
                * we may end up dropping a partition from memory store before getting it back, e.g.
    -           * when the entirety of the RDD does not fit in memory. */
    -          val elements = new ArrayBuffer[Any]
    -          elements ++= values
    -          updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true)
    -          elements.iterator.asInstanceOf[Iterator[T]]
    +           * when the entirety of the RDD does not fit in memory.
    +           *
    +           * In addition, we must be careful to not unfold the entire partition in memory at once.
    +           * Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
    +           * single partition. Instead, we unfold the values cautiously, potentially aborting and
    +           * dropping the partition to disk if applicable.
    +           */
    +          blockManager.memoryStore.unfoldSafely(key, values, updatedBlocks) match {
    +            case Left(arrayValues) =>
    +              // We have successfully unfolded the entire partition, so cache it in memory
    +              updatedBlocks ++= blockManager.put(key, arrayValues, storageLevel, tellMaster = true)
    +              arrayValues.iterator.asInstanceOf[Iterator[T]]
    --- End diff --

    This cast is the one that is scary to me; we should try to restructure it so we are casting to a more general type.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well.
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
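One way to remove the cast the comment objects to, offered as a sketch rather than what the PR adopted: make the unrolling helper generic in the element type, so both branches of the `Either` already carry `T`. The object `UnrollSketch`, the method `unrollSafely`, and the element-count budget `maxElements` are hypothetical; a simple count stands in for Spark's memory-size checks, and `Vector` stands in for `Array` so no `ClassTag` is needed.

```scala
import scala.collection.mutable.ArrayBuffer

object UnrollSketch {
  // Hypothetical generic unroller. Returns Left with every value if they all fit
  // within maxElements; otherwise Right with an iterator over the buffered prefix
  // followed by the untouched remainder, so no element is lost or read twice.
  // Because it is generic in T, callers never need asInstanceOf.
  def unrollSafely[T](values: Iterator[T], maxElements: Int): Either[Vector[T], Iterator[T]] = {
    val buffer = new ArrayBuffer[T]
    while (values.hasNext && buffer.size < maxElements) {
      buffer += values.next()
    }
    if (values.hasNext) Right(buffer.iterator ++ values)
    else Left(buffer.toVector)
  }
}
```

With this shape, a `Left(arrayValues)` branch could return `arrayValues.iterator` directly as an `Iterator[T]`. In the diff, `unfoldSafely` works on `Iterator[Any]` and `Array[Any]`, which is why the cast shows up at the call site.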
[GitHub] spark pull request: [WIP][SPARK-2054][SQL] Code Generation for Exp...
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/993#issuecomment-48697796

    QA tests have started for PR 993. This patch merges cleanly.
    View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16557/consoleFull
[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1367#discussion_r14808250

    --- Diff: python/pyspark/join.py ---
    @@ -1,35 +1,19 @@
    -
    -Copyright (c) 2011, Douban Inc. http://www.douban.com/
    -All rights reserved.
    -
    -Redistribution and use in source and binary forms, with or without
    -modification, are permitted provided that the following conditions are
    -met:
    -
    -* Redistributions of source code must retain the above copyright
    -notice, this list of conditions and the following disclaimer.
    -
    -* Redistributions in binary form must reproduce the above
    -copyright notice, this list of conditions and the following disclaimer
    -in the documentation and/or other materials provided with the
    -distribution.
    -
    -* Neither the name of the Douban Inc. nor the names of its
    -contributors may be used to endorse or promote products derived from
    -this software without specific prior written permission.
    -
    -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    -"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    -LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
    -A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
    -OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
    -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
    -LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
    -DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
    -THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
    -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
    -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
    -
    --- End diff --

    This was discussed in https://github.com/apache/spark/pull/592, and the conclusion was to keep the Douban BSD license. Also, this change is unrelated to this PR, so please remove it.
[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1165#discussion_r14808269

    --- Diff: core/src/main/scala/org/apache/spark/CacheManager.scala ---
    @@ -140,14 +144,39 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
                 throw new BlockException(key, s"Block manager failed to return cached value for $key!")
               }
             } else {
    -          /* This RDD is to be cached in memory. In this case we cannot pass the computed values
    +          /*
    +           * This RDD is to be cached in memory. In this case we cannot pass the computed values
                * to the BlockManager as an iterator and expect to read it back later. This is because
                * we may end up dropping a partition from memory store before getting it back, e.g.
    -           * when the entirety of the RDD does not fit in memory. */
    -          val elements = new ArrayBuffer[Any]
    -          elements ++= values
    -          updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true)
    -          elements.iterator.asInstanceOf[Iterator[T]]
    +           * when the entirety of the RDD does not fit in memory.
    +           *
    +           * In addition, we must be careful to not unfold the entire partition in memory at once.
    +           * Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
    +           * single partition. Instead, we unfold the values cautiously, potentially aborting and
    +           * dropping the partition to disk if applicable.
    +           */
    +          blockManager.memoryStore.unfoldSafely(key, values, updatedBlocks) match {
    +            case Left(arrayValues) =>
    +              // We have successfully unfolded the entire partition, so cache it in memory
    +              updatedBlocks ++= blockManager.put(key, arrayValues, storageLevel, tellMaster = true)
    +              arrayValues.iterator.asInstanceOf[Iterator[T]]
    +            case Right(iteratorValues) =>
    +              // There is not enough space to cache this partition in memory
    +              logWarning(s"Not enough space to cache $key in memory! " +
    +                s"Free memory is ${blockManager.memoryStore.freeMemory}B.")
    +              var returnValues = iteratorValues.asInstanceOf[Iterator[T]]
    +              if (storageLevel.useDisk) {
    +                logWarning(s"Persisting $key to disk instead.")
    +                val newLevel = StorageLevel(
    --- End diff --

    Could this create a sort of strange situation where I persist an RDD with storage level MEMORY_AND_DISK, but then get back listener updates where individual blocks come back with StorageLevel DISK_ONLY? This seems like it could be really confusing for downstream consumers of the StorageLevel of a particular RDD. Do we do this anywhere else?
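To make the concern concrete: the new level is cut off in this excerpt, but assuming it matches the fallback quoted from `MemoryStore.scala` in this same PR (`useDisk = true`, `useMemory = false`, `useOffHeap = false`, `deserialized = false`, same replication), a block requested as MEMORY_AND_DISK ends up with disk-only flags. The sketch models only those flags with a hypothetical `Level` case class rather than Spark's `StorageLevel`, so it runs without Spark.

```scala
object FallbackSketch {
  // Hypothetical stand-in for the flags carried by Spark's StorageLevel.
  case class Level(useDisk: Boolean, useMemory: Boolean, useOffHeap: Boolean,
                   deserialized: Boolean, replication: Int)

  val MemoryAndDisk = Level(useDisk = true, useMemory = true, useOffHeap = false,
    deserialized = true, replication = 1)
  val DiskOnly = Level(useDisk = true, useMemory = false, useOffHeap = false,
    deserialized = false, replication = 1)

  // Mirrors the fallback in the diff: keep disk and the replication factor,
  // drop memory and off-heap, and store serialized. No fallback without disk.
  def fallbackLevel(requested: Level): Option[Level] =
    if (requested.useDisk) {
      Some(Level(useDisk = true, useMemory = false, useOffHeap = false,
        deserialized = false, replication = requested.replication))
    } else None
}
```

A consumer that compares a block's reported level with the level the RDD was persisted at would see `DiskOnly` where it expects `MemoryAndDisk`, which is the mismatch the comment is worried about.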
[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1165#discussion_r14808344

    --- Diff: core/src/main/scala/org/apache/spark/SparkEnv.scala ---
    @@ -67,10 +67,14 @@ class SparkEnv (
         val metricsSystem: MetricsSystem,
         val conf: SparkConf) extends Logging {
    -  // A mapping of thread ID to amount of memory used for shuffle in bytes
    +  // A mapping of thread ID to amount of memory, in bytes, used for shuffle
       // All accesses should be manually synchronized
       val shuffleMemoryMap = mutable.HashMap[Long, Long]()
    +  // A mapping of thread ID to amount of memory, in bytes, used for unrolling an RDD partition
    +  // All accesses should be manually synchronized
    +  val cacheMemoryMap = mutable.HashMap[Long, Long]()
    --- End diff --

    Could we call this `partitionMemoryMap`? Otherwise, it could seem like this actually represents the home of the cached data.
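"All accesses should be manually synchronized" means every read and write of the shared, non-thread-safe `HashMap` happens while holding a lock. A minimal sketch of that pattern under the suggested name; `UnrollMemorySketch` and its `reserve`, `release`, and `totalReserved` helpers are hypothetical, not `SparkEnv` API.

```scala
import scala.collection.mutable

object UnrollMemorySketch {
  // Thread ID -> bytes reserved for unrolling a partition on that thread.
  // mutable.HashMap is not thread-safe, so every access locks the map itself.
  private val partitionMemoryMap = mutable.HashMap[Long, Long]()

  // Add `bytes` to the reservation held by `threadId`.
  def reserve(threadId: Long, bytes: Long): Unit = partitionMemoryMap.synchronized {
    partitionMemoryMap(threadId) = partitionMemoryMap.getOrElse(threadId, 0L) + bytes
  }

  // Drop the whole reservation for `threadId`, e.g. once its partition is stored.
  def release(threadId: Long): Unit = partitionMemoryMap.synchronized {
    partitionMemoryMap.remove(threadId)
  }

  // Sum across all threads, read under the same lock for a consistent view.
  def totalReserved: Long = partitionMemoryMap.synchronized {
    partitionMemoryMap.values.sum
  }
}
```

A caller would typically key entries by `Thread.currentThread().getId`, matching the thread-ID keys described in the diff comments.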
[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1367#discussion_r14808675

    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala ---
    @@ -0,0 +1,52 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.stat
    +
    +import org.apache.spark.mllib.linalg.{Matrix, Vector}
    +import org.apache.spark.mllib.stat.correlation.Correlations
    +import org.apache.spark.rdd.RDD
    +
    +object Statistics {
    +
    +  /**
    +   * Compute the Pearson correlation matrix for the input RDD of Vectors.
    +   */
    +  def corr(X: RDD[Vector]): Matrix = Correlations.corrMatrix(X)
    +
    +  /**
    +   * Compute the correlation matrix for the input RDD of Vectors using the specified method.
    +   *
    +   * Methods currently supported: pearson (default), spearman
    --- End diff --

    Follow the ScalaDoc style for method docs. It is worth noting the cost of spearman.
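To illustrate the cost note, here is a minimal, Spark-free sketch of the two supported methods on local sequences. Spearman is Pearson applied to ranks, so it adds a sort (O(n log n) locally; on an RDD, ranking would typically require a distributed sort) on top of Pearson's single pass over the data. `CorrSketch` and its helpers are hypothetical and are not the MLlib implementation.

```scala
object CorrSketch {
  // Pearson correlation of two equal-length samples: cov(x, y) / (sd(x) * sd(y)).
  def pearson(x: Seq[Double], y: Seq[Double]): Double = {
    require(x.length == y.length && x.length > 1, "need two samples of equal length >= 2")
    val n = x.length
    val mx = x.sum / n
    val my = y.sum / n
    val cov = x.zip(y).map { case (xi, yi) => (xi - mx) * (yi - my) }.sum
    val sx = math.sqrt(x.map(xi => (xi - mx) * (xi - mx)).sum)
    val sy = math.sqrt(y.map(yi => (yi - my) * (yi - my)).sum)
    cov / (sx * sy)
  }

  // 1-based ranks; tied values share the average of the ranks they span.
  // The sort here is the extra O(n log n) cost Spearman pays over Pearson.
  def ranks(v: Seq[Double]): Vector[Double] = {
    val sorted = v.zipWithIndex.sortWith(_._1 < _._1).toVector
    val out = Array.fill(v.length)(0.0)
    var i = 0
    while (i < sorted.length) {
      var j = i
      while (j + 1 < sorted.length && sorted(j + 1)._1 == sorted(i)._1) j += 1
      val avgRank = (i + j) / 2.0 + 1.0
      for (k <- i to j) out(sorted(k)._2) = avgRank
      i = j + 1
    }
    out.toVector
  }

  // Spearman correlation: Pearson correlation of the ranks.
  def spearman(x: Seq[Double], y: Seq[Double]): Double = pearson(ranks(x), ranks(y))
}
```

For a monotonic but nonlinear relation such as `y = x^3`, Spearman is exactly 1 while Pearson is below 1, which is the usual reason to pay for the ranking step.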
[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1367#discussion_r14808684

    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala ---
    @@ -0,0 +1,121 @@
    [... ASF license header omitted ...]
    +
    +package org.apache.spark.mllib.stat.correlation
    +
    +import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * New correlation algorithms should implement this trait
    --- End diff --

    Trait for correlation algorithms.
[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1367#discussion_r14808671

    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala ---
    @@ -0,0 +1,52 @@
    [... ASF license header omitted ...]
    +
    +package org.apache.spark.mllib.stat
    +
    +import org.apache.spark.mllib.linalg.{Matrix, Vector}
    +import org.apache.spark.mllib.stat.correlation.Correlations
    +import org.apache.spark.rdd.RDD
    +
    +object Statistics {
    --- End diff --

    Public classes need documentation. Please mark this as @Experimental for the initial version.
[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1367#discussion_r14808679

    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala ---
    @@ -0,0 +1,52 @@
    [... ASF license header omitted ...]
    +
    +package org.apache.spark.mllib.stat
    +
    +import org.apache.spark.mllib.linalg.{Matrix, Vector}
    +import org.apache.spark.mllib.stat.correlation.Correlations
    +import org.apache.spark.rdd.RDD
    +
    +object Statistics {
    +
    +  /**
    +   * Compute the Pearson correlation matrix for the input RDD of Vectors.
    +   */
    +  def corr(X: RDD[Vector]): Matrix = Correlations.corrMatrix(X)
    +
    +  /**
    +   * Compute the correlation matrix for the input RDD of Vectors using the specified method.
    +   *
    +   * Methods currently supported: pearson (default), spearman
    +   */
    +  def corr(X: RDD[Vector], method: String): Matrix = Correlations.corrMatrix(X, method)
    +
    +  /**
    +   * Compute the Pearson correlation for the input RDDs.
    +   */
    +  def corr(x: RDD[Double], y: RDD[Double]): Double = Correlations.corr(x, y)
    +
    +  /**
    +   * Compute the correlation for the input RDDs using the specified method.
    +   *
    +   * Methods currently supported: pearson (default), spearman
    +   */
    +  def corr(x: RDD[Double], y: RDD[Double], method: String): Double = Correlations.corr(x, y, method)
    +
    --- End diff --

    Remove empty lines.
[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1367#discussion_r14808688

    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala ---
    @@ -0,0 +1,121 @@
    [... ASF license header omitted ...]
    +
    +package org.apache.spark.mllib.stat.correlation
    +
    +import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * New correlation algorithms should implement this trait
    + */
    +trait Correlation {
    --- End diff --

    `private[stat]`?
[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1367#discussion_r14808707

    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala ---
    @@ -0,0 +1,121 @@
    [... ASF license header omitted ...]
    +
    +package org.apache.spark.mllib.stat.correlation
    +
    +import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * New correlation algorithms should implement this trait
    + */
    +trait Correlation {
    +
    +  /**
    +   * Compute correlation for two datasets.
    +   */
    +  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
    +
    +  /**
    +   * Compute the correlation matrix S, for the input matrix, where S(i, j) is the correlation
    +   * between column i and j.
    +   */
    +  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
    +
    +  /**
    +   * Combine the two input RDD[Double]s into an RDD[Vector] and compute the correlation using the
    +   * correlation implementation for RDD[Vector]
    +   */
    +  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): Double = {
    +    val mat: RDD[Vector] = x.zip(y).mapPartitions({ iter =>
    +      iter.map {case(xi, yi) => new DenseVector(Array(xi, yi))}
    --- End diff --

    spaces around `case` and add one space before `}`
[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1367#discussion_r14808705

    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala ---
    @@ -0,0 +1,121 @@
    [... ASF license header omitted ...]
    +
    +package org.apache.spark.mllib.stat.correlation
    +
    +import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * New correlation algorithms should implement this trait
    + */
    +trait Correlation {
    +
    +  /**
    +   * Compute correlation for two datasets.
    +   */
    +  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
    +
    +  /**
    +   * Compute the correlation matrix S, for the input matrix, where S(i, j) is the correlation
    +   * between column i and j.
    +   */
    +  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
    +
    +  /**
    +   * Combine the two input RDD[Double]s into an RDD[Vector] and compute the correlation using the
    +   * correlation implementation for RDD[Vector]
    +   */
    +  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): Double = {
    +    val mat: RDD[Vector] = x.zip(y).mapPartitions({ iter =>
    --- End diff --

    `({` -> ` {`
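A local-collections sketch of the zip-then-matrix approach with both style suggestions applied (block braces rather than `({`, and `{ case (xi, yi) => ... }` with spaces). Assumptions: `Seq` stands in for `RDD`, `Array[Double]` rows stand in for `DenseVector`, and `MatrixImplSketch.correlationMatrix` is a hypothetical Pearson helper, so this runs without Spark. Spark's documentation for `mapPartitions` says `preservesPartitioning` should be `false` unless the RDD is a pair RDD and the function does not modify the keys; since the result here is not keyed, the sketch simply uses `map`.

```scala
object MatrixImplSketch {
  // Hypothetical Pearson correlation matrix over rows: entry (i, j) correlates columns i and j.
  def correlationMatrix(rows: Seq[Array[Double]]): Array[Array[Double]] = {
    val n = rows.length
    val d = rows.head.length
    val means = Array.tabulate(d)(j => rows.map(_(j)).sum / n)
    // Unnormalized covariance; the 1/n factors cancel in the ratio below.
    val cov = Array.tabulate(d, d) { (i, j) =>
      rows.map(r => (r(i) - means(i)) * (r(j) - means(j))).sum
    }
    Array.tabulate(d, d) { (i, j) => cov(i)(j) / math.sqrt(cov(i)(i) * cov(j)(j)) }
  }

  // Zip the two inputs into two-element rows, then read entry (0, 1) of the matrix.
  def correlationWithMatrixImpl(x: Seq[Double], y: Seq[Double]): Double = {
    val mat: Seq[Array[Double]] = x.zip(y).map { case (xi, yi) => Array(xi, yi) }
    correlationMatrix(mat)(0)(1)
  }
}
```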
[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1367#discussion_r14808714

    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala ---
    @@ -0,0 +1,121 @@
    [... ASF license header omitted ...]
    +
    +package org.apache.spark.mllib.stat.correlation
    +
    +import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * New correlation algorithms should implement this trait
    + */
    +trait Correlation {
    +
    +  /**
    +   * Compute correlation for two datasets.
    +   */
    +  def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double
    +
    +  /**
    +   * Compute the correlation matrix S, for the input matrix, where S(i, j) is the correlation
    +   * between column i and j.
    +   */
    +  def computeCorrelationMatrix(X: RDD[Vector]): Matrix
    +
    +  /**
    +   * Combine the two input RDD[Double]s into an RDD[Vector] and compute the correlation using the
    +   * correlation implementation for RDD[Vector]
    +   */
    +  def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): Double = {
    +    val mat: RDD[Vector] = x.zip(y).mapPartitions({ iter =>
    +      iter.map {case(xi, yi) => new DenseVector(Array(xi, yi))}
    +    }, preservesPartitioning = true)
    +    computeCorrelationMatrix(mat)(0, 1)
    +  }
    +
    +}
    +
    +/**
    + * Delegates computation to the specific correlation object based on the input method name
    + *
    + * Currently supported correlations: pearson, spearman.
    + * After new correlation algorithms are added, please update the documentation here and in
    + * Statistics.scala for the correlation APIs.
    + *
    + * Cases are ignored when doing method matching. We also allow initials, e.g. P for pearson, as
    + * long as initials are unique in the supported set of correlation algorithms. In addition, a
    + * supported method name has to be a substring of the input method name for it to be matched (e.g.
    + * spearmansrho will be matched to spearman)
    + *
    + * Maintains the default correlation type, pearson
    + */
    +object Correlations {
    --- End diff --

    Change to `Correlation` and mark it `private[stat]`. We used `Vectors` because `scala.Vector` is imported by default.
[GitHub] spark pull request: hijack hash to make hash of None consistant cr...
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/1371#issuecomment-48699285

    Can one of the admins verify this patch?
[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1165#discussion_r14808775

    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -463,16 +463,16 @@ private[spark] class BlockManager(
                   val values = dataDeserialize(blockId, bytes)
                   if (level.deserialized) {
                     // Cache the values before returning them
    -                // TODO: Consider creating a putValues that also takes in a iterator?
    -                val valuesBuffer = new ArrayBuffer[Any]
    -                valuesBuffer ++= values
    -                memoryStore.putValues(blockId, valuesBuffer, level, returnValues = true).data
    -                  match {
    -                    case Left(values2) =>
    -                      return Some(new BlockResult(values2, DataReadMethod.Disk, info.size))
    -                    case _ =>
    -                      throw new SparkException("Memory store did not return back an iterator")
    -                  }
    +                val putResult = memoryStore.putValues(blockId, values, level, returnValues = true)
    +                putResult.data match {
    +                  case Left(it) =>
    +                    return Some(new BlockResult(it, DataReadMethod.Disk, info.size))
    +                  case Right(b) =>
    +                    return Some(new BlockResult(
    +                      dataDeserialize(blockId, b),
    --- End diff --

    We should talk more about what's going on here tomorrow. I read this function for 15 minutes and couldn't figure out what was going on. For one thing, though, AFAIK we only reach this branch if we've read a block from disk. Will we just overwrite the existing on-disk block again in this scenario? Also, is there any reason to return `dataDeserialize(blockId, b)` here instead of just returning `values`?
[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...
Github user pwendell commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1165#discussion_r14808809

    --- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---
    @@ -87,9 +97,32 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
           values: Iterator[Any],
           level: StorageLevel,
           returnValues: Boolean): PutResult = {
    -    val valueEntries = new ArrayBuffer[Any]()
    -    valueEntries ++= values
    -    putValues(blockId, valueEntries, level, returnValues)
    +    val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
    +    val unfoldedValues = unfoldSafely(blockId, values, droppedBlocks)
    +    unfoldedValues match {
    +      case Left(arrayValues) =>
    +        // Values are fully unfolded in memory, so store them as an array
    +        val result = putValues(blockId, arrayValues, level, returnValues)
    +        droppedBlocks ++= result.droppedBlocks
    +        PutResult(result.size, result.data, droppedBlocks)
    +      case Right(iteratorValues) =>
    +        // Not enough space to unfold this block; drop to disk if applicable
    +        logWarning(s"Not enough space to store $blockId in memory! Free memory is ${freeMemory}B.")
    +        if (level.useDisk) {
    +          logWarning(s"Persisting $blockId to disk instead.")
    +          val newLevel = StorageLevel(
    +            useDisk = true,
    +            useMemory = false,
    +            useOffHeap = false,
    +            deserialized = false,
    +            level.replication)
    +          val result = blockManager.diskStore.putValues(
    --- End diff --

    Should we check if the diskStore already contains the value here?
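A minimal sketch of the check being asked about: skip the disk write when the block is already persisted, so a block that was just read from disk is not written again. `DiskPutSketch`, `SimpleDiskStore`, and `putIfAbsent` are hypothetical; the store is an in-memory stand-in exposing only `contains` and `put`, not Spark's `DiskStore` API.

```scala
import scala.collection.mutable

object DiskPutSketch {
  // Hypothetical stand-in for a disk store keyed by block ID.
  class SimpleDiskStore {
    private val blocks = mutable.HashMap[String, Vector[Any]]()
    var writes = 0 // counts actual writes so skipped writes are observable

    def contains(blockId: String): Boolean = blocks.contains(blockId)

    def put(blockId: String, values: Iterator[Any]): Unit = {
      blocks(blockId) = values.toVector
      writes += 1
    }
  }

  // Write only if the block is not already present; returns true if a write happened.
  def putIfAbsent(store: SimpleDiskStore, blockId: String, values: Iterator[Any]): Boolean =
    if (store.contains(blockId)) false
    else {
      store.put(blockId, values)
      true
    }
}
```

In a concurrent setting the check and the write would also need to happen under the same lock (or per-block write lock) to avoid two threads both seeing the block as absent.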
[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/1367#discussion_r14808804 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.stat.correlation + +import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector} +import org.apache.spark.rdd.RDD + +/** + * New correlation algorithms should implement this trait + */ +trait Correlation { + + /** + * Compute correlation for two datasets. + */ + def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double + + /** + * Compute the correlation matrix S, for the input matrix, where S(i, j) is the correlation + * between column i and j. 
+ */ + def computeCorrelationMatrix(X: RDD[Vector]): Matrix + + /** + * Combine the two input RDD[Double]s into an RDD[Vector] and compute the correlation using the + * correlation implementation for RDD[Vector] + */ + def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): Double = { +val mat: RDD[Vector] = x.zip(y).mapPartitions({ iter => + iter.map {case(xi, yi) => new DenseVector(Array(xi, yi))} +}, preservesPartitioning = true) +computeCorrelationMatrix(mat)(0, 1) + } + +} + +/** + * Delegates computation to the specific correlation object based on the input method name + * + * Currently supported correlations: pearson, spearman. + * After new correlation algorithms are added, please update the documentation here and in + * Statistics.scala for the correlation APIs. + * + * Cases are ignored when doing method matching. We also allow initials, e.g. P for pearson, as + * long as initials are unique in the supported set of correlation algorithms. In addition, a --- End diff -- This feature actually makes it error-prone to add new correlations. For example, suppose I use `corr(x, y, "p")` in my code and in v1.2 someone adds another correlation starting with "p". Then existing code breaks. I saw that R implements this feature, but it simply limits the methods to pearson, spearman, and kendall. If they plan to add a new correlation with a prefix collision, they will face the same problem.
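A rough Scala sketch of the hazard described in this comment, contrasting exact case-insensitive lookup with prefix matching. The helper names and the later-added `pointbiserial` method are hypothetical illustrations, not MLlib API.

```scala
// Illustrative only: these helpers are hypothetical, not MLlib's API.
object CorrelationNames {
  /** Exact, case-insensitive lookup: "Pearson" resolves, "p" does not. */
  def resolveExact(method: String, supported: Set[String]): Option[String] =
    supported.find(_.equalsIgnoreCase(method))

  /** Prefix lookup: succeeds only while exactly one supported name starts with the input. */
  def resolveByPrefix(method: String, supported: Set[String]): Option[String] = {
    val hits = supported.filter(_.toLowerCase.startsWith(method.toLowerCase))
    if (hits.size == 1) hits.headOption else None
  }
}
```

With prefix matching, a call that works today silently stops resolving once a second name with the same prefix ships; exact matching keeps existing calls stable as the method set grows.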
[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/1165#discussion_r14808829 --- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala --- @@ -141,6 +174,86 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) } /** + * Unfold the given block in memory safely. + * + * The safety of this operation refers to avoiding potential OOM exceptions caused by + * unfolding the entirety of the block in memory at once. This is achieved by periodically + * checking whether the memory restrictions for unfolding blocks are still satisfied, + * stopping immediately if not. This check is a safeguard against the scenario in which + * there is not enough free memory to accommodate the entirety of a single block. + * + * This method returns either a fully unfolded array or a partially unfolded iterator. + */ + def unfoldSafely( + blockId: BlockId, + values: Iterator[Any], + droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)]) +: Either[Array[Any], Iterator[Any]] = { + +var count = 0 // The number of elements unfolded so far +var enoughMemory = true // Whether there is enough memory to unfold this block +var previousSize = 0L // Previous estimate of the size of our buffer +val memoryRequestPeriod = 1000 // How frequently we request for more memory for our buffer --- End diff -- Maybe this should be `memoryRequestInterval`? To me period implies time and frequency implies 1/time, but this is neither.
[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/1165#discussion_r14808850 --- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala --- @@ -141,6 +174,86 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) } /** + * Unfold the given block in memory safely. + * + * The safety of this operation refers to avoiding potential OOM exceptions caused by + * unfolding the entirety of the block in memory at once. This is achieved by periodically + * checking whether the memory restrictions for unfolding blocks are still satisfied, + * stopping immediately if not. This check is a safeguard against the scenario in which + * there is not enough free memory to accommodate the entirety of a single block. + * + * This method returns either a fully unfolded array or a partially unfolded iterator. + */ + def unfoldSafely( + blockId: BlockId, + values: Iterator[Any], + droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)]) +: Either[Array[Any], Iterator[Any]] = { + +var count = 0 // The number of elements unfolded so far +var enoughMemory = true // Whether there is enough memory to unfold this block +var previousSize = 0L // Previous estimate of the size of our buffer +val memoryRequestPeriod = 1000 // How frequently we request for more memory for our buffer + +val threadId = Thread.currentThread().getId +val cacheMemoryMap = SparkEnv.get.cacheMemoryMap +var buffer = new SizeTrackingAppendOnlyBuffer[Any] + +try { + while (values.hasNext && enoughMemory) { +buffer += values.next() +count += 1 +if (count % memoryRequestPeriod == 1) { + // Calculate the amount of memory to request from the global memory pool + val currentSize = buffer.estimateSize() + val delta = if (previousSize > 0) math.max(currentSize - previousSize, 0) else 0 + val memoryToRequest = currentSize + delta + previousSize = currentSize + + // Atomically check whether there is sufficient memory in the global pool to continue +
cacheMemoryMap.synchronized { +val previouslyOccupiedMemory = cacheMemoryMap.get(threadId).getOrElse(0L) +val otherThreadsMemory = cacheMemoryMap.values.sum - previouslyOccupiedMemory + +// Request for memory for the local buffer, and return whether request is granted --- End diff -- I think it's proper not to use a comma here.
[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/1367#discussion_r14808860 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.stat.correlation + +import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector} +import org.apache.spark.rdd.RDD + +/** + * New correlation algorithms should implement this trait + */ +trait Correlation { + + /** + * Compute correlation for two datasets. + */ + def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double + + /** + * Compute the correlation matrix S, for the input matrix, where S(i, j) is the correlation + * between column i and j.
+ */ + def computeCorrelationMatrix(X: RDD[Vector]): Matrix + + /** + * Combine the two input RDD[Double]s into an RDD[Vector] and compute the correlation using the + * correlation implementation for RDD[Vector] + */ + def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): Double = { +val mat: RDD[Vector] = x.zip(y).mapPartitions({ iter => + iter.map {case(xi, yi) => new DenseVector(Array(xi, yi))} +}, preservesPartitioning = true) +computeCorrelationMatrix(mat)(0, 1) + } + +} + +/** + * Delegates computation to the specific correlation object based on the input method name + * + * Currently supported correlations: pearson, spearman. + * After new correlation algorithms are added, please update the documentation here and in + * Statistics.scala for the correlation APIs. + * + * Cases are ignored when doing method matching. We also allow initials, e.g. P for pearson, as + * long as initials are unique in the supported set of correlation algorithms. In addition, a + * supported method name has to be a substring of the input method name for it to be matched (e.g. + * spearmansrho will be matched to spearman) --- End diff -- This feature is not necessary.
[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/1165#discussion_r14808921 --- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala --- @@ -141,6 +174,86 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) } /** + * Unfold the given block in memory safely. + * + * The safety of this operation refers to avoiding potential OOM exceptions caused by + * unfolding the entirety of the block in memory at once. This is achieved by periodically + * checking whether the memory restrictions for unfolding blocks are still satisfied, + * stopping immediately if not. This check is a safeguard against the scenario in which + * there is not enough free memory to accommodate the entirety of a single block. + * + * This method returns either a fully unfolded array or a partially unfolded iterator. + */ + def unfoldSafely( + blockId: BlockId, + values: Iterator[Any], + droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)]) +: Either[Array[Any], Iterator[Any]] = { + +var count = 0 // The number of elements unfolded so far +var enoughMemory = true // Whether there is enough memory to unfold this block +var previousSize = 0L // Previous estimate of the size of our buffer +val memoryRequestPeriod = 1000 // How frequently we request for more memory for our buffer + +val threadId = Thread.currentThread().getId +val cacheMemoryMap = SparkEnv.get.cacheMemoryMap +var buffer = new SizeTrackingAppendOnlyBuffer[Any] + +try { + while (values.hasNext && enoughMemory) { +buffer += values.next() +count += 1 +if (count % memoryRequestPeriod == 1) { + // Calculate the amount of memory to request from the global memory pool + val currentSize = buffer.estimateSize() + val delta = if (previousSize > 0) math.max(currentSize - previousSize, 0) else 0 + val memoryToRequest = currentSize + delta + previousSize = currentSize + + // Atomically check whether there is sufficient memory in the global pool to continue +
cacheMemoryMap.synchronized { +val previouslyOccupiedMemory = cacheMemoryMap.get(threadId).getOrElse(0L) +val otherThreadsMemory = cacheMemoryMap.values.sum - previouslyOccupiedMemory + +// Request for memory for the local buffer, and return whether request is granted +def requestForMemory(): Boolean = { --- End diff -- Why is this its own function? Why not just do this:
```
val availableMemory = freeMemory - otherThreadsMemory
val granted = availableMemory > memoryToRequest
if (granted) {
  cacheMemoryMap(threadId) = memoryToRequest
} else {
  val result = ensureFreeSpace(blockId, globalBufferMemory)
  droppedBlocks ++= result.droppedBlocks
  enoughMemory = requestForMemory()
}
```
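A minimal model of the bookkeeping in this hunk, assuming a single shared map from thread ID to reserved bytes, guarded by the map's own lock. `UnrollMemoryPool` and `requestMemory` are hypothetical names, the fixed pool size stands in for `freeMemory`, and the `ensureFreeSpace` retry path is left out.

```scala
import scala.collection.mutable

/** Hypothetical model of per-thread unroll reservations; not Spark's MemoryStore. */
final class UnrollMemoryPool(val maxMemory: Long) {
  private val reserved = mutable.HashMap.empty[Long, Long] // threadId -> reserved bytes

  /** Try to set this thread's reservation to `request` bytes; returns whether it was granted. */
  def requestMemory(threadId: Long, request: Long): Boolean = reserved.synchronized {
    val mine = reserved.getOrElse(threadId, 0L)
    // Everyone else's reservations, computed under the same lock as the update.
    val otherThreadsMemory = reserved.values.sum - mine
    val available = maxMemory - otherThreadsMemory
    val granted = available > request
    if (granted) reserved(threadId) = request
    granted
  }

  def release(threadId: Long): Unit = reserved.synchronized { reserved.remove(threadId); () }

  def reservedBy(threadId: Long): Long = reserved.synchronized { reserved.getOrElse(threadId, 0L) }
}
```

Reading the other threads' total and writing this thread's entry inside one `synchronized` block is what keeps two threads from both being granted the same free bytes.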
[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/1165#discussion_r14808944 --- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala --- @@ -141,6 +174,86 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) } /** + * Unfold the given block in memory safely. + * + * The safety of this operation refers to avoiding potential OOM exceptions caused by + * unfolding the entirety of the block in memory at once. This is achieved by periodically + * checking whether the memory restrictions for unfolding blocks are still satisfied, + * stopping immediately if not. This check is a safeguard against the scenario in which + * there is not enough free memory to accommodate the entirety of a single block. + * + * This method returns either a fully unfolded array or a partially unfolded iterator. + */ + def unfoldSafely( + blockId: BlockId, + values: Iterator[Any], + droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)]) +: Either[Array[Any], Iterator[Any]] = { + +var count = 0 // The number of elements unfolded so far +var enoughMemory = true // Whether there is enough memory to unfold this block +var previousSize = 0L // Previous estimate of the size of our buffer +val memoryRequestPeriod = 1000 // How frequently we request for more memory for our buffer + +val threadId = Thread.currentThread().getId +val cacheMemoryMap = SparkEnv.get.cacheMemoryMap +var buffer = new SizeTrackingAppendOnlyBuffer[Any] + +try { + while (values.hasNext && enoughMemory) { +buffer += values.next() +count += 1 +if (count % memoryRequestPeriod == 1) { + // Calculate the amount of memory to request from the global memory pool + val currentSize = buffer.estimateSize() + val delta = if (previousSize > 0) math.max(currentSize - previousSize, 0) else 0 + val memoryToRequest = currentSize + delta + previousSize = currentSize + + // Atomically check whether there is sufficient memory in the global pool to continue +
cacheMemoryMap.synchronized { +val previouslyOccupiedMemory = cacheMemoryMap.get(threadId).getOrElse(0L) +val otherThreadsMemory = cacheMemoryMap.values.sum - previouslyOccupiedMemory + +// Request for memory for the local buffer, and return whether request is granted +def requestForMemory(): Boolean = { --- End diff -- Might be sufficient to just name this `requestMemory`
[GitHub] spark pull request: Made rdd.py pep8 complaint by using Autopep8 a...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/1354#discussion_r14809046 --- Diff: python/pyspark/rdd.py --- @@ -509,7 +522,8 @@ def sortByKey(self, ascending=True, numPartitions=None, keyfunc = lambda x: x): >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)] >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)]) >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect() -[('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), ('Mary', 1), ('was', 8), ('white', 9), ('whose', 6)] +[('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), ('Mary', 1), \ +('was', 8), ('white', 9), ('whose', 6)] --- End diff -- Is this supposed to be like this?
[GitHub] spark pull request: [SPARK-2119][SQL] Improved Parquet performance...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1370#issuecomment-48700541 QA results for PR 1370:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes
For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16555/consoleFull
[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/1165#discussion_r14809203 --- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala --- @@ -141,6 +174,86 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) } /** + * Unfold the given block in memory safely. + * + * The safety of this operation refers to avoiding potential OOM exceptions caused by + * unfolding the entirety of the block in memory at once. This is achieved by periodically + * checking whether the memory restrictions for unfolding blocks are still satisfied, + * stopping immediately if not. This check is a safeguard against the scenario in which + * there is not enough free memory to accommodate the entirety of a single block. + * + * This method returns either a fully unfolded array or a partially unfolded iterator. + */ + def unfoldSafely( + blockId: BlockId, + values: Iterator[Any], + droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)]) +: Either[Array[Any], Iterator[Any]] = { + +var count = 0 // The number of elements unfolded so far +var enoughMemory = true // Whether there is enough memory to unfold this block +var previousSize = 0L // Previous estimate of the size of our buffer +val memoryRequestPeriod = 1000 // How frequently we request for more memory for our buffer --- End diff -- One case I'm concerned about is the need to spill when someone has very large individual objects (e.g. they do a groupByKey(X).cache() with a big key). It might make sense to have some bi-modal scheme where we sample every element for the first 100 elements and then back off to less frequent sampling later.
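One way to express the bi-modal schedule suggested above is a predicate on the element count. The constants are assumptions (100 eager checks, then the PR's interval of 1000 elements, keeping its `count % interval == 1` test). Because the interval counts elements rather than time, it is named `memoryRequestInterval` here; `CheckSchedule` itself is hypothetical.

```scala
// Hypothetical schedule; the constants are illustrative, not tuned values from the PR.
object CheckSchedule {
  val eagerPrefix = 100            // check after every one of the first 100 elements
  val memoryRequestInterval = 1000 // afterwards, check once every 1000 elements

  /** Whether to re-estimate the buffer size and request memory after the `count`-th element (1-based). */
  def shouldCheck(count: Int): Boolean =
    count <= eagerPrefix || count % memoryRequestInterval == 1
}
```

A partition of a few huge objects then gets checked after each one, while a partition of many small objects pays for at most a handful of extra size estimates.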
[GitHub] spark pull request: [SPARK-1969][MLlib] Public available online su...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/955#issuecomment-48701090 QA tests have started for PR 955. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16558/consoleFull
[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...
Github user pwendell commented on the pull request: https://github.com/apache/spark/pull/1165#issuecomment-48701135 Did an initial pass with some feedback - not totally done yet but it should be enough to get some work done.
[GitHub] spark pull request: Use the scala-logging wrapper instead of the d...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1369#issuecomment-48701944 QA results for PR 1369:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes
For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16556/consoleFull
[GitHub] spark pull request: [SPARK-1969][MLlib] Public available online su...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/955#issuecomment-48702465 QA tests have started for PR 955. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16561/consoleFull
[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/1165#discussion_r14809771 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -463,16 +463,16 @@ private[spark] class BlockManager( val values = dataDeserialize(blockId, bytes) if (level.deserialized) { // Cache the values before returning them -// TODO: Consider creating a putValues that also takes in a iterator? -val valuesBuffer = new ArrayBuffer[Any] -valuesBuffer ++= values -memoryStore.putValues(blockId, valuesBuffer, level, returnValues = true).data - match { -case Left(values2) => - return Some(new BlockResult(values2, DataReadMethod.Disk, info.size)) -case _ => - throw new SparkException("Memory store did not return back an iterator") - } +val putResult = memoryStore.putValues(blockId, values, level, returnValues = true) +putResult.data match { + case Left(it) => +return Some(new BlockResult(it, DataReadMethod.Disk, info.size)) + case Right(b) => +return Some(new BlockResult( + dataDeserialize(blockId, b), --- End diff -- You can't just return `values` because `memoryStore.putValues` already exhausted the iterator. This is one of those places where we `put` the values as an iterator somewhere and read them back in order to return them.
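A tiny Scala illustration of why `values` cannot simply be returned: a Scala `Iterator` is single-pass, so once a consumer has drained it the caller must read back from whatever copy was stored. `store` is a hypothetical stand-in for `memoryStore.putValues`, and `putAndReadBack` is likewise illustrative.

```scala
// Hypothetical stand-ins; `store` plays the role of memoryStore.putValues and drains its input.
object IteratorReuse {
  def store(values: Iterator[Int]): Vector[Int] = values.toVector

  /** Store the values, then return an iterator over the stored copy rather than the drained input. */
  def putAndReadBack(values: Iterator[Int]): Iterator[Int] = store(values).iterator
}
```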
[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/1165#discussion_r14809809 --- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala --- @@ -141,6 +174,86 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) } /** + * Unfold the given block in memory safely. + * + * The safety of this operation refers to avoiding potential OOM exceptions caused by + * unfolding the entirety of the block in memory at once. This is achieved by periodically + * checking whether the memory restrictions for unfolding blocks are still satisfied, + * stopping immediately if not. This check is a safeguard against the scenario in which + * there is not enough free memory to accommodate the entirety of a single block. + * + * This method returns either a fully unfolded array or a partially unfolded iterator. + */ + def unfoldSafely( + blockId: BlockId, + values: Iterator[Any], + droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)]) +: Either[Array[Any], Iterator[Any]] = { + +var count = 0 // The number of elements unfolded so far +var enoughMemory = true // Whether there is enough memory to unfold this block --- End diff -- Ok. I had it named exactly that earlier, but it feels like if `outOfMemory` is true then it is already too late... I can change it back.
[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/1165#discussion_r14809836 --- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala --- @@ -87,9 +97,32 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) values: Iterator[Any], level: StorageLevel, returnValues: Boolean): PutResult = { -val valueEntries = new ArrayBuffer[Any]() -valueEntries ++= values -putValues(blockId, valueEntries, level, returnValues) +val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] +val unfoldedValues = unfoldSafely(blockId, values, droppedBlocks) +unfoldedValues match { + case Left(arrayValues) => +// Values are fully unfolded in memory, so store them as an array +val result = putValues(blockId, arrayValues, level, returnValues) +droppedBlocks ++= result.droppedBlocks +PutResult(result.size, result.data, droppedBlocks) + case Right(iteratorValues) => +// Not enough space to unfold this block; drop to disk if applicable +logWarning(s"Not enough space to store $blockId in memory! Free memory is ${freeMemory}B.") +if (level.useDisk) { + logWarning(s"Persisting $blockId to disk instead.") + val newLevel = StorageLevel( +useDisk = true, +useMemory = false, +useOffHeap = false, +deserialized = false, +level.replication) + val result = blockManager.diskStore.putValues( --- End diff -- If there is, do we kick out the old one?
[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/1165#discussion_r14809878 --- Diff: core/src/main/scala/org/apache/spark/CacheManager.scala --- @@ -140,14 +144,39 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { throw new BlockException(key, s"Block manager failed to return cached value for $key!") } } else { - /* This RDD is to be cached in memory. In this case we cannot pass the computed values + /* + * This RDD is to be cached in memory. In this case we cannot pass the computed values * to the BlockManager as an iterator and expect to read it back later. This is because * we may end up dropping a partition from memory store before getting it back, e.g. - * when the entirety of the RDD does not fit in memory. */ - val elements = new ArrayBuffer[Any] - elements ++= values - updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true) - elements.iterator.asInstanceOf[Iterator[T]] + * when the entirety of the RDD does not fit in memory. + * + * In addition, we must be careful to not unfold the entire partition in memory at once. + * Otherwise, we may cause an OOM exception if the JVM does not have enough space for this + * single partition. Instead, we unfold the values cautiously, potentially aborting and + * dropping the partition to disk if applicable. + */ + blockManager.memoryStore.unfoldSafely(key, values, updatedBlocks) match { +case Left(arrayValues) => + // We have successfully unfolded the entire partition, so cache it in memory + updatedBlocks ++= blockManager.put(key, arrayValues, storageLevel, tellMaster = true) + arrayValues.iterator.asInstanceOf[Iterator[T]] +case Right(iteratorValues) => + // There is not enough space to cache this partition in memory + logWarning(s"Not enough space to cache $key in memory! " + +s"Free memory is ${blockManager.memoryStore.freeMemory}B.")
+ var returnValues = iteratorValues.asInstanceOf[Iterator[T]] + if (storageLevel.useDisk) { +logWarning(s"Persisting $key to disk instead.") +val newLevel = StorageLevel( --- End diff -- I see. Looks like we do this all over the place in the storage page of the UI, but not in BlockManager itself.
[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/1165#discussion_r14809944 --- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala --- @@ -141,6 +174,86 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) } /** + * Unfold the given block in memory safely. + * + * The safety of this operation refers to avoiding potential OOM exceptions caused by + * unfolding the entirety of the block in memory at once. This is achieved by periodically + * checking whether the memory restrictions for unfolding blocks are still satisfied, + * stopping immediately if not. This check is a safeguard against the scenario in which + * there is not enough free memory to accommodate the entirety of a single block. + * + * This method returns either a fully unfolded array or a partially unfolded iterator. + */ + def unfoldSafely( --- End diff -- Ok. I find it scary too.
[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/1165#discussion_r14809936 --- Diff: core/src/main/scala/org/apache/spark/CacheManager.scala --- @@ -140,14 +144,39 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { throw new BlockException(key, s"Block manager failed to return cached value for $key!") } } else { - /* This RDD is to be cached in memory. In this case we cannot pass the computed values + /* + * This RDD is to be cached in memory. In this case we cannot pass the computed values * to the BlockManager as an iterator and expect to read it back later. This is because * we may end up dropping a partition from memory store before getting it back, e.g. - * when the entirety of the RDD does not fit in memory. */ - val elements = new ArrayBuffer[Any] - elements ++= values - updatedBlocks ++= blockManager.put(key, elements, storageLevel, tellMaster = true) - elements.iterator.asInstanceOf[Iterator[T]] + * when the entirety of the RDD does not fit in memory. + * + * In addition, we must be careful to not unfold the entire partition in memory at once. + * Otherwise, we may cause an OOM exception if the JVM does not have enough space for this + * single partition. Instead, we unfold the values cautiously, potentially aborting and + * dropping the partition to disk if applicable. + */ + blockManager.memoryStore.unfoldSafely(key, values, updatedBlocks) match { +case Left(arrayValues) => + // We have successfully unfolded the entire partition, so cache it in memory + updatedBlocks ++= blockManager.put(key, arrayValues, storageLevel, tellMaster = true) + arrayValues.iterator.asInstanceOf[Iterator[T]] --- End diff -- I agree. We could add a type parameter to `unfoldSafely`, since any one block must have values of the same type. Note that even in the existing code we already cast it to `Iterator[T]` all over the place.
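A minimal sketch of the type-parameter idea, assuming a simplified, standalone `unfoldSafely[T]` (hypothetical; it is not the MemoryStore method from the diff) that unrolls values against a byte budget using a caller-supplied per-element size function `sizeOf`. Because the element type flows through the signature, the caller gets an `Iterator[T]` back without `asInstanceOf`.

```scala
import scala.collection.mutable.ArrayBuffer

object TypedUnrollSketch {
  /**
   * Hypothetical, simplified stand-in for MemoryStore.unfoldSafely.
   * Left:  every value fit within maxBytes (safe to cache in memory).
   * Right: the budget was exceeded; the iterator yields the values unrolled so far
   *        followed by the untouched remainder, so no element is lost.
   */
  def unfoldSafely[T](values: Iterator[T], maxBytes: Long, sizeOf: T => Long): Either[Vector[T], Iterator[T]] = {
    val unrolled = new ArrayBuffer[T]
    var bytes = 0L
    while (values.hasNext) {
      val v = values.next()
      unrolled += v
      bytes += sizeOf(v)
      if (bytes > maxBytes) {
        // Abort unrolling: hand back the prefix plus the rest of the input.
        return Right(unrolled.iterator ++ values)
      }
    }
    Left(unrolled.toVector)
  }

  /** Caller in the style of CacheManager: the result is already an Iterator[T], so no cast is needed. */
  def cacheOrPassThrough[T](values: Iterator[T], maxBytes: Long, sizeOf: T => Long): Iterator[T] =
    unfoldSafely(values, maxBytes, sizeOf) match {
      case Left(all)   => all.iterator // a real implementation would store `all` in memory here
      case Right(rest) => rest         // a real implementation would fall back to disk here
    }
}
```

The storage and disk fallbacks are left as comments; the point of the sketch is only that a generic signature removes the `Iterator[T]` cast at the call site.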
[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/1165#discussion_r14810031 --- Diff: core/src/main/scala/org/apache/spark/util/collection/SizeTracker.scala --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *    http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.collection + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.util.SizeEstimator + +/** + * A general interface for collections to keep track of their estimated sizes in bytes. + * We sample with a slow exponential back-off using the SizeEstimator to amortize the time, + * as each call to SizeEstimator is somewhat expensive (order of a few milliseconds). + */ +private[spark] trait SizeTracker { + + import SizeTracker._ + + /** + * Controls the base of the exponential which governs the rate of sampling. + * E.g., a value of 2 would mean we sample at 1, 2, 4, 8, ... elements. + */ + private val SAMPLE_GROWTH_RATE = 1.1 + + /** Samples taken since last resetSamples(). Only the last two are kept for extrapolation. */ + private val samples = new ArrayBuffer[Sample] + + /** The average number of bytes per update between our last two samples. 
*/ + private var bytesPerUpdate: Double = _ + + /** Total number of insertions and updates into the map since the last resetSamples(). */ + private var numUpdates: Long = _ + + /** The value of 'numUpdates' at which we will take our next sample. */ + private var nextSampleNum: Long = _ + + resetSamples() + + /** + * Reset samples collected so far. + * This should be called after the collection undergoes a dramatic change in size. + */ + protected def resetSamples(): Unit = { +numUpdates = 1 +nextSampleNum = 1 +samples.clear() +takeSample() + } + + /** + * Callback to be invoked after every update. + */ + protected def afterUpdate(): Unit = { +numUpdates += 1 +if (nextSampleNum == numUpdates) { + takeSample() +} + } + + /** + * Take a new sample of the current collection's size. + */ + private def takeSample(): Unit = { +samples += Sample(SizeEstimator.estimate(this), numUpdates) +// Only use the last two samples to extrapolate +if (samples.size > 2) { + samples.remove(0) +} +val bytesDelta = samples.toSeq.reverse match { + case latest :: previous :: tail => +(latest.size - previous.size).toDouble / (latest.numUpdates - previous.numUpdates) + // If fewer than 2 samples, assume no change + case _ => 0 +} +bytesPerUpdate = math.max(0, bytesDelta) +nextSampleNum = math.ceil(numUpdates * SAMPLE_GROWTH_RATE).toLong + } + + /** + * Estimate the current size of the collection in bytes. O(1) time. + */ + def estimateSize(): Long = { +assert(samples.nonEmpty) --- End diff -- First, because `samples` can never be empty (`resetSamples` always takes at least one sample). Second, having 0 samples does not mean we have no data in the underlying collection; it just means we don't have any information about the current size.
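To make the sampling scheme concrete, here is a standalone sketch of the same idea: measure only at update counts that grow by a factor of 1.1, keep the last two samples, and extrapolate linearly between measurements. `SampledSizeTracker` is a hypothetical name and `measure` is an assumed stand-in for `SizeEstimator.estimate(this)`. As in the diff, the constructor takes the first sample, which is why the `samples.nonEmpty` assertion cannot fail.

```scala
import scala.collection.mutable.ArrayBuffer

/** Hypothetical standalone version of the SizeTracker sampling scheme. */
class SampledSizeTracker(measure: () => Long, growthRate: Double = 1.1) {
  private case class Sample(bytes: Long, numUpdates: Long)

  private val samples = new ArrayBuffer[Sample]
  private var bytesPerUpdate = 0.0
  private var numUpdates = 1L
  private var nextSampleNum = 1L
  private var taken = 0

  takeSample() // from here on, at least one sample always exists

  /** How many times `measure` has been called so far. */
  def samplesTaken: Int = taken

  /** Call after every insertion or update of the tracked collection. */
  def afterUpdate(): Unit = {
    numUpdates += 1
    if (numUpdates == nextSampleNum) takeSample()
  }

  private def takeSample(): Unit = {
    samples += Sample(measure(), numUpdates)
    taken += 1
    if (samples.size > 2) samples.remove(0) // only the last two samples are kept
    bytesPerUpdate =
      if (samples.size < 2) 0.0 // fewer than two samples: assume no change
      else {
        val previous = samples(0)
        val latest = samples(1)
        math.max(0.0, (latest.bytes - previous.bytes).toDouble / (latest.numUpdates - previous.numUpdates))
      }
    nextSampleNum = math.ceil(numUpdates * growthRate).toLong
  }

  /** O(1) estimate: last measured size plus linear extrapolation since that sample. */
  def estimateSize(): Long = {
    assert(samples.nonEmpty)
    val last = samples.last
    (last.bytes + bytesPerUpdate * (numUpdates - last.numUpdates)).toLong
  }
}
```

With a collection whose true size grows linearly (for example 8 bytes per element), the extrapolation is exact after 1000 insertions while `measure` is called only a few dozen times.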
[GitHub] spark pull request: Fix JIRA-983 and support external sort for sor...
Github user pwendell commented on the pull request: https://github.com/apache/spark/pull/931#issuecomment-48703798 Jenkins, test this please.
[GitHub] spark pull request: Use the Executor's ClassLoader in sc.objectFil...
Github user pwendell commented on the pull request: https://github.com/apache/spark/pull/181#issuecomment-48704142 Cool - thanks for contributing the test! Jenkins, test this please. LGTM pending tests.
[GitHub] spark pull request: Use the Executor's ClassLoader in sc.objectFil...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/181#issuecomment-48704291 QA tests have started for PR 181. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16563/consoleFull
[GitHub] spark pull request: Made rdd.py pep8 compliant by using Autopep8 a...
Github user ScrapCodes commented on a diff in the pull request: https://github.com/apache/spark/pull/1354#discussion_r14810364 --- Diff: python/pyspark/rdd.py --- @@ -509,7 +522,8 @@ def sortByKey(self, ascending=True, numPartitions=None, keyfunc = lambda x: x): >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)] >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)]) >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect() -[('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), ('Mary', 1), ('was', 8), ('white', 9), ('whose', 6)] +[('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), ('Mary', 1), \ +('was', 8), ('white', 9), ('whose', 6)] --- End diff -- pep8 does not like very long lines. The other option was to change the test case. I am not sure whether we can tell it to ignore a single line in comments. On Jul 11, 2014 12:35 PM, Reynold Xin notificati...@github.com wrote: In python/pyspark/rdd.py: is this supposed to be like this? Reply to this email directly or view it on GitHub: https://github.com/apache/spark/pull/1354/files#r14809046.
[GitHub] spark pull request: mesos executor ids now consist of the slave id...
Github user drexin commented on the pull request: https://github.com/apache/spark/pull/1358#issuecomment-48706534 Hi Patrick, the problem is described in [this mailing list entry](http://mail-archives.apache.org/mod_mbox/mesos-user/201407.mbox/%3c53b66e6d.7090...@uninett.no%3e). If I understand the [documentation on run modes](http://spark.apache.org/docs/latest/running-on-mesos.html) and the code correctly, in fine-grained mode Spark starts a separate instance of `MesosExecutorBackend` for each Spark task. If this is correct, then as soon as two tasks run concurrently on the same machine we should run into this problem. On [this line](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala#L329) in the `BlockManagerMasterActor`, there is a check on the `BlockManagerId`, which will always be different per `Executor` instance, because the port in it is randomly assigned. The `executorId`, however, is always set to the Mesos `slaveId`. This means that we run into [this case](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala#L331-L335) as soon as we start two `Executor` instances on the same slave. This PR fixes this by appending a counter to the `executorId`. Please tell me if I overlooked something.
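The collision described above can be modeled in a few lines. This sketch is illustrative only: `Registry` is a hypothetical stand-in for the per-`executorId` check in `BlockManagerMasterActor`, `BlockManagerKey` stands in for `BlockManagerId` (an executor id plus a randomly assigned port), and the `slaveId/counter` format of the new ids is an assumption, not necessarily the format the PR uses.

```scala
import scala.collection.mutable

object ExecutorIdSketch {
  /** Hypothetical stand-in for BlockManagerId: an executor id plus a randomly assigned port. */
  final case class BlockManagerKey(executorId: String, port: Int)

  /** Hypothetical registry that allows only one BlockManager per executorId. */
  final class Registry {
    private val byExecutor = mutable.Map[String, BlockManagerKey]()

    /** Returns false when a different BlockManager already holds this executorId. */
    def register(id: BlockManagerKey): Boolean = byExecutor.get(id.executorId) match {
      case Some(existing) if existing != id => false // the conflicting case
      case _ =>
        byExecutor(id.executorId) = id
        true
    }
  }

  /** Before: the executor id is simply the Mesos slave id. */
  def executorIdFromSlave(slaveId: String): String = slaveId

  /** After (sketch): append a per-slave counter so two executors on one slave get distinct ids. */
  private val counters = mutable.Map[String, Int]()
  def uniqueExecutorId(slaveId: String): String = {
    val n = counters.getOrElse(slaveId, 0)
    counters(slaveId) = n + 1
    s"$slaveId/$n"
  }
}
```

With the old scheme, the second executor on `slave-1` is rejected because its BlockManager has a different port under the same id; with a counter suffix, both registrations succeed.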
[GitHub] spark pull request: Use the scala-logging wrapper instead of the d...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1369#issuecomment-48706567 QA tests have started for PR 1369. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16564/consoleFull
[GitHub] spark pull request: [SPARK-2236][SQL]SparkSQL add SkewJoin
Github user YanjieGao commented on the pull request: https://github.com/apache/spark/pull/1134#issuecomment-48707013 Thanks Michael. (1) We could make it a user hint, like Hive does: set hive.optimize.skewjoin = true; set hive.skewjoin.key = skew_key_threshold (default = 10). We could use set sparksql.optimize.skewjoin=true and set sparksql.skewjoin.key=skew_key_threshold. (2) We could use sampling to find the relative frequency of each key, and then use the user-set skew_key_threshold to decide which keys are over the threshold. (3) toString will generate many singleton objects; I will optimize the code in the next step.
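Point (2) can be sketched as follows, assuming a Bernoulli sample of the join keys: count each key in the sample, scale the count by `1 / fraction` to estimate its full-table frequency, and flag keys whose estimate exceeds the user-set threshold. `skewedKeys` is a hypothetical helper over a local `Seq`; on an RDD, the sampling step could use `RDD.sample`, as the SkewJoin diff does.

```scala
import scala.util.Random

object SkewKeySketch {
  /**
   * Hypothetical helper: estimate per-key frequencies from a Bernoulli sample and return the
   * keys whose estimated full-table count exceeds `threshold` (the role skew_key_threshold would play).
   */
  def skewedKeys[K](keys: Seq[K], fraction: Double, threshold: Long, seed: Long): Set[K] = {
    require(fraction > 0.0 && fraction <= 1.0, "fraction must be in (0, 1]")
    val rng = new Random(seed)
    // Keep each key independently with probability `fraction`.
    val sample = keys.filter(_ => rng.nextDouble() < fraction)
    sample
      .groupBy(identity)
      .collect { case (key, hits) if hits.size / fraction > threshold => key }
      .toSet
  }
}
```

Scaling by `1 / fraction` keeps the threshold expressed in full-table rows, so users do not need to know the sampling rate when choosing it.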
[GitHub] spark pull request: Use the scala-logging wrapper instead of the d...
Github user srowen commented on the pull request: https://github.com/apache/spark/pull/1369#issuecomment-48707227 Wasn't this already decided against in https://github.com/apache/spark/pull/332 and again https://github.com/apache/spark/pull/1208 ? or is this not another PR for https://issues.apache.org/jira/browse/SPARK-1470 ?
[GitHub] spark pull request: mesos executor ids now consist of the slave id...
Github user drexin commented on the pull request: https://github.com/apache/spark/pull/1358#issuecomment-48707209 Created a JIRA issue here: https://issues.apache.org/jira/browse/SPARK-2445
[GitHub] spark pull request: [SPARK-2236][SQL]SparkSQL add SkewJoin
Github user YanjieGao commented on the pull request: https://github.com/apache/spark/pull/1134#issuecomment-48707289 Hi, I have also made a left semi join. I don't know whether this join should be treated as an optimization, like the left semi join, or as a standalone join algorithm. I think PR 1127 also still needs some optimization. Do you think PR 1127 is valuable enough to be merged? Thanks a lot. https://github.com/apache/spark/pull/1127
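For reference, the standard semantics of a left semi join fit in a few lines: emit each left row that has at least one matching key on the right, without emitting right-side columns and without duplicating a left row when it has several matches. This is a local sketch with hypothetical names, not the implementation from PR 1127.

```scala
object LeftSemiJoinSketch {
  /** Left semi join: keep each left row whose key appears at least once on the right. */
  def leftSemiJoin[L, R, K](left: Seq[L], right: Seq[R])(leftKey: L => K, rightKey: R => K): Seq[L] = {
    // Only the set of right-side keys is needed; right rows themselves are never emitted.
    val rightKeys: Set[K] = right.iterator.map(rightKey).toSet
    left.filter(l => rightKeys.contains(leftKey(l)))
  }
}
```

Because only key membership matters, a left row matching two right rows (key 3 in the test) still appears once.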
[GitHub] spark pull request: [SPARK-2236][SQL]SparkSQL add SkewJoin
Github user YanjieGao commented on a diff in the pull request: https://github.com/apache/spark/pull/1134#discussion_r14811559 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala --- @@ -400,3 +401,73 @@ case class BroadcastNestedLoopJoin( streamedPlusMatches.flatMap(_._1), sqlContext.sparkContext.makeRDD(rightOuterMatches)) } } + +/** + * :: DeveloperApi :: + * In some case ,data skew happens.SkewJoin sample the table rdd to find the largest key, + * then make the largest key rows as a table rdd.The left rdd will be made leftSkewedtable + * rdd without the largest key and the maxkeyskewedtable rdd with the largest key. + * Then,join the two table with the righttable. + * Finally,union the two result rdd. + */ +@DeveloperApi +case class SkewJoinCartesianProduct( +left: SparkPlan, +right: SparkPlan, +condition: Option[Expression])(@transient sc: SparkContext) extends BinaryNode { + override def output = left.output ++ right.output + + @transient lazy val boundCondition = +InterpretedPredicate( + condition + .map(c => BindReferences.bindReference(c, left.output ++ right.output)) + .getOrElse(Literal(true))) + + def execute() = { + +val skewedTable = left.execute() +//This will later write as configuration +val sample = skewedTable.sample(false, 0.3, 9).collect() +val sortedSample = sample.sortWith((row1, row2) => row1.hashCode() row2.hashCode()) --- End diff -- I want to use the key to sort the rows, so I think I need a better way to obtain the key. Do you know a better way to fetch the key?
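The plan in the doc comment, together with the question about using a key instead of `hashCode`, can be sketched on local collections. Assumptions: rows are plain Scala values, an equi-join is used for simplicity (the diff uses a general condition), and `keyOf`/`leftKey`/`rightKey` are caller-supplied key extractors standing in for evaluating the join-key expressions against a row. The sketch shows only the split, join, and union structure, not how the hot part would be distributed.

```scala
object SkewSplitJoinSketch {
  /** Most frequent key in a sample, computed from an explicit key extractor rather than hashCode. */
  def mostFrequentKey[A, K](sample: Seq[A], keyOf: A => K): Option[K] =
    if (sample.isEmpty) None
    else Some(sample.groupBy(keyOf).maxBy(_._2.size)._1)

  /** Equi-join left and right, joining the rows that carry the hot key as a separate part, then union. */
  def splitJoin[L, R, K](left: Seq[L], right: Seq[R], leftKey: L => K, rightKey: R => K, hotKey: K): Seq[(L, R)] = {
    val (hotRows, otherRows) = left.partition(l => leftKey(l) == hotKey)
    val rightByKey: Map[K, Seq[R]] = right.groupBy(rightKey)
    def hashJoin(part: Seq[L]): Seq[(L, R)] =
      for {
        l <- part
        r <- rightByKey.getOrElse(leftKey(l), Seq.empty[R])
      } yield (l, r)
    hashJoin(otherRows) ++ hashJoin(hotRows) // union of the two partial results
  }
}
```

Splitting off the hot key does not change the result; it only isolates the rows that would otherwise overload a single partition, so the union equals a plain join.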
[GitHub] spark pull request: [SPARK-1969][MLlib] Online summarizer APIs for...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/955#issuecomment-48708106 QA results for PR 955:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with Serializable {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16558/consoleFull
[GitHub] spark pull request: Use the scala-logging wrapper instead of the d...
Github user witgo commented on the pull request: https://github.com/apache/spark/pull/1369#issuecomment-48708268 #332 can't be tested automatically. #1208 got messed up and I do not know how to fix it. :sweat:
[GitHub] spark pull request: replace println to log4j
GitHub user fireflyc opened a pull request: https://github.com/apache/spark/pull/1372 replace println to log4j Our program needs to receive a large amount of data and run for a long time. We set the log level to WARN, but messages such as Storing iterator received single are still written to the log file (running on YARN). You can merge this pull request into a Git repository by running: $ git pull https://github.com/fireflyc/spark fix-replace-stdout-log Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/1372.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1372 commit fa22a3879a9d4887cd229966b6523dd65b2f6003 Author: fireflyc firef...@126.com Date: 2014-07-11T08:03:20Z replace println to log4j Our program needs to receive a large amount of data and run for a long time. We set the log level to WARN but Storing iterator received single as such message written to the log file. (over yarn)
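The behavior described comes from `println` bypassing the logging configuration entirely, while a logger call is filtered by the configured level. The sketch below uses `java.util.logging` as a dependency-free stand-in for log4j (an assumption made only to keep it self-contained); the logger name and message are hypothetical.

```scala
import java.util.logging.{Level, Logger}

object ReceiverLoggingSketch {
  // java.util.logging stands in for log4j so the sketch needs no dependencies.
  val log: Logger = Logger.getLogger("receiver-logging-sketch") // hypothetical logger name
  log.setLevel(Level.WARNING) // analogous to running with the log4j level set to WARN

  def onSingleItemStored(item: Any): Unit = {
    // Before: println(...) always reaches stdout (on YARN, the container's stdout log),
    // whatever level is configured.
    // After: an INFO-level message is discarded while the logger's level is WARNING.
    log.info(s"Stored single item: $item")
  }
}
```

Once the message goes through a logger, raising the level is enough to silence it on a long-running receiver.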
[GitHub] spark pull request: [SPARK-1470] Use the scala-logging wrapper ins...
Github user witgo commented on the pull request: https://github.com/apache/spark/pull/332#issuecomment-48708675 It can't be tested automatically. I submitted a new PR, #1369.
[GitHub] spark pull request: replace println to log4j
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/1372#issuecomment-48708875 Can one of the admins verify this patch?
[GitHub] spark pull request: [SPARK-1969][MLlib] Online summarizer APIs for...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/955#issuecomment-48709108 QA results for PR 955:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with Serializable {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16560/consoleFull
[GitHub] spark pull request: [SPARK-1969][MLlib] Online summarizer APIs for...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/955#issuecomment-48709925 QA results for PR 955:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with Serializable {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16561/consoleFull
[GitHub] spark pull request: [SPARK-2446][SQL] Add BinaryType support to Pa...
GitHub user ueshin opened a pull request: https://github.com/apache/spark/pull/1373 [SPARK-2446][SQL] Add BinaryType support to Parquet I/O. To support `BinaryType`, the following changes are needed:
- Make `StringType` use `OriginalType.UTF8`
- Add `BinaryType` using `PrimitiveTypeName.BINARY` without `OriginalType`

You can merge this pull request into a Git repository by running: $ git pull https://github.com/ueshin/apache-spark issues/SPARK-2446 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/1373.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1373 commit 616e04a989edce9f5109a3b2ad11b78e1e0f1a77 Author: Takuya UESHIN ues...@happy-camper.st Date: 2014-07-09T15:25:08Z Make StringType use OriginalType.UTF8. commit ecacb925c4e1f2dbf863b604ad2600cc8c9663d8 Author: Takuya UESHIN ues...@happy-camper.st Date: 2014-07-11T09:13:46Z Add BinaryType support to Parquet I/O.
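Both types end up as the same Parquet primitive, `BINARY`; the `UTF8` annotation is what lets a reader tell a string column from a raw byte column. A sketch of that mapping follows, using hypothetical stand-ins for the Catalyst types and Parquet's textual schema syntax rather than the real Spark SQL and parquet-mr classes.

```scala
object ParquetTypeSketch {
  // Hypothetical stand-ins for the Catalyst data types; not the real Spark SQL classes.
  sealed trait CatalystType
  case object StringType extends CatalystType
  case object BinaryType extends CatalystType
  case object IntegerType extends CatalystType

  /** One field in Parquet's textual schema syntax, following the mapping described in the PR. */
  def toParquetField(name: String, dataType: CatalystType, nullable: Boolean): String = {
    val repetition = if (nullable) "optional" else "required"
    dataType match {
      case StringType  => s"$repetition binary $name (UTF8);" // BINARY + OriginalType.UTF8
      case BinaryType  => s"$repetition binary $name;"        // BINARY, no OriginalType
      case IntegerType => s"$repetition int32 $name;"
    }
  }

  /** Reading back a BINARY column: only the UTF8 annotation distinguishes strings from raw bytes. */
  def fromParquetBinary(hasUtf8Annotation: Boolean): CatalystType =
    if (hasUtf8Annotation) StringType else BinaryType
}
```

Without the annotation on strings, both columns would round-trip to the same type, which is why the first change is needed before binary data can be supported.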
[GitHub] spark pull request: [SPARK-2446][SQL] Add BinaryType support to Pa...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1373#issuecomment-48710936 QA tests have started for PR 1373. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16565/consoleFull
[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...
Github user li-zhihui commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14813843 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -46,9 +46,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A { // Use an atomic variable to track total number of cores in the cluster for simplicity and speed var totalCoreCount = new AtomicInteger(0) + var totalExpectedExecutors = new AtomicInteger(0) val conf = scheduler.sc.conf private val timeout = AkkaUtils.askTimeout(conf) private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf) + // Submit tasks only after (registered executors / total expected executors) + // is equal to at least this value, that is double between 0 and 1. + var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0) --- End diff -- @tgravescs Done.
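The condition in the diff comment (submit tasks once registered executors / total expected executors reaches `spark.scheduler.minRegisteredExecutorsRatio`) reduces to a small predicate. In this sketch, the object name is hypothetical, and clamping the ratio to [0, 1] and the behavior when no executors are expected are assumptions, not taken from the PR.

```scala
object RegisteredRatioSketch {
  /** Hypothetical readiness check: true once registered / expected reaches the configured ratio. */
  def readyToSubmit(registered: Int, totalExpected: Int, minRegisteredRatio: Double): Boolean = {
    val ratio = math.min(1.0, math.max(0.0, minRegisteredRatio)) // assumption: keep the ratio in [0, 1]
    if (totalExpected <= 0) true // assumption: nothing to wait for
    else registered.toDouble / totalExpected >= ratio
  }
}
```

With the default of 0 the predicate is always true, so scheduling behaves as before unless the setting is raised.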
[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...
Github user li-zhihui commented on the pull request: https://github.com/apache/spark/pull/900#issuecomment-48714143 Thanks @tgravescs. I will file a new JIRA for handling Mesos and follow up on it after this PR is merged.
[GitHub] spark pull request: [MLLIB] [SPARK-2222] Add multiclass evaluation...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1155#issuecomment-48714186 QA tests have started for PR 1155. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16566/consoleFull
[GitHub] spark pull request: [SPARK-1470,SPARK-1842] Use the scala-logging ...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1369#issuecomment-48714364 QA results for PR 1369:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16564/consoleFull
[GitHub] spark pull request: [MLLIB] [SPARK-2222] Add multiclass evaluation...
Github user avulanov commented on a diff in the pull request: https://github.com/apache/spark/pull/1155#discussion_r14814439 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/evaluation/MulticlassMetrics.scala --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *    http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.evaluation + +import org.apache.spark.Logging +import org.apache.spark.SparkContext._ +import org.apache.spark.annotation.Experimental +import org.apache.spark.rdd.RDD + +import scala.collection.Map --- End diff -- Several RDD methods that I use, such as countByValue and collectAsMap, return scala.collection.Map. That type does not match the Map from Scala's Predef, which is collection.immutable.Map, so the compiler reports a type mismatch error if I remove this import.
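The mismatch can be reproduced without Spark. In this sketch, `countByValue` is a hypothetical local function declared to return `scala.collection.Map`, as the comment says RDD methods such as countByValue and collectAsMap do. The two working alternatives are declaring the general type (what the import achieves) or converting with `toMap`, which copies into an immutable map.

```scala
import scala.collection.mutable

object MapTypeSketch {
  /** Hypothetical local stand-in for an API declared to return scala.collection.Map. */
  def countByValue[T](xs: Seq[T]): scala.collection.Map[T, Long] = {
    val counts = mutable.HashMap[T, Long]()
    xs.foreach { x => counts(x) = counts.getOrElse(x, 0L) + 1L }
    counts
  }

  // Option 1: declare the general type, which is what `import scala.collection.Map` achieves.
  val general: scala.collection.Map[String, Long] = countByValue(Seq("a", "b", "a"))

  // Option 2: keep Predef's immutable Map and convert explicitly (this copies the entries).
  val immutableCopy: Map[String, Long] = countByValue(Seq("a", "b", "a")).toMap

  // Does not compile: scala.collection.Map is not a subtype of Predef's immutable Map.
  // val broken: Map[String, Long] = countByValue(Seq("a", "b", "a"))
}
```

Option 1 avoids a copy, while option 2 gives callers an immutable type at the cost of materializing a new map.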
[GitHub] spark pull request: [MLLIB] [SPARK-2222] Add multiclass evaluation...
Github user avulanov commented on the pull request: https://github.com/apache/spark/pull/1155#issuecomment-48715008 @mengxr I've addressed your comments, except the one about the import, which I replied to above. I've also posted a question about the feature selection interface. Could you take a look? http://apache-spark-developers-list.1001551.n3.nabble.com/Feature-selection-interface-td7258.html
[GitHub] spark pull request: [SPARK-2446][SQL] Add BinaryType support to Pa...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1373#issuecomment-48718560 QA results for PR 1373:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16565/consoleFull
[GitHub] spark pull request: [MLLIB] [SPARK-2222] Add multiclass evaluation...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1155#issuecomment-48720849 QA results for PR 1155:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class MulticlassMetrics(predictionAndLabels: RDD[(Double, Double)]) {
* (equals to precision for multiclass classifier

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16566/consoleFull
[GitHub] spark pull request: [SPARK-2437] Rename MAVEN_PROFILES to SBT_MAVE...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1374#issuecomment-48722841 QA results for PR 1374:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16567/consoleFull
[GitHub] spark pull request: Made rdd.py pep8 complaint by using Autopep8 a...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1354#issuecomment-48725049 QA results for PR 1354:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds no public classes.

For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16568/consoleFull
[GitHub] spark pull request: [SPARK-2437] Rename MAVEN_PROFILES to SBT_MAVE...
Github user ScrapCodes commented on the pull request: https://github.com/apache/spark/pull/1374#issuecomment-48725337 Jenkins, retest this please.
[GitHub] spark pull request: [SPARK-2437] Rename MAVEN_PROFILES to SBT_MAVE...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1374#issuecomment-48725500 QA tests have started for PR 1374. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16569/consoleFull
[GitHub] spark pull request: [SPARK-2165] spark on yarn: add support for se...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/1279#discussion_r14821748 --- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala --- @@ -124,6 +124,14 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { set("spark.home", home) } + /** + * Set the max number of submission retries the Spark client will attempt + * before giving up + */ --- End diff -- We haven't been adding specific routines for setting individual configs. The user can just set this one with the existing SparkConf.set routines, so I think we should remove this.
[GitHub] spark pull request: [SPARK-2165] spark on yarn: add support for se...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/1279#discussion_r14822032 --- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala --- @@ -167,6 +175,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { getOption(key).map(_.toInt).getOrElse(defaultValue) } + def getIntOption(key: String): Option[Int] = getOption(key).map(_.toInt) --- End diff -- To keep things consistent (these APIs are public), I don't think we should add getIntOption without also adding routines like getLongOption, etc. For now, can you just use getOption and then convert the result to an Int?
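A minimal sketch of the suggested alternative. It assumes a plain `Map[String, String]` stands in for SparkConf's settings, and the `SimpleConf` and `ConfUsage` names are hypothetical, not Spark's API. The caller reads the raw `Option[String]` and converts it, so the public API does not need a type-specific getter such as `getIntOption`.

```scala
// Hypothetical stand-in for a string-keyed config; not Spark's SparkConf.
class SimpleConf(settings: Map[String, String]) {
  // Same shape as SparkConf.getOption: the raw string value, if set.
  def getOption(key: String): Option[String] = settings.get(key)
}

object ConfUsage {
  // Caller-side conversion instead of a dedicated getIntOption accessor.
  // Note: toInt throws NumberFormatException for non-numeric values.
  def maxAppAttempts(conf: SimpleConf): Option[Int] =
    conf.getOption("spark.maxappattempts").map(_.toInt)
}
```

For example, `ConfUsage.maxAppAttempts(new SimpleConf(Map("spark.maxappattempts" -> "3")))` yields `Some(3)`, and an unset key yields `None`.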
[GitHub] spark pull request: [SPARK-2165] spark on yarn: add support for se...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/1279#discussion_r14822125 --- Diff: yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala --- @@ -108,6 +108,10 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa val appContext = Records.newRecord(classOf[ApplicationSubmissionContext]) appContext.setApplicationId(appId) appContext.setApplicationName(args.appName) +sparkConf.getIntOption("spark.maxappattempts") match { + case Some(v) => appContext.setMaxAppAttempts(v) --- End diff -- Hadoop 0.23 (YARN alpha) doesn't have a setMaxAppAttempts routine. Just remove this here and only do it in the YARN stable version.
[GitHub] spark pull request: [SPARK-2165] spark on yarn: add support for se...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/1279#discussion_r14822197 --- Diff: yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala --- @@ -81,6 +81,10 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa appContext.setQueue(args.amQueue) appContext.setAMContainerSpec(amContainer) appContext.setApplicationType("SPARK") +sparkConf.getIntOption("spark.maxappattempts") match { + case Some(v) => appContext.setMaxAppAttempts(v) + case None => logDebug("Not setting max app attempts.") --- End diff -- Can you add something like "cluster default setting will be used" to the log statement?
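A rough sketch of the requested log change. To keep the snippet runnable on its own, the setter and the logger are passed in as plain functions. They are hypothetical stand-ins for `appContext.setMaxAppAttempts` and `logDebug` from the diff, and the `MaxAttemptsConfig` object is illustrative only.

```scala
object MaxAttemptsConfig {
  // Applies the configured max app attempts if present; otherwise logs that
  // the cluster default will be used instead.
  def configure(
      configured: Option[Int],
      setMaxAppAttempts: Int => Unit,
      logDebug: String => Unit): Unit = configured match {
    case Some(v) => setMaxAppAttempts(v)
    case None =>
      logDebug("spark.maxappattempts not set; the cluster default setting will be used.")
  }
}
```

Passing the side effects in as functions is only a testing convenience here. In the YARN stable client, the body of the `match` would call the real methods directly.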
[GitHub] spark pull request: [SPARK-2437] Rename MAVEN_PROFILES to SBT_MAVE...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1374#issuecomment-48736229 QA results for PR 1374:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds no public classes.

For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16569/consoleFull
[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14823260 --- Diff: docs/configuration.md --- @@ -699,6 +699,22 @@ Apart from these, the following properties are also available, and may be useful (in milliseconds) </td> </tr> +</tr> + <td><code>spark.scheduler.minRegisteredExecutorsRatio</code></td> + <td>0</td> + <td> +Submit tasks only after (registered executors / total expected executors) +is equal to at least this value, which is double between 0 and 1. + </td> +</tr> +<tr> + <td><code>spark.scheduler.maxRegisteredExecutorsWaitingTime</code></td> + <td>3</td> + <td> +Whatever (registered executors / total expected executors) is reached --- End diff -- I think we should clarify both of these a bit, because scheduling really starts when either one is hit. Adding a reference to maxRegisteredExecutorsWaitingTime in the description of minRegisteredExecutorsRatio would be good. How about something like the text below? Note I'm not a doc writer, so I'm fine with changes. For spark.scheduler.minRegisteredExecutorsRatio: The minimum ratio of registered executors (registered executors / total expected executors) to wait for before scheduling begins. Specified as a double between 0 and 1. Regardless of whether the minimum ratio of executors has been reached, the maximum amount of time it will wait before scheduling begins is controlled by config <code>spark.scheduler.maxRegisteredExecutorsWaitingTime</code>. Then for spark.scheduler.maxRegisteredExecutorsWaitingTime: Maximum amount of time to wait for executors to register before scheduling begins (in milliseconds).
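The proposed semantics can be sketched as a pure predicate: scheduling begins as soon as either the registered-executor ratio reaches the minimum or the maximum wait time has elapsed. This is not the code in CoarseGrainedSchedulerBackend. The object and parameter names are hypothetical, and treating "no expected executors" as ratio 1.0 is an assumption made to avoid division by zero.

```scala
object SchedulingGate {
  /**
   * True once scheduling may begin: either registered / expected has reached
   * minRatio, or elapsedMs has reached maxWaitMs (whichever happens first).
   */
  def canStart(
      registered: Int,
      expected: Int,
      elapsedMs: Long,
      minRatio: Double,
      maxWaitMs: Long): Boolean = {
    // Assumption: with no expected executors, treat the ratio as satisfied.
    val ratio = if (expected <= 0) 1.0 else registered.toDouble / expected
    ratio >= minRatio || elapsedMs >= maxWaitMs
  }
}
```

With a ratio of 0 (the default shown in the diff), the ratio condition holds immediately, so the wait time only matters when a positive ratio is configured.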
[GitHub] spark pull request: replace println to log4j
Github user fireflyc commented on the pull request: https://github.com/apache/spark/pull/1372#issuecomment-48739929 I have verified it; the log level is set to INFO, right?
[GitHub] spark pull request: Made rdd.py pep8 complaint by using Autopep8 a...
Github user ScrapCodes commented on the pull request: https://github.com/apache/spark/pull/1354#issuecomment-48740596 Found a way, actually. On Jul 11, 2014 6:07 PM, Apache Spark QA notificati...@github.com wrote: QA results for PR 1354: - This patch PASSES unit tests. - This patch merges cleanly - This patch adds no public classes For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16568/consoleFull
[GitHub] spark pull request: [SPARK-1946] Submit tasks after (configured ra...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/900#discussion_r14825405 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -244,6 +257,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A throw new SparkException("Error notifying standalone scheduler's driver actor", e) } } + + override def isReady(): Boolean = { +if (ready) { + return true +} +if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) { --- End diff -- It might be nice to have a log statement here saying the max wait time was hit, so we know when scheduling began if we are debugging a job.
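A sketch of the suggested logging. It assumes the readiness check caches its result in a flag (like the `ready` field in the diff) and logs exactly once, with the reason, at the moment the flag flips. The `ReadinessTracker` class and its parameters are hypothetical. The current time is passed in explicitly so the behavior is easy to check.

```scala
// Hypothetical readiness tracker; names are illustrative, not Spark's.
class ReadinessTracker(maxWaitMs: Long, startTimeMs: Long, log: String => Unit) {
  private var ready = false

  /** `ratioReached` is computed by the caller; `nowMs` is the current time. */
  def isReady(ratioReached: Boolean, nowMs: Long): Boolean = {
    if (!ready) {
      if (ratioReached) {
        ready = true
        log("Minimum registered executor ratio reached; scheduling can begin.")
      } else if (nowMs - startTimeMs >= maxWaitMs) {
        ready = true
        // The message the review asks for: record that the timeout triggered.
        log(s"Waited ${nowMs - startTimeMs} ms (max $maxWaitMs ms) for executors; scheduling begins anyway.")
      }
    }
    ready
  }
}
```

Because the flag short-circuits later calls, the timestamp of that single log line marks when scheduling began.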
[GitHub] spark pull request: SPARK-1719: spark.*.extraLibraryPath isn't app...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1022#issuecomment-48750411 QA tests have started for PR 1022. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16570/consoleFull
[GitHub] spark pull request: [SPARK-2236][SQL]SparkSQL add SkewJoin
Github user YanjieGao commented on the pull request: https://github.com/apache/spark/pull/1134#issuecomment-48752406 Hi, I rewrote the code and resolved some of the earlier problems.
[GitHub] spark pull request: [SPARK-2441][SQL] Add more efficient distinct ...
Github user marmbrus commented on the pull request: https://github.com/apache/spark/pull/1366#issuecomment-48759522 @aarondav, you are totally right. However, the `Aggregate` operator that this is replacing made the same assumption, and this approach will use strictly less memory. For the 1.2 release I'd like to focus on external algorithms and hopefully start using more of Spark's built-in data structures for this.
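For context, a hash-based distinct over a single partition can be sketched as follows. It buffers only the distinct values seen so far. This is the kind of in-memory assumption the comment refers to: the set of distinct values in a partition has to fit in memory. This is a hypothetical illustration, not the operator from the PR.

```scala
import scala.collection.mutable

object PartitionDistinct {
  // Emits each value the first time it is seen. Memory grows with the
  // number of distinct values, not with the number of input rows.
  def distinct[T](rows: Iterator[T]): Iterator[T] = {
    val seen = mutable.HashSet.empty[T]
    // HashSet.add returns true only when the element was not yet present.
    rows.filter(seen.add)
  }
}
```

An external (spilling) variant would be needed to drop the fits-in-memory assumption, which matches the direction mentioned for the 1.2 release.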
[GitHub] spark pull request: [SPARK-2446][SQL] Add BinaryType support to Pa...
Github user marmbrus commented on the pull request: https://github.com/apache/spark/pull/1373#issuecomment-48760249 Thanks for the patch! One quick question: will this change the behavior when loading in string data that was saved with previous versions of Spark SQL?
[GitHub] spark pull request: SPARK-1719: spark.*.extraLibraryPath isn't app...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1022#issuecomment-48761837 QA results for PR 1022:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds no public classes.

For more information see test output: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16570/consoleFull
[GitHub] spark pull request: [SPARK-1177] Allow SPARK_JAR to be set program...
Github user dbtsai commented on the pull request: https://github.com/apache/spark/pull/987#issuecomment-48762832 #560 is merged. Closing this PR.
[GitHub] spark pull request: [SPARK-1177] Allow SPARK_JAR to be set program...
Github user dbtsai closed the pull request at: https://github.com/apache/spark/pull/987
[GitHub] spark pull request: SPARK-2425 Don't kill a still-running Applicat...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/1360#issuecomment-48762929 QA tests have started for PR 1360. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16571/consoleFull
[GitHub] spark pull request: SPARK-1536: multiclass classification support ...
Github user etrain commented on a diff in the pull request: https://github.com/apache/spark/pull/886#discussion_r14836561 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala --- @@ -768,104 +973,157 @@ object DecisionTree extends Serializable with Logging { /** * Extracts left and right split aggregates. * @param binData Array[Double] of size 2*numFeatures*numSplits - * @return (leftNodeAgg, rightNodeAgg) tuple of type (Array[Double], - * Array[Double]) where each array is of size(numFeature,2*(numSplits-1)) + * @return (leftNodeAgg, rightNodeAgg) tuple of type (Array[Array[Array[Double\]\]\], + * Array[Array[Array[Double\]\]\]) where each array is of size(numFeature, + * (numBins - 1), numClasses) */ def extractLeftRightNodeAggregates( -binData: Array[Double]): (Array[Array[Double]], Array[Array[Double]]) = { +binData: Array[Double]): (Array[Array[Array[Double]]], Array[Array[Array[Double]]]) = { + + + def findAggForOrderedFeatureClassification( + leftNodeAgg: Array[Array[Array[Double]]], + rightNodeAgg: Array[Array[Array[Double]]], + featureIndex: Int) { + +// shift for this featureIndex +val shift = numClasses * featureIndex * numBins + +var classIndex = 0 +while (classIndex < numClasses) { + // left node aggregate for the lowest split + leftNodeAgg(featureIndex)(0)(classIndex) = binData(shift + classIndex) + // right node aggregate for the highest split + rightNodeAgg(featureIndex)(numBins - 2)(classIndex) += binData(shift + (numClasses * (numBins - 1)) + classIndex) + classIndex += 1 +} + +// Iterate over all splits.
+var splitIndex = 1 +while (splitIndex < numBins - 1) { + // calculating left node aggregate for a split as a sum of left node aggregate of a + // lower split and the left bin aggregate of a bin where the split is a high split + var innerClassIndex = 0 + while (innerClassIndex < numClasses) { +leftNodeAgg(featureIndex)(splitIndex)(innerClassIndex) + = binData(shift + numClasses * splitIndex + innerClassIndex) + +leftNodeAgg(featureIndex)(splitIndex - 1)(innerClassIndex) +rightNodeAgg(featureIndex)(numBins - 2 - splitIndex)(innerClassIndex) = + binData(shift + (numClasses * (numBins - 1 - splitIndex) + innerClassIndex)) + +rightNodeAgg(featureIndex)(numBins - 1 - splitIndex)(innerClassIndex) +innerClassIndex += 1 + } + splitIndex += 1 +} + } + + def findAggForUnorderedFeatureClassification( + leftNodeAgg: Array[Array[Array[Double]]], + rightNodeAgg: Array[Array[Array[Double]]], + featureIndex: Int) { + +val rightChildShift = numClasses * numBins * numFeatures +var splitIndex = 0 +while (splitIndex < numBins - 1) { + var classIndex = 0 + while (classIndex < numClasses) { +// shift for this featureIndex +val shift = numClasses * featureIndex * numBins + splitIndex * numClasses +val leftBinValue = binData(shift + classIndex) +val rightBinValue = binData(rightChildShift + shift + classIndex) +leftNodeAgg(featureIndex)(splitIndex)(classIndex) = leftBinValue +rightNodeAgg(featureIndex)(splitIndex)(classIndex) = rightBinValue +classIndex += 1 + } + splitIndex += 1 +} + } + + def findAggForRegression( + leftNodeAgg: Array[Array[Array[Double]]], + rightNodeAgg: Array[Array[Array[Double]]], + featureIndex: Int) { + +// shift for this featureIndex +val shift = 3 * featureIndex * numBins +// left node aggregate for the lowest split +leftNodeAgg(featureIndex)(0)(0) = binData(shift + 0) +leftNodeAgg(featureIndex)(0)(1) = binData(shift + 1) +leftNodeAgg(featureIndex)(0)(2) = binData(shift + 2) + +// right node aggregate for the highest split +rightNodeAgg(featureIndex)(numBins - 2)(0) = + 
binData(shift + (3 * (numBins - 1))) +rightNodeAgg(featureIndex)(numBins - 2)(1) = + binData(shift + (3 * (numBins - 1)) + 1) +rightNodeAgg(featureIndex)(numBins - 2)(2) = + binData(shift + (3 * (numBins - 1)) + 2) + +// Iterate over all splits. +var splitIndex
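The ordered-feature branch above computes, for each split, cumulative class counts from the low end (left child) and from the high end (right child). Below is a simplified sketch of that idea for a single feature. It assumes the bin aggregates are already laid out as a `numBins x numClasses` matrix rather than the flat `binData` array with `shift` offsets used in the diff, and it assumes at least two bins. The names are hypothetical.

```scala
object SplitAggregates {
  /**
   * For one ordered feature, bins(b)(c) is the count of class c in bin b.
   * Split s separates bins 0..s (left) from bins s+1..numBins-1 (right).
   * Returns (left, right), each of shape (numBins - 1) x numClasses.
   */
  def leftRight(bins: Array[Array[Double]]): (Array[Array[Double]], Array[Array[Double]]) = {
    val numBins = bins.length
    val numClasses = bins(0).length
    val left = Array.ofDim[Double](numBins - 1, numClasses)
    val right = Array.ofDim[Double](numBins - 1, numClasses)
    var c = 0
    while (c < numClasses) {
      // Seed the lowest split on the left and the highest split on the right.
      left(0)(c) = bins(0)(c)
      right(numBins - 2)(c) = bins(numBins - 1)(c)
      var s = 1
      while (s < numBins - 1) {
        // Running sums: upward for the left child, downward for the right child.
        left(s)(c) = bins(s)(c) + left(s - 1)(c)
        right(numBins - 2 - s)(c) = bins(numBins - 1 - s)(c) + right(numBins - 1 - s)(c)
        s += 1
      }
      c += 1
    }
    (left, right)
  }
}
```

For every split s and class c, `left(s)(c) + right(s)(c)` equals that class's total count across all bins, which is a handy sanity check.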
[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations
Github user dorx commented on a diff in the pull request: https://github.com/apache/spark/pull/1367#discussion_r14836617 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.stat.correlation + +import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector} +import org.apache.spark.rdd.RDD + +/** + * New correlation algorithms should implement this trait + */ +trait Correlation { + + /** + * Compute correlation for two datasets. + */ + def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double + + /** + * Compute the correlation matrix S, for the input matrix, where S(i, j) is the correlation + * between column i and j.
+ */ + def computeCorrelationMatrix(X: RDD[Vector]): Matrix + + /** + * Combine the two input RDD[Double]s into an RDD[Vector] and compute the correlation using the + * correlation implementation for RDD[Vector] + */ + def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): Double = { +val mat: RDD[Vector] = x.zip(y).mapPartitions({ iter => + iter.map {case(xi, yi) => new DenseVector(Array(xi, yi))} +}, preservesPartitioning = true) +computeCorrelationMatrix(mat)(0, 1) + } + +} + +/** + * Delegates computation to the specific correlation object based on the input method name + * + * Currently supported correlations: pearson, spearman. + * After new correlation algorithms are added, please update the documentation here and in + * Statistics.scala for the correlation APIs. + * + * Cases are ignored when doing method matching. We also allow initials, e.g. P for pearson, as + * long as initials are unique in the supported set of correlation algorithms. In addition, a --- End diff -- So the fact that R has supported this feature for many years (and still hasn't deprecated it) is good precedent that, among the well-known correlations, there aren't really any collisions. Since we require that only well-known and widely adopted algorithms be added to MLlib, the likelihood of a collision is even smaller. (If someone adds something to their own version of Spark, presumably they have already looked closely at the docs and been warned.) Because we decided to use strings for method specification instead of enums, for cross-language uniformity, the method is only checked (and possibly invalidated) at run time. Therefore, we do want to provide mechanisms for fault tolerance and minimization in method specification. To address a related comment, "spearman" and "spearmans" are both really common ways of referring to Spearman's correlation (imagine the heartbreak of having your program fail because of an extra "s" after hours of data manipulation and computation!)
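Below is a rough sketch of the matching rules the doc comment describes: case is ignored, a unique initial is accepted, and an input matches when a supported name is a substring of it (so "spearmansrho" maps to "spearman"). This illustrates those stated rules only; it is not the `Correlations` object from the PR. The object name is hypothetical, and rejecting unknown or ambiguous names with `IllegalArgumentException` is an assumption.

```scala
import java.util.Locale

object CorrelationNames {
  // Supported method names, per the doc comment: pearson, spearman.
  val supported: Seq[String] = Seq("pearson", "spearman")

  /** Resolves a user-supplied method name to a supported one, or throws. */
  def resolve(input: String): String = {
    val in = input.toLowerCase(Locale.ROOT)
    // A single character is treated as an initial ("p", "s").
    val byInitial = supported.filter(name => in.length == 1 && name.head.toString == in)
    // A supported name appearing inside the input ("spearmans", "spearmansrho").
    val bySubstring = supported.filter(name => in.contains(name))
    (byInitial ++ bySubstring).distinct match {
      case Seq(name) => name
      case Seq() =>
        throw new IllegalArgumentException(s"Unsupported correlation method: $input")
      case many =>
        throw new IllegalArgumentException(
          s"Ambiguous correlation method $input: ${many.mkString(", ")}")
    }
  }
}
```

Because the check only happens at run time, a forgiving resolver like this turns common variants into a match instead of a late failure, which is the motivation given in the comment.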
[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations
Github user dorx commented on a diff in the pull request: https://github.com/apache/spark/pull/1367#discussion_r14836715 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.stat.correlation + +import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector} +import org.apache.spark.rdd.RDD + +/** + * New correlation algorithms should implement this trait + */ +trait Correlation { + + /** + * Compute correlation for two datasets. + */ + def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double + + /** + * Compute the correlation matrix S, for the input matrix, where S(i, j) is the correlation + * between column i and j.
+ */ + def computeCorrelationMatrix(X: RDD[Vector]): Matrix + + /** + * Combine the two input RDD[Double]s into an RDD[Vector] and compute the correlation using the + * correlation implementation for RDD[Vector] + */ + def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): Double = { +val mat: RDD[Vector] = x.zip(y).mapPartitions({ iter => + iter.map {case(xi, yi) => new DenseVector(Array(xi, yi))} +}, preservesPartitioning = true) +computeCorrelationMatrix(mat)(0, 1) + } + +} + +/** + * Delegates computation to the specific correlation object based on the input method name + * + * Currently supported correlations: pearson, spearman. + * After new correlation algorithms are added, please update the documentation here and in + * Statistics.scala for the correlation APIs. + * + * Cases are ignored when doing method matching. We also allow initials, e.g. P for pearson, as + * long as initials are unique in the supported set of correlation algorithms. In addition, a + * supported method name has to be a substring of the input method name for it to be matched (e.g. + * spearmansrho will be matched to spearman) + * + * Maintains the default correlation type, pearson + */ +object Correlations { --- End diff -- I didn't name it Correlations to copy Vectors; the name signals that this is more of a delegator object that maintains all supported correlations.
[GitHub] spark pull request: SPARK-1536: multiclass classification support ...
Github user etrain commented on the pull request: https://github.com/apache/spark/pull/886#issuecomment-48767401 I've gone through this in some depth, and aside from a couple of minor style nits, the logic looks good to me. Manish, have you compared the output against scikit-learn on multiclass datasets and verified that the results look at least reasonably similar? Really awesome work!
[GitHub] spark pull request: [SPARK-2359][MLlib] Correlations
Github user dorx commented on a diff in the pull request: https://github.com/apache/spark/pull/1367#discussion_r14836922 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/Correlation.scala --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.stat.correlation + +import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector} +import org.apache.spark.rdd.RDD + +/** + * New correlation algorithms should implement this trait + */ +trait Correlation { + + /** + * Compute correlation for two datasets. + */ + def computeCorrelation(x: RDD[Double], y: RDD[Double]): Double + + /** + * Compute the correlation matrix S, for the input matrix, where S(i, j) is the correlation + * between column i and j.
+ */ + def computeCorrelationMatrix(X: RDD[Vector]): Matrix + + /** + * Combine the two input RDD[Double]s into an RDD[Vector] and compute the correlation using the + * correlation implementation for RDD[Vector] + */ + def computeCorrelationWithMatrixImpl(x: RDD[Double], y: RDD[Double]): Double = { --- End diff -- I made it a separate method on purpose, to force future developers to think about whether this is a good option or whether custom code for vectors would be more performant.
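To make the trade-off concrete, here is a local, non-distributed sketch of the same pattern:
1. zip the two series into two-element rows;
2. compute the full correlation matrix;
3. read off entry (0, 1).

It uses Pearson correlation and plain Scala collections in place of RDDs, and all names are hypothetical. Computing a full matrix for two series also computes entries that are then discarded (the diagonal, which is always 1). That is the kind of overhead a specialized two-series implementation could avoid.

```scala
object PearsonViaMatrix {
  // Pearson correlation matrix for the columns of `rows` (one row per observation).
  def correlationMatrix(rows: Seq[Array[Double]]): Array[Array[Double]] = {
    val n = rows.length.toDouble
    val k = rows.head.length
    val means = Array.tabulate(k)(j => rows.map(_(j)).sum / n)
    // Co-deviation sums; the common 1/n factor cancels in the ratio below.
    val cov = Array.tabulate(k, k) { (i, j) =>
      rows.map(r => (r(i) - means(i)) * (r(j) - means(j))).sum
    }
    // A zero-variance column yields NaN here, as in the textbook formula.
    Array.tabulate(k, k)((i, j) => cov(i)(j) / math.sqrt(cov(i)(i) * cov(j)(j)))
  }

  // Zip the two series into 2-element rows and read off entry (0, 1),
  // mirroring the structure of computeCorrelationWithMatrixImpl in the diff.
  def correlation(x: Seq[Double], y: Seq[Double]): Double = {
    require(x.length == y.length, "series must have equal length")
    correlationMatrix(x.zip(y).map { case (xi, yi) => Array(xi, yi) })(0)(1)
  }
}
```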
[GitHub] spark pull request: [WIP] SPARK-2450: Add YARN executor log links ...
GitHub user kbzod opened a pull request: https://github.com/apache/spark/pull/1375 [WIP] SPARK-2450: Add YARN executor log links to UI executors page This adds a new column in the Executors page of the Spark UI called Logs, visible only when running under YARN. After getting a container for an executor, the YarnAllocationHandler informs the UI of where its logs reside, so that a link can be presented in the Logs column for it. Current issues / limitations: * This only works running with yarn-cluster, not yarn-client. * The mechanism for getting the log URL through to the UI is different than the usual way information gets there (via SparkListeners). There may be a better, more decoupled way to do it. ![screen shot 2014-07-10 at 4 36 28 pm](https://cloud.githubusercontent.com/assets/1541206/3557128/006f6ea2-092c-11e4-8125-5c2c3a158068.png) You can merge this pull request into a Git repository by running: $ git pull https://github.com/kbzod/spark SPARK-2450 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/1375.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1375 commit f3d42ecf3309e6c3ddd7ffc2fce990776059b414 Author: Bill Havanki bhava...@cloudera.com Date: 2014-07-10T18:28:46Z SPARK-2450 Initial prototype. commit 60cc1121d27f94bbf695763d585b29a8be4023aa Author: Bill Havanki bhava...@cloudera.com Date: 2014-07-10T19:32:08Z SPARK-2450 Better abstraction commit 57dca648aa30a15525d03d3bbecb85f4340829eb Author: Bill Havanki bhava...@cloudera.com Date: 2014-07-11T17:59:26Z SPARK-2450 Add Javadoc.
[GitHub] spark pull request: [WIP] SPARK-2450: Add YARN executor log links ...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/1375#issuecomment-48768609

Can one of the admins verify this patch?
[GitHub] spark pull request: [SPARK-2437] Rename MAVEN_PROFILES to SBT_MAVE...
Github user pwendell commented on the pull request: https://github.com/apache/spark/pull/1374#issuecomment-48768736

Thanks, looks good. I tested this with:

```
SBT_MAVEN_PROFILES=yarn SBT_MAVEN_PROPERTIES=hadoop.version=2.2.0 sbt/sbt package
```
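The test command passes one Maven profile and one property through environment variables. A minimal sketch of how a build definition might turn such values into a profile list and a property map, assuming entries are separated by whitespace or commas, profiles may carry an optional `-P` prefix, and properties are `key=value` pairs. `SbtMavenEnv` and its methods are hypothetical; this is not Spark's actual `project/SparkBuild.scala`.

```scala
// Hypothetical parser for SBT_MAVEN_PROFILES / SBT_MAVEN_PROPERTIES style values.
object SbtMavenEnv {
  private val Separator = "[\\s,]+" // assumption: whitespace- or comma-separated entries

  /** "yarn hive" or "-Pyarn,-Phive" -> List("yarn", "hive") */
  def parseProfiles(raw: Option[String]): Seq[String] =
    raw.toList
      .flatMap(_.trim.split(Separator))
      .filter(_.nonEmpty)
      .map(_.stripPrefix("-P"))

  /** "hadoop.version=2.2.0" -> Map("hadoop.version" -> "2.2.0"); entries without a key are rejected. */
  def parseProperties(raw: Option[String]): Map[String, String] =
    raw.toList
      .flatMap(_.trim.split(Separator))
      .filter(_.nonEmpty)
      .map { kv =>
        kv.split("=", 2) match {
          case Array(k, v) if k.nonEmpty => k -> v
          case _ => throw new IllegalArgumentException(s"Expected key=value, got '$kv'")
        }
      }
      .toMap

  /** Reads both variables from the process environment. */
  def fromEnvironment(): (Seq[String], Map[String, String]) =
    (parseProfiles(sys.env.get("SBT_MAVEN_PROFILES")),
      parseProperties(sys.env.get("SBT_MAVEN_PROPERTIES")))
}
```

Rejecting an entry without `=` is a deliberate choice in this sketch: the build then fails loudly instead of silently dropping the setting.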
[GitHub] spark pull request: [SPARK-2411] Add a history-not-found page to s...
Github user pwendell commented on the pull request: https://github.com/apache/spark/pull/1336#issuecomment-48769071

LGTM with one small comment.
[GitHub] spark pull request: [SPARK-2437] Rename MAVEN_PROFILES to SBT_MAVE...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/1374
[GitHub] spark pull request: [SPARK-2411] Add a history-not-found page to s...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/1336#discussion_r14837627

```diff
--- Diff: core/src/main/scala/org/apache/spark/deploy/master/ui/HistoryNotFoundPage.scala ---
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master.ui
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import org.apache.spark.ui.{UIUtils, WebUIPage}
+
+private[spark] class HistoryNotFoundPage(parent: MasterWebUI)
+  extends WebUIPage("history/not-found") {
+
+  def render(request: HttpServletRequest): Seq[Node] = {
+    val content =
+      <div class="row-fluid">
+        <div class="span12" style="font-size:14px;font-weight:bold">
+          No event logs were found for this application. To enable event logging, please set
--- End diff --
```

Should you also mention the setting for the event log directory? Otherwise they might think they can just set `enabled` and it will auto-magically work.
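Following the suggestion to mention the directory setting too, here is a small sketch of a message builder that names every missing setting rather than only `spark.eventLog.enabled`. The keys `spark.eventLog.enabled` and `spark.eventLog.dir` are Spark configuration properties; `EventLogHint`, the plain `Map` standing in for `SparkConf`, and the `<path>` placeholder are assumptions for illustration only.

```scala
// Hypothetical helper for the "history not found" page.
object EventLogHint {
  val EnabledKey = "spark.eventLog.enabled"
  val DirKey = "spark.eventLog.dir"

  /** Returns the settings the user still needs to configure (empty if none look missing). */
  def missingSettings(conf: Map[String, String]): Seq[String] = {
    val enabled = conf.get(EnabledKey).exists(_.trim.equalsIgnoreCase("true"))
    val dirSet = conf.get(DirKey).exists(_.trim.nonEmpty)
    Seq(
      if (enabled) None else Some(s"$EnabledKey=true"),
      if (dirSet) None else Some(s"$DirKey=<path>")
    ).flatten
  }

  /** Builds the page text, listing both settings when both are missing. */
  def message(conf: Map[String, String]): String = {
    val missing = missingSettings(conf)
    if (missing.isEmpty)
      "No event logs were found for this application, although event logging appears to be configured."
    else
      "No event logs were found for this application. To enable event logging, please set " +
        missing.mkString(" and ") + "."
  }
}
```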
[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/1165#discussion_r14837730

```diff
--- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---
@@ -141,6 +174,86 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   }
   /**
+   * Unfold the given block in memory safely.
+   *
+   * The safety of this operation refers to avoiding potential OOM exceptions caused by
+   * unfolding the entirety of the block in memory at once. This is achieved by periodically
+   * checking whether the memory restrictions for unfolding blocks are still satisfied,
+   * stopping immediately if not. This check is a safeguard against the scenario in which
+   * there is not enough free memory to accommodate the entirety of a single block.
+   *
+   * This method returns either a fully unfolded array or a partially unfolded iterator.
+   */
+  def unfoldSafely(
+      blockId: BlockId,
+      values: Iterator[Any],
+      droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)])
+    : Either[Array[Any], Iterator[Any]] = {
+
+    var count = 0 // The number of elements unfolded so far
+    var enoughMemory = true // Whether there is enough memory to unfold this block
--- End diff --
```

okay then - `atMemoryLimit` or something
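The doc comment in this diff describes the unfold loop only in words. Below is a simplified, self-contained sketch of that idea using the reviewer's suggested `atMemoryLimit` flag: buffer elements, periodically re-estimate the buffer size, ask a budget for the difference, and on refusal return the buffered prefix chained with the untouched rest of the iterator. `MemoryBudget`, the `estimateSize` function and the check period of 16 are hypothetical. The sketch is generic in `T` and returns an `ArrayBuffer` rather than `Array[Any]` to avoid casts and `ClassTag`s, so it is not a drop-in replacement for the `MemoryStore` method.

```scala
import scala.collection.mutable.ArrayBuffer

object UnfoldSketch {

  /** Hypothetical stand-in for the memory store's accounting. */
  final class MemoryBudget(private var free: Long) {
    /** Grants `bytes` if available; returns whether the request succeeded. */
    def tryReserve(bytes: Long): Boolean = synchronized {
      if (bytes <= free) { free -= bytes; true } else false
    }
    def release(bytes: Long): Unit = synchronized { free += bytes }
  }

  def unfoldSafely[T](
      values: Iterator[T],
      budget: MemoryBudget,
      estimateSize: ArrayBuffer[T] => Long, // hypothetical size estimator
      checkPeriod: Int = 16): Either[ArrayBuffer[T], Iterator[T]] = {
    val buffer = new ArrayBuffer[T]
    var reserved = 0L         // bytes granted to this unfold so far
    var atMemoryLimit = false // set once the budget refuses a request
    var count = 0             // number of elements unfolded so far

    while (values.hasNext && !atMemoryLimit) {
      buffer += values.next()
      count += 1
      // Check periodically, and once more when the input is exhausted.
      if (count % checkPeriod == 0 || !values.hasNext) {
        val needed = estimateSize(buffer) - reserved
        if (needed > 0) {
          if (budget.tryReserve(needed)) reserved += needed
          else atMemoryLimit = true
        }
      }
    }

    if (atMemoryLimit) {
      budget.release(reserved)       // this sketch simply gives the reservation back
      Right(buffer.iterator ++ values) // partially unfolded: prefix + untouched remainder
    } else {
      Left(buffer)                     // fully unfolded
    }
  }
}
```

The final check when the iterator runs out matters: without it, the last partial period of elements would be kept without ever being accounted for.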
[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/1165#discussion_r14837775

```diff
--- Diff: core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---
@@ -141,6 +174,86 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
   }
   /**
+   * Unfold the given block in memory safely.
+   *
+   * The safety of this operation refers to avoiding potential OOM exceptions caused by
+   * unfolding the entirety of the block in memory at once. This is achieved by periodically
+   * checking whether the memory restrictions for unfolding blocks are still satisfied,
+   * stopping immediately if not. This check is a safeguard against the scenario in which
+   * there is not enough free memory to accommodate the entirety of a single block.
+   *
+   * This method returns either a fully unfolded array or a partially unfolded iterator.
+   */
+  def unfoldSafely(
+      blockId: BlockId,
+      values: Iterator[Any],
+      droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)])
+    : Either[Array[Any], Iterator[Any]] = {
+
+    var count = 0 // The number of elements unfolded so far
+    var enoughMemory = true // Whether there is enough memory to unfold this block
+    var previousSize = 0L // Previous estimate of the size of our buffer
+    val memoryRequestPeriod = 1000 // How frequently we request for more memory for our buffer
--- End diff --
```

What about something simple, basically do the check if `count < 100` or if you are at the interval.
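Reading the suggestion as "check whenever `count < 100`, or whenever `count` is a multiple of the period" (that reading of the comparison is an assumption), the schedule can be captured in a tiny hypothetical helper. The constant 100 comes from the comment and 1000 from `memoryRequestPeriod` in the diff; `CheckSchedule` and its methods are invented names.

```scala
// Hypothetical helper expressing the suggested check schedule.
object CheckSchedule {
  val InitialEagerChecks = 100   // assumption: the "count < 100" reading of the comment
  val MemoryRequestPeriod = 1000 // same value as memoryRequestPeriod in the diff

  /** Check after every element while the buffer is small, then once per period. */
  def shouldCheck(count: Int): Boolean =
    count < InitialEagerChecks || count % MemoryRequestPeriod == 0

  /** Element counts in [1, n] that would trigger a check. */
  def checkpoints(n: Int): Seq[Int] = (1 to n).filter(shouldCheck)
}
```

One plausible motivation for the eager phase: a partition whose elements are individually large is caught after only a few elements, while the periodic phase keeps the cost of size estimation low for long partitions of small elements.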
[GitHub] spark pull request: [SPARK-1777] Prevent OOMs from single partitio...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/1165#discussion_r14837855

```diff
--- Diff: core/src/main/scala/org/apache/spark/SparkEnv.scala ---
@@ -67,10 +67,14 @@ class SparkEnv (
     val metricsSystem: MetricsSystem,
     val conf: SparkConf) extends Logging {
-  // A mapping of thread ID to amount of memory used for shuffle in bytes
+  // A mapping of thread ID to amount of memory, in bytes, used for shuffle
   // All accesses should be manually synchronized
   val shuffleMemoryMap = mutable.HashMap[Long, Long]()
+  // A mapping of thread ID to amount of memory, in bytes, used for unrolling an RDD partition
+  // All accesses should be manually synchronized
+  val cacheMemoryMap = mutable.HashMap[Long, Long]()
--- End diff --
```

Wait, did this map exist before your patch at all?
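The diff adds a second thread-ID-to-bytes map next to `shuffleMemoryMap`, both carrying the note that all accesses must be manually synchronized. A toy sketch of what that discipline looks like when the map sits behind methods that always take the lock; `PerThreadMemoryMap`, `reserve` and `releaseAll` are hypothetical names, and the single shared cap `maxTotal` is an assumption, not how Spark sizes these pools.

```scala
import scala.collection.mutable

// Toy per-thread memory accounting; every access goes through the same lock.
final class PerThreadMemoryMap(maxTotal: Long) {
  private val usage = mutable.HashMap[Long, Long]() // thread ID -> bytes

  private def currentThreadId: Long = Thread.currentThread().getId

  /** Adds `bytes` for the calling thread unless the total would exceed maxTotal. */
  def reserve(bytes: Long): Boolean = usage.synchronized {
    val total = usage.values.sum
    if (total + bytes <= maxTotal) {
      usage(currentThreadId) = usage.getOrElse(currentThreadId, 0L) + bytes
      true
    } else {
      false
    }
  }

  /** Releases everything the calling thread holds and returns how much that was. */
  def releaseAll(): Long = usage.synchronized {
    usage.remove(currentThreadId).getOrElse(0L)
  }

  def totalUsed: Long = usage.synchronized { usage.values.sum }
}
```

Wrapping the map this way makes the locking impossible to forget, which is the risk a bare `mutable.HashMap` guarded only by a comment leaves open.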
[GitHub] spark pull request: SPARK-1576 (Allow JAVA_OPTS to be passed as a ...
Github user nishkamravi2 commented on the pull request: https://github.com/apache/spark/pull/492#issuecomment-48769703

@tgravescs Sorry, missed this one somehow. Yes, spark-submit is now fully tested. We could keep this one open for 0.9.2 potentially.