Here is a self-contained unit test that triggers an OutOfMemoryError on a 10 GB heap:
--------------

import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.junit.Test
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

import scala.collection.mutable.ArrayBuffer


/**
 * A Small Unit Test to demonstrate Spark Window Functions OOM
 */
/**
 * Reproduces an OOM when stacking many Spark window aggregations.
 *
 * Shape of the repro: 4 partitions x 6500 rows, 13 window sizes x 4
 * aggregates (avg/stddev/min/max) = 52 window columns over the same
 * partition/order spec. OOMs a 10 GB heap in local mode with >1 thread.
 */
class SparkTest {

  @Test
  def testWindows(): Unit = {
    val sparkSession =
      SparkSession.builder().master("local[7]").appName("tests").getOrCreate()
    import sparkSession.implicits._

    println("Init Dataset")

    val partitions = 0 until 4
    val entries = 0 until 6500

    // val windows = 5 to 15 by 5 // Works
    val windows = 5 to 65 by 5    // OOM 10G

    // Build the rows as a pure for-comprehension instead of mutating an
    // ArrayBuffer inside a side-effecting `for ... yield`. One
    // (key, timestamp, value) tuple per (partition, entry) pair.
    // 60000L keeps the millisecond arithmetic in Long, so larger `entries`
    // ranges cannot silently overflow Int.
    val testData = for {
      p <- partitions
      e <- entries
    } yield ("Key" + p, new Timestamp(60000L * e), e * 2.0)

    val ds = testData.toDF("key", "datetime", "value")
    ds.show()
    ds.schema.fields.foreach(println)

    val baseWin = Window.partitionBy("key").orderBy("datetime")

    // Accumulate the window columns with foldLeft instead of reassigning a
    // var; the frame spec is loop-invariant per window size, so build it once
    // per iteration rather than four times.
    val resultFrame = windows.foldLeft(ds) { (df, win) =>
      val frame = baseWin.rowsBetween(-win, 0)
      df.withColumn("avg" + (-win), avg("value").over(frame))
        .withColumn("stddev" + (-win), stddev("value").over(frame))
        .withColumn("min" + (-win), min("value").over(frame))
        .withColumn("max" + (-win), max("value").over(frame))
    }
    resultFrame.show()
  }

}



> On Sep 20, 2016, at 10:26 PM, Jeremy Davis <jerda...@speakeasy.net> wrote:
> 
>  Hello all,
> I ran in to a weird OOM issue when using the sliding windows. (Spark 2.0, 
> Scala 2.11.7, Java 1.8.0_11, OSX 10.10.5)
> I’m using the Dataframe API and calling various:
> Window.partitionBy(...).orderBy(...).rowsBetween(…) etc.
> with just 4 types of aggregations:(avg,min,max,stddev),  … avg.over(window)  
> etc..
> All parameterized over several window sizes 
> (-3,-4,-5,-8,-9,-13,-18,-20,-21,-34,-55)
> -3 meaning (-3,0)
> 
> I’m using a Timestamp as the order column, and aggregating over a Double.
> In a given partition, there are only around 6500 samples of data that are 
> being aggregated. For some reason I hit some sort of non-linear memory use 
> around 5000 samples per partition (Perhaps doubling an array somewhere?).
> I shrank the dataset down to just 4 partitions, but I still OOM a 10G heap 
> while running in Local Mode. It all seems odd when my input is on the order 
> of a just a few MB for this test case.
> 
> I’m wondering if this sounds like expected behavior? Seems like my use case 
> is reasonable.
> 
> Also, it will OOM when I run multithreaded with 2 threads and up, but seems 
> to work single threaded  (“local[1]”).
> 
> I will try to put together a simple repro case tomorrow.
> 
> 
> Attached is a Yourkit Screen Shot. I suspect the long[] arrays double once 
> more before OOM.
> 
> <windowfuncs.jpg>

Attachment: signature.asc
Description: Message signed with OpenPGP using GPGMail

Reply via email to