Repository: spark
Updated Branches:
  refs/heads/master da303526e -> bfcd528d6


[SPARK-6030] [CORE] Use a simulated field layout method to compute class shellSize

SizeEstimator gives the wrong result for Integer on a 64-bit JVM with 
UseCompressedOops on; this PR fixes that. For more details, please refer to 
[SPARK-6030](https://issues.apache.org/jira/browse/SPARK-6030).
sryza, I noticed there is a PR to expose SizeEstimator; maybe that one should 
wait until this PR gets merged, if we confirm this problem.
And shivaram, would you mind reviewing this PR, since you contributed the 
related code? Also cc srowen and mateiz.
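
For context, a minimal sketch of the symptom (the expected values come from the 
test changes in this patch; SizeEstimator is private[spark], so running this 
requires being inside the org.apache.spark.util package on Spark's test classpath):

    import org.apache.spark.util.SizeEstimator

    // On a 64-bit JVM with UseCompressedOops on, a boxed Integer is a 12-byte
    // header (8-byte mark word + 4-byte compressed klass pointer) plus a 4-byte
    // int field: 16 bytes after 8-byte object alignment. Before this fix, each
    // class's shell size was rounded up to 8 bytes before subclass fields were
    // added (12 -> 16, + 4 -> 20, aligned -> 24), so the estimate came out as 24.
    println(SizeEstimator.estimate(new java.lang.Integer(1))) // 16 after the fix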

Author: Ye Xianjin <advance...@gmail.com>

Closes #4783 from advancedxy/SPARK-6030 and squashes the following commits:

c4dcb41 [Ye Xianjin] Call super.beforeEach() in the beforeEach method to make 
the trait stackable. Remove useless leading whitespace.
3f80640 [Ye Xianjin] After the fix, the size of the Integer class changes from 
24 to 16 on a 64-bit JVM with the UseCompressedOops flag on. I don't know how 
100000 was originally calculated; it looks like 100000 is the magic number that 
ensures spilling. Because of the size change, the test fails since there is no 
spilling at all. Changing it to a slightly larger number fixes that (see the 
arithmetic sketch after this list).
e849d2d [Ye Xianjin] Merge the two shellSize assignments into one. Add some 
explanation to the alignSizeUp method.
85a0b51 [Ye Xianjin] Fix typos and update wording in comments. Use alignSizeUp 
to compute alignSize.
d27eb77 [Ye Xianjin] Add some detailed comments to the code. Add some test 
cases. It's very difficult to design test cases, as the final object alignment 
hides a lot of field-layout details if we only consider the whole size.
842aed1 [Ye Xianjin] primitiveSize(cls) can just return Int. Use a simplified 
class field layout method to calculate class instance size. Will add more 
documentation and test cases. Add a new alignSizeUp function which uses bitwise 
operators for speed.
62e8ab4 [Ye Xianjin] Don't align objects' shellSize; align when adding to 
state.size. Add some primitive wrapper object size tests.
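
A rough back-of-the-envelope for the 3f80640 test change above, a sketch only: 
the per-Integer estimates come from this patch's wrapper-object tests, while the 
tuple shell and sorter overhead are unchanged by the fix, and the spill threshold 
itself depends on the suite's memory configuration.

    // Each inserted record is an (Int, Int) pair whose boxed components shrink
    // from 24 to 16 estimated bytes each under compressed oops.
    val integersPerRecord = 2
    val perIntegerBefore  = 24L // estimate before the fix
    val perIntegerAfter   = 16L // estimate after the fix
    val shrinkPerRecord = integersPerRecord * (perIntegerBefore - perIntegerAfter)
    // 100000 records now estimate ~1.6 MB smaller, which can keep them under the
    // spill threshold; bumping the count to 120000 pushes the estimate back over.
    println(100000 * shrinkPerRecord) // 1600000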


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bfcd528d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bfcd528d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bfcd528d

Branch: refs/heads/master
Commit: bfcd528d6f5a5ebe61e0fcca890143e9a3c7f7f9
Parents: da30352
Author: Ye Xianjin <advance...@gmail.com>
Authored: Sat May 2 23:08:09 2015 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Sat May 2 23:08:09 2015 +0100

----------------------------------------------------------------------
 .../org/apache/spark/util/SizeEstimator.scala   | 61 ++++++++++++++++----
 .../apache/spark/util/SizeEstimatorSuite.scala  | 47 ++++++++++++++-
 .../util/collection/ExternalSorterSuite.scala   | 10 ++--
 3 files changed, 100 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bfcd528d/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
index 4dd7ab9..d91c329 100644
--- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
+++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -47,6 +47,11 @@ private[spark] object SizeEstimator extends Logging {
   private val FLOAT_SIZE   = 4
   private val DOUBLE_SIZE  = 8
 
+  // Fields can be primitive types, with sizes 1, 2, 4, or 8, or they can be pointers. The size of
+  // a pointer is 4 or 8 depending on the JVM (32-bit or 64-bit) and the UseCompressedOops flag.
+  // The sizes should be in descending order, as we will use that information for field placement.
+  private val fieldSizes = List(8, 4, 2, 1)
+
   // Alignment boundary for objects
   // TODO: Is this arch dependent ?
   private val ALIGN_SIZE = 8
@@ -171,7 +176,7 @@ private[spark] object SizeEstimator extends Logging {
      // general all ClassLoaders and Classes will be shared between objects anyway.
     } else {
       val classInfo = getClassInfo(cls)
-      state.size += classInfo.shellSize
+      state.size += alignSize(classInfo.shellSize)
       for (field <- classInfo.pointerFields) {
         state.enqueue(field.get(obj))
       }
@@ -237,8 +242,8 @@ private[spark] object SizeEstimator extends Logging {
     }
     size
   }
-  
-  private def primitiveSize(cls: Class[_]): Long = {
+
+  private def primitiveSize(cls: Class[_]): Int = {
     if (cls == classOf[Byte]) {
       BYTE_SIZE
     } else if (cls == classOf[Boolean]) {
@@ -274,21 +279,50 @@ private[spark] object SizeEstimator extends Logging {
     val parent = getClassInfo(cls.getSuperclass)
     var shellSize = parent.shellSize
     var pointerFields = parent.pointerFields
+    val sizeCount = Array.fill(fieldSizes.max + 1)(0)
 
+    // Iterate through the fields of this class and gather information.
     for (field <- cls.getDeclaredFields) {
       if (!Modifier.isStatic(field.getModifiers)) {
         val fieldClass = field.getType
         if (fieldClass.isPrimitive) {
-          shellSize += primitiveSize(fieldClass)
+          sizeCount(primitiveSize(fieldClass)) += 1
         } else {
           field.setAccessible(true) // Enable future get()'s on this field
-          shellSize += pointerSize
+          sizeCount(pointerSize) += 1
           pointerFields = field :: pointerFields
         }
       }
     }
 
-    shellSize = alignSize(shellSize)
+    // Based on the simulated field layout code in Aleksey Shipilev's report:
+    // http://cr.openjdk.java.net/~shade/papers/2013-shipilev-fieldlayout-latest.pdf
+    // The code is in Figure 9.
+    // The simplified idea of field layout consists of 4 parts (see the report for details):
+    //
+    // 1. field alignment: HotSpot lays out the fields aligned by their size.
+    // 2. object alignment: HotSpot rounds instance size up to 8 bytes.
+    // 3. consistent field layout throughout the hierarchy: this means we should lay out the
+    // superclass first, and we can use the superclass's shellSize as a starting point to lay
+    // out the other fields in this class.
+    // 4. class alignment: HotSpot rounds field blocks up to HeapOopSize, not 4 bytes; confirmed
+    // with Aleksey. See https://bugs.openjdk.java.net/browse/CODETOOLS-7901322
+    //
+    // The real-world field layout is much more complicated. There are three kinds of field
+    // order in Java 8, and we don't consider the @Contended annotation introduced by Java 8.
+    // See the layout_fields method in the HotSpot class file parser for more details:
+    // hg.openjdk.java.net/jdk8/jdk8/hotspot/file/tip/src/share/vm/classfile/classFileParser.cpp
+    var alignedSize = shellSize
+    for (size <- fieldSizes if sizeCount(size) > 0) {
+      val count = sizeCount(size)
+      // If there are internal gaps, smaller fields can fit in.
+      alignedSize = math.max(alignedSize, alignSizeUp(shellSize, size) + size * count)
+      shellSize += size * count
+    }
+
+    // Choose the larger size to be the new shellSize (clearly alignedSize >= shellSize), and
+    // round the instance field blocks up.
+    shellSize = alignSizeUp(alignedSize, pointerSize)
 
     // Create and cache a new ClassInfo
     val newInfo = new ClassInfo(shellSize, pointerFields)
@@ -296,8 +330,15 @@ private[spark] object SizeEstimator extends Logging {
     newInfo
   }
 
-  private def alignSize(size: Long): Long = {
-    val rem = size % ALIGN_SIZE
-    if (rem == 0) size else (size + ALIGN_SIZE - rem)
-  }
+  private def alignSize(size: Long): Long = alignSizeUp(size, ALIGN_SIZE)
+
+  /**
+   * Compute the aligned size. alignSize must be 2^n, otherwise the result will be wrong.
+   * When alignSize = 2^n, alignSize - 1 = 2^n - 1, so the binary representation of
+   * (alignSize - 1) has exactly n trailing 1s (0b00...001..1), and ~(alignSize - 1) is
+   * 0b11..110..0. Hence, (size + alignSize - 1) & ~(alignSize - 1) sets the last n bits to
+   * zero, which yields a multiple of alignSize.
+   */
+  private def alignSizeUp(size: Long, alignSize: Int): Long =
+    (size + alignSize - 1) & ~(alignSize - 1)
 }
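
The field-layout comment above condenses to a small, self-contained simulation. A 
sketch only: the pointer size is hard-coded to 4 bytes (64-bit JVM with compressed 
oops), whereas the real code derives it from the JVM at runtime.

    object LayoutSketch {
      // Field sizes in descending order, as in the patch above.
      val fieldSizes = List(8, 4, 2, 1)
      val pointerSize = 4 // assumed: 64-bit JVM with UseCompressedOops on

      // Round size up to the next multiple of alignSize (alignSize must be 2^n).
      def alignSizeUp(size: Long, alignSize: Int): Long =
        (size + alignSize - 1) & ~(alignSize - 1)

      // Given the superclass's shellSize and a per-size count of declared fields,
      // return this class's shellSize (before the final 8-byte object alignment).
      def shellSize(parentShellSize: Long, sizeCount: Map[Int, Int]): Long = {
        var shell = parentShellSize
        var aligned = shell
        for (size <- fieldSizes; count <- sizeCount.get(size) if count > 0) {
          // Fields of a given size start at an offset aligned to that size;
          // smaller fields may fill earlier gaps, hence the max().
          aligned = math.max(aligned, alignSizeUp(shell, size) + size.toLong * count)
          shell += size.toLong * count
        }
        alignSizeUp(aligned, pointerSize) // round the field block up to HeapOopSize
      }

      def main(args: Array[String]): Unit = {
        val header = 12L // 8-byte mark word + 4-byte compressed klass pointer
        // java.lang.Integer: one 4-byte int field -> 16 bytes, as the new test expects.
        println(alignSizeUp(shellSize(header, Map(4 -> 1)), 8)) // 16
        // java.lang.Long: one 8-byte field; 12 aligns up to 16 first -> 24 bytes.
        println(alignSizeUp(shellSize(header, Map(8 -> 1)), 8)) // 24
      }
    }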

http://git-wip-us.apache.org/repos/asf/spark/blob/bfcd528d/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
index 28915bd..133a76f 100644
--- a/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/SizeEstimatorSuite.scala
@@ -36,6 +36,15 @@ class DummyClass4(val d: DummyClass3) {
   val x: Int = 0
 }
 
+// Dummy class to show class field block alignment.
+class DummyClass5 extends DummyClass1 {
+  val x: Boolean = true
+}
+
+class DummyClass6 extends DummyClass5 {
+  val y: Boolean = true
+}
+
 object DummyString {
   def apply(str: String) : DummyString = new DummyString(str.toArray)
 }
@@ -50,6 +59,7 @@ class SizeEstimatorSuite
 
   override def beforeEach() {
    // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
+    super.beforeEach()
     System.setProperty("os.arch", "amd64")
     System.setProperty("spark.test.useCompressedOops", "true")
   }
@@ -62,6 +72,22 @@ class SizeEstimatorSuite
     assertResult(48)(SizeEstimator.estimate(new DummyClass4(new DummyClass3)))
   }
 
+  test("primitive wrapper objects") {
+    assertResult(16)(SizeEstimator.estimate(new java.lang.Boolean(true)))
+    assertResult(16)(SizeEstimator.estimate(new java.lang.Byte("1")))
+    assertResult(16)(SizeEstimator.estimate(new java.lang.Character('1')))
+    assertResult(16)(SizeEstimator.estimate(new java.lang.Short("1")))
+    assertResult(16)(SizeEstimator.estimate(new java.lang.Integer(1)))
+    assertResult(24)(SizeEstimator.estimate(new java.lang.Long(1)))
+    assertResult(16)(SizeEstimator.estimate(new java.lang.Float(1.0)))
+    assertResult(24)(SizeEstimator.estimate(new java.lang.Double(1.0d)))
+  }
+
+  test("class field blocks rounding") {
+    assertResult(16)(SizeEstimator.estimate(new DummyClass5))
+    assertResult(24)(SizeEstimator.estimate(new DummyClass6))
+  }
+
  // NOTE: The String class definition varies across JDK versions (1.6 vs. 1.7) and vendors
   // (Sun vs IBM). Use a DummyString class to make tests deterministic.
   test("strings") {
@@ -102,18 +128,18 @@ class SizeEstimatorSuite
     val arr = new Array[Char](100000)
     assertResult(200016)(SizeEstimator.estimate(arr))
    assertResult(480032)(SizeEstimator.estimate(Array.fill(10000)(new DummyString(arr))))
-    
+
     val buf = new ArrayBuffer[DummyString]()
     for (i <- 0 until 5000) {
       buf.append(new DummyString(new Array[Char](10)))
     }
     assertResult(340016)(SizeEstimator.estimate(buf.toArray))
-    
+
     for (i <- 0 until 5000) {
       buf.append(new DummyString(arr))
     }
     assertResult(683912)(SizeEstimator.estimate(buf.toArray))
-    
+
    // If an array contains the *same* element many times, we should only count it once.
     val d1 = new DummyClass1
     // 10 pointers plus 8-byte object
@@ -155,5 +181,20 @@ class SizeEstimatorSuite
     assertResult(64)(SizeEstimator.estimate(DummyString("a")))
     assertResult(64)(SizeEstimator.estimate(DummyString("ab")))
     assertResult(72)(SizeEstimator.estimate(DummyString("abcdefgh")))
+
+    // primitive wrapper classes
+    assertResult(24)(SizeEstimator.estimate(new java.lang.Boolean(true)))
+    assertResult(24)(SizeEstimator.estimate(new java.lang.Byte("1")))
+    assertResult(24)(SizeEstimator.estimate(new java.lang.Character('1')))
+    assertResult(24)(SizeEstimator.estimate(new java.lang.Short("1")))
+    assertResult(24)(SizeEstimator.estimate(new java.lang.Integer(1)))
+    assertResult(24)(SizeEstimator.estimate(new java.lang.Long(1)))
+    assertResult(24)(SizeEstimator.estimate(new java.lang.Float(1.0)))
+    assertResult(24)(SizeEstimator.estimate(new java.lang.Double(1.0d)))
+  }
+
+  test("class field blocks rounding on 64-bit VM without useCompressedOops") {
+    assertResult(24)(SizeEstimator.estimate(new DummyClass5))
+    assertResult(32)(SizeEstimator.estimate(new DummyClass6))
   }
 }
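
The super.beforeEach() call added above is what makes the setup stackable across 
mixed-in traits. A minimal ScalaTest sketch of the pattern; PropertySnapshot here 
is hypothetical, shown only to illustrate the chaining, and is not the trait this 
suite actually mixes in:

    import org.scalatest.{BeforeAndAfterEach, FunSuite, Suite}

    // Hypothetical stackable trait: snapshots system properties before each test
    // and restores them afterwards.
    trait PropertySnapshot extends BeforeAndAfterEach { this: Suite =>
      private var saved: java.util.Properties = _

      override def beforeEach() {
        saved = System.getProperties.clone().asInstanceOf[java.util.Properties]
        super.beforeEach()
      }

      override def afterEach() {
        try super.afterEach()
        finally System.setProperties(saved)
      }
    }

    class ExampleSuite extends FunSuite with PropertySnapshot {
      override def beforeEach() {
        // Without this call, PropertySnapshot.beforeEach never runs, so no
        // snapshot is taken -- the omission the c4dcb41 commit fixes.
        super.beforeEach()
        System.setProperty("os.arch", "amd64")
        System.setProperty("spark.test.useCompressedOops", "true")
      }

      test("properties are set for this test only") {
        assert(System.getProperty("os.arch") === "amd64")
      }
    }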

http://git-wip-us.apache.org/repos/asf/spark/blob/bfcd528d/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index 20fd22b..7a98723 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -377,7 +377,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
     val sorter = new ExternalSorter[Int, Int, Int](
       None, Some(new HashPartitioner(3)), Some(ord), None)
     assertDidNotBypassMergeSort(sorter)
-    sorter.insertAll((0 until 100000).iterator.map(i => (i, i)))
+    sorter.insertAll((0 until 120000).iterator.map(i => (i, i)))
     assert(diskBlockManager.getAllFiles().length > 0)
     sorter.stop()
     assert(diskBlockManager.getAllBlocks().length === 0)
@@ -385,9 +385,9 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
     val sorter2 = new ExternalSorter[Int, Int, Int](
       None, Some(new HashPartitioner(3)), Some(ord), None)
     assertDidNotBypassMergeSort(sorter2)
-    sorter2.insertAll((0 until 100000).iterator.map(i => (i, i)))
+    sorter2.insertAll((0 until 120000).iterator.map(i => (i, i)))
     assert(diskBlockManager.getAllFiles().length > 0)
-    assert(sorter2.iterator.toSet === (0 until 100000).map(i => (i, i)).toSet)
+    assert(sorter2.iterator.toSet === (0 until 120000).map(i => (i, i)).toSet)
     sorter2.stop()
     assert(diskBlockManager.getAllBlocks().length === 0)
   }
@@ -428,8 +428,8 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMe
       None, Some(new HashPartitioner(3)), Some(ord), None)
     assertDidNotBypassMergeSort(sorter)
     intercept[SparkException] {
-      sorter.insertAll((0 until 100000).iterator.map(i => {
-        if (i == 99990) {
+      sorter.insertAll((0 until 120000).iterator.map(i => {
+        if (i == 119990) {
           throw new SparkException("Intentional failure")
         }
         (i, i)

