Re: pyspark streaming DStream compute
On Tue, Sep 15, 2015 at 1:46 PM, Renyi Xiong wrote:
> Can anybody help understand why pyspark streaming uses py4j callback to
> execute python code while pyspark batch uses worker.py?

There are two kinds of callbacks in pyspark streaming:

1) those that operate on RDDs: they take an RDD and return a new RDD, and
use the py4j callback, because SparkContext and RDDs are not accessible in
worker.py;

2) those that operate on the records of an RDD: they take a record and
return new records, and use worker.py.

> regarding pyspark streaming, is py4j callback only used for DStream,
> worker.py still used for RDD?

Yes.

> thanks,
> Renyi.

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org
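The distinction can be illustrated with a toy model in plain Python; FakeRDD and the function names below are illustrative stand-ins, not pyspark internals:

```python
# Toy model of the two callback kinds in pyspark streaming.
# FakeRDD is an illustrative stand-in, not a pyspark class.

class FakeRDD:
    """Stand-in for an RDD handle; real RDD handles exist only on the driver."""
    def __init__(self, data):
        self.data = list(data)

    def map(self, f):
        # Kind 2: f operates on individual records, so in real pyspark it
        # can be pickled and shipped to worker.py for execution.
        return FakeRDD(f(x) for x in self.data)

def rdd_to_rdd(rdd):
    # Kind 1: this function needs the RDD handle itself (as in
    # DStream.transform), so Spark has to invoke it on the driver via the
    # py4j callback -- worker.py never sees an RDD object.
    return rdd.map(lambda record: record * 2)

batch = FakeRDD([1, 2, 3])
print(rdd_to_rdd(batch).data)  # -> [2, 4, 6]
```

The record-level lambda inside `rdd_to_rdd` is the only part that could run remotely; the outer function must stay on the driver because it manipulates the RDD handle.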
Re: Unable to acquire memory errors in HiveCompatibilitySuite
Oops... I meant to say "The page size calculation is NOT the issue here".

On 16 September 2015 at 06:46, Pete Robbins wrote:
> The page size calculation is the issue here, as there is plenty of free
> memory, although there is maybe a fair bit of wasted space in some pages.
> The problem is that when we have a lot of tasks, each is only allowed to
> reach 1/n of the available memory, and several of the tasks bump into that
> limit. With tasks at 4 times the number of cores there will be some
> contention, so they remain active for longer.
>
> So I think this is a test case issue: it configures the number of executor
> cores too high.
Re: Unable to acquire memory errors in HiveCompatibilitySuite
It is exactly the issue here, isn't it? We are using memory / N, where N
should be the maximum number of active tasks. In the current master, we use
the number of cores to approximate the number of tasks -- but it turned out
to be a bad approximation in tests because it is set to 32 to increase
concurrency.

On Tue, Sep 15, 2015 at 10:47 PM, Pete Robbins wrote:
> Oops... I meant to say "The page size calculation is NOT the issue here".
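The memory / N fair-share rule above can be checked with quick back-of-envelope arithmetic, using the maxMemory value Pete reports elsewhere in the thread and the 4MB page from the error message (a sketch, not Spark's actual code):

```python
# Back-of-envelope check of the memory / N fair-share rule.
# max_memory is from Pete's debug output; 4MB is the allocation that
# fails in the stack trace.
max_memory = 515396075        # bytes of execution memory
page_size = 4 * 1024 * 1024   # 4194304 bytes

for n_tasks in (8, 32):
    per_task = max_memory // n_tasks
    pages = per_task // page_size
    print(f"{n_tasks} tasks -> {per_task} bytes each, {pages} pages of 4MB")
# 8 tasks -> 64424509 bytes each, 15 pages of 4MB
# 32 tasks -> 16106127 bytes each, 3 pages of 4MB
```

With 32 concurrent tasks, any task whose operators need a fourth 4MB page hits the 1/N cap, which matches the "Unable to acquire 4194304 bytes of memory" failure.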
Re: Unable to acquire memory errors in HiveCompatibilitySuite
The page size calculation is the issue here, as there is plenty of free
memory, although there is maybe a fair bit of wasted space in some pages.
The problem is that when we have a lot of tasks, each is only allowed to
reach 1/n of the available memory, and several of the tasks bump into that
limit. With tasks at 4 times the number of cores there will be some
contention, so they remain active for longer.

So I think this is a test case issue: it configures the number of executor
cores too high.

On 15 September 2015 at 18:54, Reynold Xin wrote:
> Maybe we can change the heuristics in the memory calculation to use
> SparkContext.defaultParallelism if it is local mode.
Re: Unable to acquire memory errors in HiveCompatibilitySuite
Reynold, thanks for replying.

getPageSize parameters: maxMemory=515396075, numCores=0
Calculated values: cores=8, default=4194304

So am I getting a large page size as I only have 8 cores?

On 15 September 2015 at 00:40, Reynold Xin wrote:
> Pete - can you do me a favor?
>
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>
> Print the parameters that are passed into the getPageSize function, and
> check their values.
>
> On Mon, Sep 14, 2015 at 4:32 PM, Reynold Xin wrote:
>
>> Is this on latest master / branch-1.5?
>>
>> Out of the box we reserve only 16% (0.2 * 0.8) of the memory for
>> execution (e.g. aggregate, join) / shuffle sorting. With a 3GB heap,
>> that's 480MB. So each task gets 480MB / 32 = 15MB, and each operator
>> reserves at least one page for execution. If your page size is 4MB, it
>> only takes 3 operators to use up its memory.
>>
>> The thing is, page size is dynamically determined -- and in your case it
>> should be smaller than 4MB.
>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174
>>
>> Maybe there is a place in the maven tests where we explicitly set the
>> page size (spark.buffer.pageSize) to 4MB? If yes, we need to find it and
>> just remove it.
>>
>> On Mon, Sep 14, 2015 at 4:16 AM, Pete Robbins wrote:
>>
>>> I keep hitting errors running the tests on 1.5 such as
>>>
>>> - join31 *** FAILED ***
>>>   Failed to execute query using catalyst:
>>>   Error: Job aborted due to stage failure: Task 9 in stage 3653.0
>>> failed 1 times, most recent failure: Lost task 9.0 in stage 3653.0 (TID
>>> 123363, localhost): java.io.IOException: Unable to acquire 4194304 bytes
>>> of memory
>>>     at
>>> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:368)
>>>
>>> This is using the command
>>> build/mvn -Pyarn -Phadoop-2.2 -Phive -Phive-thriftserver test
>>>
>>> I don't see these errors in any of the amplab jenkins builds. Do those
>>> builds have any configuration/environment that I may be missing? My
>>> build is running with whatever defaults are in the top level pom.xml,
>>> eg -Xmx3G.
>>>
>>> I can make these tests pass by setting spark.shuffle.memoryFraction=0.6
>>> in the HiveCompatibilitySuite rather than the default 0.2 value.
>>>
>>> Trying to analyze what is going on with the test: it is related to the
>>> number of active tasks, which seems to rise to 32, and so the
>>> ShuffleMemoryManager allows less memory per task even though most of
>>> those tasks do not have any memory allocated to them.
>>>
>>> Has anyone seen issues like this before?
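Pete's reported values are consistent with a page-size heuristic of roughly next-power-of-two(maxMemory / cores / 16); the divisor of 16 is an assumption reconstructed from those numbers, so treat ShuffleMemoryManager.getPageSize as the authoritative formula:

```python
# Reconstruction of the page-size calculation from the values reported in
# this thread; the safety divisor of 16 is inferred, not taken from the code.

def next_power_of_2(n: int) -> int:
    return 1 << (n - 1).bit_length()

max_memory = 515396075   # from Pete's debug output
cores = 8                # numCores=0 falls back to available processors

page_size = next_power_of_2(max_memory // cores // 16)
print(page_size)  # -> 4194304, matching the default=4194304 Pete observed
```

With only 8 cores, each task's apparent share looks large, so the heuristic picks a full 4MB page even though 32 concurrent tasks end up sharing that memory.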
Re: Unable to acquire memory errors in HiveCompatibilitySuite
Ok so it looks like the max number of active tasks reaches 30. I'm not
setting anything, as it is a clean environment with a clean spark code
checkout. I'll dig further to see why so many tasks are active.

Cheers,

On 15 September 2015 at 07:22, Reynold Xin wrote:
> Yea I think this is where the heuristics is failing -- it uses 8 cores to
> approximate the number of active tasks, but the tests somehow are using 32
> (maybe because it explicitly sets it to that, or you set it yourself? I'm
> not sure which one)
Re: Unable to acquire memory errors in HiveCompatibilitySuite
Yea I think this is where the heuristics is failing -- it uses 8 cores to
approximate the number of active tasks, but the tests somehow are using 32
(maybe because it explicitly sets it to that, or you set it yourself? I'm
not sure which one)

On Mon, Sep 14, 2015 at 11:06 PM, Pete Robbins wrote:
> Reynold, thanks for replying.
>
> getPageSize parameters: maxMemory=515396075, numCores=0
> Calculated values: cores=8, default=4194304
>
> So am I getting a large page size as I only have 8 cores?
Re: Unable to acquire memory errors in HiveCompatibilitySuite
Yes, and at least there is an override by setting spark.sql.test.master to
local[8]; in fact local[16] worked on my 8 core box.

I'm happy to use this as a workaround, but the hard-coded 32 will fail
running build/tests on a clean checkout if you only have 8 cores.

On 15 September 2015 at 17:40, Marcelo Vanzin wrote:
> That test explicitly sets the number of executor cores to 32.
>
>   object TestHive
>     extends TestHiveContext(
>       new SparkContext(
>         System.getProperty("spark.sql.test.master", "local[32]"),
Predicate push-down bug?
Turning on predicate pushdown for ORC datasources results in a
NoSuchElementException:

scala> val df = sqlContext.sql("SELECT name FROM people WHERE age < 15")
df: org.apache.spark.sql.DataFrame = [name: string]

scala> sqlContext.setConf("spark.sql.orc.filterPushdown", "true")

scala> df.explain
== Physical Plan ==
java.util.NoSuchElementException

Disabling the pushdown makes things work again:

scala> sqlContext.setConf("spark.sql.orc.filterPushdown", "false")

scala> df.explain
== Physical Plan ==
Project [name#6]
 Filter (age#7 < 15)
  Scan OrcRelation[file:/home/mydir/spark-1.5.0-SNAPSHOT/test/people][name#6,age#7]

Have any of you run into this problem before? Is a fix available?

Thanks,
Ravi
Re: Unable to acquire memory errors in HiveCompatibilitySuite
Maybe we can change the heuristics in the memory calculation to use
SparkContext.defaultParallelism if it is local mode.

On Tue, Sep 15, 2015 at 10:28 AM, Pete Robbins wrote:
> Yes, and at least there is an override by setting spark.sql.test.master to
> local[8]; in fact local[16] worked on my 8 core box.
>
> I'm happy to use this as a workaround, but the hard-coded 32 will fail
> running build/tests on a clean checkout if you only have 8 cores.
Re: Predicate push-down bug?
Hi Ravi,

This does look like a bug. I have created a JIRA to track it here:
https://issues.apache.org/jira/browse/SPARK-10623

Ram

On Tue, Sep 15, 2015 at 10:47 AM, Ram Sriharsha wrote:
> Hi Ravi
>
> Can you share more details? What Spark version are you running?
>
> Ram
Re: Unable to acquire memory errors in HiveCompatibilitySuite
That test explicitly sets the number of executor cores to 32. object TestHive extends TestHiveContext( new SparkContext( System.getProperty("spark.sql.test.master", "local[32]"), On Mon, Sep 14, 2015 at 11:22 PM, Reynold Xin wrote: > Yea I think this is where the heuristics is failing -- it uses 8 cores to > approximate the number of active tasks, but the tests somehow is using 32 > (maybe because it explicitly sets it to that, or you set it yourself? I'm > not sure which one) > > On Mon, Sep 14, 2015 at 11:06 PM, Pete Robbins wrote: >> >> Reynold, thanks for replying. >> >> getPageSize parameters: maxMemory=515396075, numCores=0 >> Calculated values: cores=8, default=4194304 >> >> So am I getting a large page size as I only have 8 cores? >> >> On 15 September 2015 at 00:40, Reynold Xin wrote: >>> >>> Pete - can you do me a favor? >>> >>> >>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174 >>> >>> Print the parameters that are passed into the getPageSize function, and >>> check their values. >>> >>> On Mon, Sep 14, 2015 at 4:32 PM, Reynold Xin wrote: Is this on latest master / branch-1.5? out of the box we reserve only 16% (0.2 * 0.8) of the memory for execution (e.g. aggregate, join) / shuffle sorting. With a 3GB heap, that's 480MB. So each task gets 480MB / 32 = 15MB, and each operator reserves at least one page for execution. If your page size is 4MB, it only takes 3 operators to use up its memory. The thing is page size is dynamically determined -- and in your case it should be smaller than 4MB. https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174 Maybe there is a place in the maven tests where we explicitly set the page size (spark.buffer.pageSize) to 4MB? If yes, we need to find it and just remove it. 
On Mon, Sep 14, 2015 at 4:16 AM, Pete Robbins wrote: > > I keep hitting errors running the tests on 1.5 such as > > > - join31 *** FAILED *** > Failed to execute query using catalyst: > Error: Job aborted due to stage failure: Task 9 in stage 3653.0 > failed 1 times, most recent failure: Lost task 9.0 in stage 3653.0 (TID > 123363, localhost): java.io.IOException: Unable to acquire 4194304 bytes > of > memory > at > org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:368) > > > This is using the command > build/mvn -Pyarn -Phadoop-2.2 -Phive -Phive-thriftserver test > > > I don't see these errors in any of the amplab jenkins builds. Do those > builds have any configuration/environment that I may be missing? My build > is > running with whatever defaults are in the top level pom.xml, eg -Xmx3G. > > I can make these tests pass by setting spark.shuffle.memoryFraction=0.6 > in the HiveCompatibilitySuite rather than > the default 0.2 value. > > Trying to analyze what is going on with the test it is related to the > number of active tasks, which seems to rise to 32, and so the > ShuffleMemoryManager allows less memory per task even though most of those > tasks do not have any memory allocated to them. > > Has anyone seen issues like this before? -- Marcelo
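Reynold's 16% (0.2 * 0.8) arithmetic can be checked end to end in a few lines; every constant below comes from the emails in this thread (-Xmx3G from the pom defaults, 32 concurrent tasks from local[32], and the 4194304-byte page from the stack trace):

```python
heap = 3 * 1024**3            # -Xmx3G, the top-level pom.xml default
memory_fraction = 0.2         # spark.shuffle.memoryFraction default
safety_fraction = 0.8         # safety factor on top -> 16% overall
execution = int(heap * memory_fraction * safety_fraction)
print(execution)              # 515396075 -- matches Pete's maxMemory exactly

active_tasks = 32             # TestHive's local[32]
per_task = execution // active_tasks
print(per_task // (1 << 20))  # 15 -- the "480MB / 32 = 15MB" per task

page = 4 * 1024 * 1024        # the 4194304-byte page being acquired
print(per_task // page)       # 3 -- a fourth acquireNewPage() must fail
```

Which is exactly why only a handful of operators per task is enough to hit "Unable to acquire 4194304 bytes of memory".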
Re: Unable to acquire memory errors in HiveCompatibilitySuite
This is the culprit: https://issues.apache.org/jira/browse/SPARK-8406

"2. Make `TestHive` use a local mode `SparkContext` with 32 threads to increase parallelism The major reason for this is that, the original parallelism of 2 is too low to reproduce the data loss issue. Also, higher concurrency may potentially caught more concurrency bugs during testing phase. (It did help us spotted SPARK-8501.)"

Specific change: http://git-wip-us.apache.org/repos/asf/spark/blob/0818fdec/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index f901bd8..ea325cc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -49,7 +49,7 @@ import scala.collection.JavaConversions._
 object TestHive
   extends TestHiveContext(
     new SparkContext(
-      System.getProperty("spark.sql.test.master", "local[2]"),
+      System.getProperty("spark.sql.test.master", "local[32]"),
       "TestSQLContext",
       new SparkConf()
         .set("spark.sql.test", "")

Setting that to local[8] to match my cores the HiveCompatibilitySuite passes (and runs so much faster!) so maybe that should be changed to limit threads to num cores?

Cheers,

On 15 September 2015 at 08:50, Pete Robbins wrote:
> Ok so it looks like the max number of active tasks reaches 30. I'm not setting anything as it is a clean environment with clean spark code checkout. I'll dig further to see why so many tasks are active.
>
> Cheers,
>
> On 15 September 2015 at 07:22, Reynold Xin wrote:
>> Yea I think this is where the heuristics is failing -- it uses 8 cores to approximate the number of active tasks, but the tests somehow is using 32 (maybe because it explicitly sets it to that, or you set it yourself?
I'm >> not sure which one) >> >> On Mon, Sep 14, 2015 at 11:06 PM, Pete Robbins >> wrote: >> >>> Reynold, thanks for replying. >>> >>> getPageSize parameters: maxMemory=515396075, numCores=0 >>> Calculated values: cores=8, default=4194304 >>> >>> So am I getting a large page size as I only have 8 cores? >>> >>> On 15 September 2015 at 00:40, Reynold Xin wrote: >>> Pete - can you do me a favor? https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174 Print the parameters that are passed into the getPageSize function, and check their values. On Mon, Sep 14, 2015 at 4:32 PM, Reynold Xin wrote: > Is this on latest master / branch-1.5? > > out of the box we reserve only 16% (0.2 * 0.8) of the memory for > execution (e.g. aggregate, join) / shuffle sorting. With a 3GB heap, > that's > 480MB. So each task gets 480MB / 32 = 15MB, and each operator reserves at > least one page for execution. If your page size is 4MB, it only takes 3 > operators to use up its memory. > > The thing is page size is dynamically determined -- and in your case > it should be smaller than 4MB. > https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala#L174 > > Maybe there is a place that in the maven tests that we explicitly set > the page size (spark.buffer.pageSize) to 4MB? If yes, we need to find it > and just remove it. 
> > > On Mon, Sep 14, 2015 at 4:16 AM, Pete Robbins > wrote: > >> I keep hitting errors running the tests on 1.5 such as >> >> >> - join31 *** FAILED *** >> Failed to execute query using catalyst: >> Error: Job aborted due to stage failure: Task 9 in stage 3653.0 >> failed 1 times, most recent failure: Lost task 9.0 in stage 3653.0 (TID >> 123363, localhost): java.io.IOException: Unable to acquire 4194304 bytes >> of >> memory >> at >> org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPage(UnsafeExternalSorter.java:368) >> >> >> This is using the command >> build/mvn -Pyarn -Phadoop-2.2 -Phive -Phive-thriftserver test >> >> >> I don't see these errors in any of the amplab jenkins builds. Do >> those builds have any configuration/environment that I may be missing? My >> build is running with whatever defaults are in the top level pom.xml, eg >> -Xmx3G. >> >> I can make these tests pass by setting >> spark.shuffle.memoryFraction=0.6 in the HiveCompatibilitySuite rather >> than >> the default 0.2 value. >> >> Trying to analyze what is going on with the test it is related to the
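Pete's suggestion — keep the spark.sql.test.master override but cap the default thread count at the machine's core count instead of hard-coding 32 — can be sketched like this (a hypothetical helper in Python, not Spark's Scala code):

```python
import os

def test_master(override=None, hard_default=32):
    """Pick a local[n] master: honor an explicit spark.sql.test.master
    override, otherwise cap the default thread count at the core count."""
    if override is not None:
        return override
    threads = min(hard_default, os.cpu_count() or 2)
    return f"local[{threads}]"

print(test_master("local[16]"))  # an explicit override always wins
print(test_master())             # e.g. local[8] on Pete's 8-core box
```

This keeps the high-parallelism intent of SPARK-8406 on big Jenkins boxes while avoiding 32 tasks fighting over an 8-core developer machine's execution memory.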
RE: And.eval short circuiting
I see. We're having problems with code like this (forgive my noob scala):

val df = Seq(("moose","ice"), (null,"fire")).toDF("animals", "elements")
df
  .filter($"animals".rlike(".*"))
  .filter(callUDF({(value: String) => value.length > 2}, BooleanType, $"animals"))
  .collect()

This code throws an NPE because:
* Catalyst combines the filters with an AND
* the first filter returns null on the null input
* the second filter tries to read the length of that null

This feels weird. Reading that code, I wouldn't expect null to be passed to the second filter. Even weirder is that if you call collect() after the first filter you won't see nulls, and if you write the data to disk and reread it, the NPE won't happen. It's bewildering! Is this the intended behavior?

From: Reynold Xin [r...@databricks.com]
Sent: Monday, September 14, 2015 10:14 PM
To: Zack Sampson
Cc: dev@spark.apache.org
Subject: Re: And.eval short circuiting

rxin=# select null and true;
 ?column?
----------

(1 row)

rxin=# select null and false;
 ?column?
----------
 f
(1 row)

null and false should return false.

On Mon, Sep 14, 2015 at 9:12 PM, Zack Sampson wrote:
It seems like And.eval can avoid calculating right.eval if left.eval returns null. Is there a reason it's written like it is?

override def eval(input: Row): Any = {
  val l = left.eval(input)
  if (l == false) {
    false
  } else {
    val r = right.eval(input)
    if (r == false) {
      false
    } else {
      if (l != null && r != null) {
        true
      } else {
        null
      }
    }
  }
}
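The quoted And.eval translates almost line-for-line into Python. Modeling it (a sketch, with None standing in for SQL null and thunks standing in for lazy child expressions) shows why the right side must run even when the left is null — null AND false is false — which is exactly how the UDF ends up seeing the row the rlike filter returned null for:

```python
def and_eval(left, right):
    """Python model of the quoted Catalyst And.eval: SQL three-valued AND.
    left and right are zero-argument callables, mirroring child expressions."""
    l = left()
    if l is False:
        return False
    r = right()  # still evaluated when l is None, because null AND false = false
    if r is False:
        return False
    return True if (l is not None and r is not None) else None

print(and_eval(lambda: None, lambda: False))  # False -- why no short-circuit on null
print(and_eval(lambda: None, lambda: True))   # None  -- i.e. SQL null

# ...but a right side that assumes non-null input blows up, like the UDF above:
animals = None
try:
    and_eval(lambda: None, lambda: len(animals) > 2)
except TypeError:
    print("right side crashed on null input -- the NPE analogue")
```

So the evaluation order is intentional for SQL semantics; the surprise is that a pushed-down UDF can observe rows an earlier null-producing predicate did not eliminate.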
pyspark streaming DStream compute
Can anybody help understand why pyspark streaming uses py4j callback to execute python code while pyspark batch uses worker.py? regarding pyspark streaming, is py4j callback only used for DStream, worker.py still used for RDD? thanks, Renyi.
Fwd: Null Value in DecimalType column of DataFrame
Hi Yin, posted here because I think it's a bug. So, it will return null and I can get a NullPointerException, as I was getting. Is this really the expected behavior? Never seen something returning null in other Scala tools that I used. Regards, 2015-09-14 18:54 GMT-03:00 Yin Huai: > btw, move it to user list. > > On Mon, Sep 14, 2015 at 2:54 PM, Yin Huai wrote: > >> A scale of 10 means that there are 10 digits at the right of the decimal >> point. If you also have precision 10, the range of your data will be [0, 1) >> and casting "10.5" to DecimalType(10, 10) will return null, which is >> expected. >> >> On Mon, Sep 14, 2015 at 1:42 PM, Dirceu Semighini Filho < >> dirceu.semigh...@gmail.com> wrote: >> >>> Hi all, >>> I'm moving from spark 1.4 to 1.5, and one of my tests is failing. >>> It seems that there was some changes in org.apache.spark.sql.types. >>> DecimalType >>> >>> This ugly code is a little sample to reproduce the error, don't use it >>> into your project. >>> >>> test("spark test") { >>> val file = >>> context.sparkContext().textFile(s"${defaultFilePath}Test.csv").map(f => >>> Row.fromSeq({ >>> val values = f.split(",") >>> >>> Seq(values.head.toString.toInt,values.tail.head.toString.toInt,BigDecimal(values.tail.tail.head), >>> values.tail.tail.tail.head)})) >>> >>> val structType = StructType(Seq(StructField("id", IntegerType, false), >>> StructField("int2", IntegerType, false), StructField("double", >>> DecimalType(10,10), false), >>> StructField("str2", StringType, false))) >>> >>> val df = context.sqlContext.createDataFrame(file,structType) >>> df.first >>> } >>> >>> The content of the file is: >>> >>> 1,5,10.5,va >>> 2,1,0.1,vb >>> 3,8,10.0,vc >>> >>> The problem resides in DecimalType, before 1.5 the scale wasn't >>> required. Now when using DecimalType(12,10) it works fine, but using >>> DecimalType(10,10) the Decimal value >>> 10.5 became null, and the 0.1 works. >>> >>> Is there anybody working with DecimalType for 1.5.1? 
>>> Regards,
>>> Dirceu
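Yin's rule — scale is the digits to the right of the decimal point, precision is the total digits, so DecimalType(10, 10) leaves zero integer digits — can be checked with a small sketch. The fits() helper here is hypothetical, not Spark's implementation; Spark itself yields null when a cast to the target DecimalType overflows:

```python
from decimal import Decimal

def fits(value, precision, scale):
    """Does value fit DecimalType(precision, scale)? scale bounds the digits
    right of the point; precision - scale bounds the digits left of it."""
    sign, digits, exponent = Decimal(value).as_tuple()
    if -exponent > scale:                    # too many fractional digits
        return False
    integer_digits = len(digits) + exponent  # digits left of the point
    return integer_digits <= precision - scale

print(fits("0.1", 10, 10))   # True  -- survives, as in Dirceu's test data
print(fits("10.5", 10, 10))  # False -- overflows, so Spark returns null
print(fits("10.5", 12, 10))  # True  -- two integer digits now allowed
```

This matches the behavior in the thread: DecimalType(12, 10) works for 10.5 while DecimalType(10, 10) nulls it out, exactly as Yin describes.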