Here is a unit test that will OOM a 10G heap:
--------------

import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.junit.Test

import scala.collection.mutable.ArrayBuffer

/**
 * A small unit test to demonstrate the Spark window function OOM.
 */
class SparkTest {

  @Test
  def testWindows(): Unit = {
    val sparkSession = SparkSession.builder().master("local[7]").appName("tests").getOrCreate()
    import sparkSession.implicits._

    println("Init Dataset")

    val partitions = 0 until 4
    val entries = 0 until 6500
    //val windows = 5 to 15 by 5 // works
    val windows = 5 to 65 by 5   // OOMs a 10G heap

    // 4 partition keys x 6500 one-minute samples each: a few MB of input.
    val testData = new ArrayBuffer[(String, Timestamp, Double)]
    for (p <- partitions; e <- entries) {
      testData += (("Key" + p, new Timestamp(60000L * e), e * 2.0))
    }

    val ds = testData.toDF("key", "datetime", "value")
    ds.show()

    var resultFrame = ds
    resultFrame.schema.fields.foreach(println)

    // One sliding frame per window size, four aggregations over each.
    val baseWin = Window.partitionBy("key").orderBy("datetime")
    for (win <- windows) {
      val frame = baseWin.rowsBetween(-win, 0)
      resultFrame = resultFrame
        .withColumn("avg" + (-win), avg("value").over(frame))
        .withColumn("stddev" + (-win), stddev("value").over(frame))
        .withColumn("min" + (-win), min("value").over(frame))
        .withColumn("max" + (-win), max("value").over(frame))
    }
    resultFrame.show()
  }
}
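For what it's worth, dumping the physical plan hints at where the memory goes: each withColumn over a new frame seems to add another Window operator, and each one, if I'm reading WindowExec right, buffers the whole partition in memory, with rows that already carry all the columns added by the operators below it. Easy to check with the plain Dataset API:

    // Print the physical plan; in my runs it shows a tall stack of
    // Window nodes, so the buffered rows get wider at every stage.
    resultFrame.explain()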
> On Sep 20, 2016, at 10:26 PM, Jeremy Davis <jerda...@speakeasy.net> wrote:
>
> Hello all,
> I ran into a weird OOM issue when using the sliding windows. (Spark 2.0,
> Scala 2.11.7, Java 1.8.0_11, OSX 10.10.5)
> I’m using the DataFrame API and calling various:
> Window.partitionBy(...).orderBy(...).rowsBetween(…) etc.
> with just 4 types of aggregations (avg, min, max, stddev), … avg.over(window)
> etc., all parameterized over several window sizes
> (-3,-4,-5,-8,-9,-13,-18,-20,-21,-34,-55),
> -3 meaning (-3,0).
>
> I’m using a Timestamp as the order column, and aggregating over a Double.
> In a given partition, there are only around 6500 samples of data being
> aggregated. For some reason I hit some sort of non-linear memory use around
> 5000 samples per partition (perhaps doubling an array somewhere?).
> I shrank the dataset down to just 4 partitions, but I still OOM a 10G heap
> while running in local mode. It all seems odd when my input is on the order
> of just a few MB for this test case.
>
> I’m wondering if this sounds like expected behavior? My use case seems
> reasonable.
>
> Also, it will OOM when I run multithreaded with 2 threads and up, but it
> seems to work single threaded (“local[1]”).
>
> I will try to put together a simple repro case tomorrow.
>
> Attached is a YourKit screenshot. I suspect the long[] arrays double once
> more before OOM.
>
> <windowfuncs.jpg>
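P.S. A back-of-the-envelope check on the doubling hypothesis from my earlier mail: any buffer that grows geometrically (like a Scala ArrayBuffer, or the long[] pages in the YourKit shot) jumps from 4096 to 8192 slots just past 4096 rows, which lines up roughly with the non-linear behavior I saw around 5000 samples, and it would have to double once more to get past 8192. A toy model only, not Spark code:

    // Hypothetical illustration: geometric growth of a row buffer.
    var capacity = 16
    for (rows <- Seq(4096, 5000, 6500, 8193)) {
      while (capacity < rows) capacity *= 2
      println(s"$rows rows -> capacity $capacity")
    }
    // 4096 rows -> capacity 4096
    // 5000 rows -> capacity 8192
    // 6500 rows -> capacity 8192
    // 8193 rows -> capacity 16384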