spark git commit: [SPARK-17549][SQL] Only collect table size stat in driver for cached relation.
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 50f6be759 -> a9165bb1b


[SPARK-17549][SQL] Only collect table size stat in driver for cached relation.

This reverts commit 9ac68dbc5720026ea92acc61d295ca64d0d3d132. Turns out
the original fix was correct. Original change description:

The existing code caches all stats for all columns of each partition in
the driver; for a large relation, this causes extreme memory usage, which
leads to GC hell and application failures. It seems that only the size in
bytes of the data is actually used in the driver, so instead just collect
that. In executors, the full stats are still kept, but that's not a big
problem; we expect the data to be distributed, and thus should not incur
too much memory pressure in any individual executor.

There are also potential improvements on the executor side, since the data
currently being stored is very wasteful (e.g. storing boxed rather than
primitive types for stats). But that's a separate issue.

Author: Marcelo Vanzin

Closes #15304 from vanzin/SPARK-17549.2.

(cherry picked from commit 8d969a2125d915da1506c17833aa98da614a257f)
Signed-off-by: Marcelo Vanzin


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a9165bb1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a9165bb1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a9165bb1

Branch: refs/heads/branch-2.0
Commit: a9165bb1b704483ad16331945b0968cbb1a97139
Parents: 50f6be7
Author: Marcelo Vanzin
Authored: Tue Oct 4 09:38:44 2016 -0700
Committer: Marcelo Vanzin
Committed: Tue Oct 4 09:39:50 2016 -0700

----------------------------------------------------------------------
 .../execution/columnar/InMemoryRelation.scala   | 24 ++++++------------------
 .../columnar/InMemoryColumnarQuerySuite.scala   | 14 ++++++++++++++
 2 files changed, 20 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a9165bb1/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 479934a..56bd5c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -17,8 +17,6 @@

 package org.apache.spark.sql.execution.columnar

-import scala.collection.JavaConverters._
-
 import org.apache.commons.lang3.StringUtils

 import org.apache.spark.network.util.JavaUtils
@@ -31,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.CollectionAccumulator
+import org.apache.spark.util.LongAccumulator

 object InMemoryRelation {
@@ -63,8 +61,7 @@ case class InMemoryRelation(
     @transient child: SparkPlan,
     tableName: Option[String])(
     @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
-    val batchStats: CollectionAccumulator[InternalRow] =
-      child.sqlContext.sparkContext.collectionAccumulator[InternalRow])
+    val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator)
   extends logical.LeafNode with MultiInstanceRelation {

   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
@@ -74,21 +71,12 @@ case class InMemoryRelation(
   @transient val partitionStatistics = new PartitionStatistics(output)

   override lazy val statistics: Statistics = {
-    if (batchStats.value.isEmpty) {
+    if (batchStats.value == 0L) {
       // Underlying columnar RDD hasn't been materialized, no useful statistics information
       // available, return the default statistics.
       Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
     } else {
-      // Underlying columnar RDD has been materialized, required information has also been
-      // collected via the `batchStats` accumulator.
-      val sizeOfRow: Expression =
-        BindReferences.bindReference(
-          output.map(a => partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add),
-          partitionStatistics.schema)
-
-      val sizeInBytes =
-        batchStats.value.asScala.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
-      Statistics(sizeInBytes = sizeInBytes)
+      Statistics(sizeInBytes = batchStats.value.longValue)
     }
   }
@@ -139,10 +127,10 @@ case class InMemoryRelation(
           rowCount += 1
         }

+        batchStats.add(totalSize)
+
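Seen in isolation, the driver-side shape of this fix is simple: each
partition contributes a single Long to an accumulator instead of shipping
a boxed per-column stats row per batch. Below is a minimal, self-contained
sketch of that pattern; BatchSizeAccumulatorSketch and the literal batch
sizes are illustrative stand-ins, not the actual InMemoryRelation code.

import org.apache.spark.sql.SparkSession

object BatchSizeAccumulatorSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]").appName("SPARK-17549-sketch").getOrCreate()
    val sc = spark.sparkContext

    // One Long on the driver, regardless of how many partitions or columns.
    val batchStats = sc.longAccumulator("batchStats")

    // Stand-ins for cached batches; in the real code, totalSize comes from
    // the column builders while each partition's batches are being built.
    sc.parallelize(Seq(128L, 256L, 512L), numSlices = 3).foreach { totalSize =>
      batchStats.add(totalSize)  // O(1) driver memory per batch
    }

    // Driver side: the only stat that was ever needed.
    println(s"sizeInBytes = ${batchStats.value}")

    spark.stop()
  }
}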
spark git commit: [SPARK-17549][SQL] Only collect table size stat in driver for cached relation.
Repository: spark
Updated Branches:
  refs/heads/master 068c198e9 -> 8d969a212


[SPARK-17549][SQL] Only collect table size stat in driver for cached relation.

This reverts commit 9ac68dbc5720026ea92acc61d295ca64d0d3d132. Turns out
the original fix was correct. Original change description:

The existing code caches all stats for all columns of each partition in
the driver; for a large relation, this causes extreme memory usage, which
leads to GC hell and application failures. It seems that only the size in
bytes of the data is actually used in the driver, so instead just collect
that. In executors, the full stats are still kept, but that's not a big
problem; we expect the data to be distributed, and thus should not incur
too much memory pressure in any individual executor.

There are also potential improvements on the executor side, since the data
currently being stored is very wasteful (e.g. storing boxed rather than
primitive types for stats). But that's a separate issue.

Author: Marcelo Vanzin

Closes #15304 from vanzin/SPARK-17549.2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8d969a21
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8d969a21
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8d969a21

Branch: refs/heads/master
Commit: 8d969a2125d915da1506c17833aa98da614a257f
Parents: 068c198
Author: Marcelo Vanzin
Authored: Tue Oct 4 09:38:44 2016 -0700
Committer: Marcelo Vanzin
Committed: Tue Oct 4 09:38:44 2016 -0700

----------------------------------------------------------------------
 .../execution/columnar/InMemoryRelation.scala   | 24 ++++++------------------
 .../columnar/InMemoryColumnarQuerySuite.scala   | 14 ++++++++++++++
 2 files changed, 20 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8d969a21/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 479934a..56bd5c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -17,8 +17,6 @@

 package org.apache.spark.sql.execution.columnar

-import scala.collection.JavaConverters._
-
 import org.apache.commons.lang3.StringUtils

 import org.apache.spark.network.util.JavaUtils
@@ -31,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.CollectionAccumulator
+import org.apache.spark.util.LongAccumulator

 object InMemoryRelation {
@@ -63,8 +61,7 @@ case class InMemoryRelation(
     @transient child: SparkPlan,
     tableName: Option[String])(
     @transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
-    val batchStats: CollectionAccumulator[InternalRow] =
-      child.sqlContext.sparkContext.collectionAccumulator[InternalRow])
+    val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator)
   extends logical.LeafNode with MultiInstanceRelation {

   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
@@ -74,21 +71,12 @@ case class InMemoryRelation(
   @transient val partitionStatistics = new PartitionStatistics(output)

   override lazy val statistics: Statistics = {
-    if (batchStats.value.isEmpty) {
+    if (batchStats.value == 0L) {
       // Underlying columnar RDD hasn't been materialized, no useful statistics information
       // available, return the default statistics.
       Statistics(sizeInBytes = child.sqlContext.conf.defaultSizeInBytes)
     } else {
-      // Underlying columnar RDD has been materialized, required information has also been
-      // collected via the `batchStats` accumulator.
-      val sizeOfRow: Expression =
-        BindReferences.bindReference(
-          output.map(a => partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add),
-          partitionStatistics.schema)
-
-      val sizeInBytes =
-        batchStats.value.asScala.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
-      Statistics(sizeInBytes = sizeInBytes)
+      Statistics(sizeInBytes = batchStats.value.longValue)
     }
   }
@@ -139,10 +127,10 @@ case class InMemoryRelation(
           rowCount += 1
         }

+        batchStats.add(totalSize)
+
         val stats = InternalRow.fromSeq(columnBuilders.map(_.columnStats.collectedStatistics)
           .flatMap(_.values))
-
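On the read side, the patched statistics simply falls back to the session's
configured default size until the accumulator holds a non-zero total. Here
is a stripped-down model of that branch, with Stats, RelationStatsSketch,
and defaultSizeInBytes as hypothetical stand-ins for catalyst's Statistics,
InMemoryRelation, and the SQLConf field:

import org.apache.spark.util.LongAccumulator

// Stand-in for catalyst's Statistics.
final case class Stats(sizeInBytes: BigInt)

// Stand-in for the relevant slice of InMemoryRelation; defaultSizeInBytes
// plays the role of child.sqlContext.conf.defaultSizeInBytes.
class RelationStatsSketch(batchStats: LongAccumulator, defaultSizeInBytes: Long) {
  // `lazy` mirrors the real code: whichever branch runs on first read is cached.
  lazy val statistics: Stats =
    if (batchStats.value == 0L) {
      // Not materialized yet: nothing useful collected, report the default.
      Stats(BigInt(defaultSizeInBytes))
    } else {
      // Materialized: the accumulator already holds the summed batch sizes.
      Stats(BigInt(batchStats.value))
    }
}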
spark git commit: [SPARK-17549][SQL] Only collect table size stat in driver for cached relation.
Repository: spark
Updated Branches:
  refs/heads/master b9323fc93 -> 39e2bad6a


[SPARK-17549][SQL] Only collect table size stat in driver for cached relation.

The existing code caches all stats for all columns of each partition in
the driver; for a large relation, this causes extreme memory usage, which
leads to GC hell and application failures. It seems that only the size in
bytes of the data is actually used in the driver, so instead just collect
that. In executors, the full stats are still kept, but that's not a big
problem; we expect the data to be distributed, and thus should not incur
too much memory pressure in any individual executor.

There are also potential improvements on the executor side, since the data
currently being stored is very wasteful (e.g. storing boxed rather than
primitive types for stats). But that's a separate issue.

As a mildly related change, I'm also adding code to catch exceptions in
the code generator, since Janino was breaking with the test data I tried
this patch on.

Tested with unit tests and by doing a count on a very wide table
(20k columns) with many partitions.

Author: Marcelo Vanzin

Closes #15112 from vanzin/SPARK-17549.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/39e2bad6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/39e2bad6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/39e2bad6

Branch: refs/heads/master
Commit: 39e2bad6a866d27c3ca594d15e574a1da3ee84cc
Parents: b9323fc
Author: Marcelo Vanzin
Authored: Fri Sep 16 14:02:56 2016 -0700
Committer: Yin Huai
Committed: Fri Sep 16 14:02:56 2016 -0700

----------------------------------------------------------------------
 .../expressions/codegen/CodeGenerator.scala     | 18 ++++++++++++------
 .../execution/columnar/InMemoryRelation.scala   | 24 ++++++------------------
 .../columnar/InMemoryColumnarQuerySuite.scala   | 14 ++++++++++++++
 3 files changed, 32 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/39e2bad6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index f982c22..33b9b80 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -23,6 +23,7 @@ import java.util.{Map => JavaMap}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
+import scala.util.control.NonFatal

 import com.google.common.cache.{CacheBuilder, CacheLoader}
 import org.codehaus.janino.{ByteArrayClassLoader, ClassBodyEvaluator, SimpleCompiler}
@@ -910,14 +911,19 @@ object CodeGenerator extends Logging {
       codeAttrField.setAccessible(true)
       classes.foreach { case (_, classBytes) =>
         CodegenMetrics.METRIC_GENERATED_CLASS_BYTECODE_SIZE.update(classBytes.length)
-        val cf = new ClassFile(new ByteArrayInputStream(classBytes))
-        cf.methodInfos.asScala.foreach { method =>
-          method.getAttributes().foreach { a =>
-            if (a.getClass.getName == codeAttr.getName) {
-              CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.update(
-                codeAttrField.get(a).asInstanceOf[Array[Byte]].length)
+        try {
+          val cf = new ClassFile(new ByteArrayInputStream(classBytes))
+          cf.methodInfos.asScala.foreach { method =>
+            method.getAttributes().foreach { a =>
+              if (a.getClass.getName == codeAttr.getName) {
+                CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.update(
+                  codeAttrField.get(a).asInstanceOf[Array[Byte]].length)
+              }
             }
           }
+        } catch {
+          case NonFatal(e) =>
+            logWarning("Error calculating stats of compiled class.", e)
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/39e2bad6/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 479934a..56bd5c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -17,8 +17,6 @@
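The CodeGenerator half of the change is a plain best-effort guard:
bytecode-size metrics are optional, so a failure while parsing a class
file with Janino is logged and swallowed rather than allowed to fail
compilation. A generic, runnable sketch of that pattern, with
updateBytecodeMetrics and recordMetrics as made-up names:

import scala.util.control.NonFatal

object BestEffortMetricsSketch {
  // recordMetrics stands in for the Janino ClassFile walk in CodeGenerator;
  // only the error-handling shape matters here.
  def updateBytecodeMetrics(classBytes: Array[Byte])(recordMetrics: Array[Byte] => Unit): Unit = {
    try {
      recordMetrics(classBytes)
    } catch {
      case NonFatal(e) =>
        // Metrics are best-effort: log and keep going instead of failing codegen.
        Console.err.println(s"Error calculating stats of compiled class: ${e.getMessage}")
    }
  }

  def main(args: Array[String]): Unit = {
    // A recorder that always blows up, to show the failure is contained.
    updateBytecodeMetrics(Array[Byte](1, 2, 3)) { _ =>
      throw new RuntimeException("malformed class file")
    }
    println("codegen path continues normally")
  }
}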
spark git commit: [SPARK-17549][SQL] Only collect table size stat in driver for cached relation.
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 5ad4395e1 -> 3fce1255a


[SPARK-17549][SQL] Only collect table size stat in driver for cached relation.

The existing code caches all stats for all columns of each partition in
the driver; for a large relation, this causes extreme memory usage, which
leads to GC hell and application failures. It seems that only the size in
bytes of the data is actually used in the driver, so instead just collect
that. In executors, the full stats are still kept, but that's not a big
problem; we expect the data to be distributed, and thus should not incur
too much memory pressure in any individual executor.

There are also potential improvements on the executor side, since the data
currently being stored is very wasteful (e.g. storing boxed rather than
primitive types for stats). But that's a separate issue.

As a mildly related change, I'm also adding code to catch exceptions in
the code generator, since Janino was breaking with the test data I tried
this patch on.

Tested with unit tests and by doing a count on a very wide table
(20k columns) with many partitions.

Author: Marcelo Vanzin

Closes #15112 from vanzin/SPARK-17549.

(cherry picked from commit 39e2bad6a866d27c3ca594d15e574a1da3ee84cc)
Signed-off-by: Yin Huai


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3fce1255
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3fce1255
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3fce1255

Branch: refs/heads/branch-2.0
Commit: 3fce1255ad41a04e92720795ce2b162ec305cf0a
Parents: 5ad4395e
Author: Marcelo Vanzin
Authored: Fri Sep 16 14:02:56 2016 -0700
Committer: Yin Huai
Committed: Fri Sep 16 14:03:08 2016 -0700

----------------------------------------------------------------------
 .../expressions/codegen/CodeGenerator.scala     | 18 ++++++++++++------
 .../execution/columnar/InMemoryRelation.scala   | 24 ++++++------------------
 .../columnar/InMemoryColumnarQuerySuite.scala   | 14 ++++++++++++++
 3 files changed, 32 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3fce1255/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 4bd9ee0..929f2da 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -23,6 +23,7 @@ import java.util.{Map => JavaMap}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
+import scala.util.control.NonFatal

 import com.google.common.cache.{CacheBuilder, CacheLoader}
 import org.codehaus.janino.{ByteArrayClassLoader, ClassBodyEvaluator, SimpleCompiler}
@@ -914,14 +915,19 @@ object CodeGenerator extends Logging {
       codeAttrField.setAccessible(true)
       classes.foreach { case (_, classBytes) =>
         CodegenMetrics.METRIC_GENERATED_CLASS_BYTECODE_SIZE.update(classBytes.length)
-        val cf = new ClassFile(new ByteArrayInputStream(classBytes))
-        cf.methodInfos.asScala.foreach { method =>
-          method.getAttributes().foreach { a =>
-            if (a.getClass.getName == codeAttr.getName) {
-              CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.update(
-                codeAttrField.get(a).asInstanceOf[Array[Byte]].length)
+        try {
+          val cf = new ClassFile(new ByteArrayInputStream(classBytes))
+          cf.methodInfos.asScala.foreach { method =>
+            method.getAttributes().foreach { a =>
+              if (a.getClass.getName == codeAttr.getName) {
+                CodegenMetrics.METRIC_GENERATED_METHOD_BYTECODE_SIZE.update(
+                  codeAttrField.get(a).asInstanceOf[Array[Byte]].length)
+              }
             }
           }
+        } catch {
+          case NonFatal(e) =>
+            logWarning("Error calculating stats of compiled class.", e)
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/3fce1255/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 479934a..56bd5c1 100644
---
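The diffs above are cut off before the 14 lines added to
InMemoryColumnarQuerySuite, but the user-visible effect is easy to probe:
once a cached relation is materialized, its reported size comes from the
accumulator instead of the default estimate. A hedged end-to-end check
(not the shipped suite test), assuming Spark 2.0-era APIs, where the
plan's size statistic lives on LogicalPlan.statistics (renamed to stats
in later releases):

import org.apache.spark.sql.SparkSession

object CachedSizeCheckSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]").appName("SPARK-17549-check").getOrCreate()

    val df = spark.range(0, 1000).toDF("id")
    df.cache()
    df.count()  // first action materializes the cached columnar batches

    // The optimizer substitutes the cached InMemoryRelation, whose statistics
    // now read the driver-side LongAccumulator populated during caching.
    val size = df.queryExecution.optimizedPlan.statistics.sizeInBytes
    assert(size > 0, s"expected a positive cached size, got $size")
    println(s"cached sizeInBytes = $size")

    spark.stop()
  }
}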