The feature dimension is small. You don't need a big akka.frameSize. The default one (10M) should be sufficient. Did you cache the data before calling LRWithSGD? -Xiangrui
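Caching here means persisting the parsed LabeledPoint RDD before training, so that each gradient-descent pass reads from memory instead of re-reading and re-parsing the input files. A minimal Scala sketch, assuming a spark-shell session where sc is the SparkContext, parseRecord stands in for the job's own parsing, and the path and parameter values are placeholders:

    import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.rdd.RDD

    // Placeholder for the job's existing line -> LabeledPoint parsing.
    def parseRecord(line: String): LabeledPoint = ???

    val points: RDD[LabeledPoint] = sc.textFile("/path/to/input").map(parseRecord)

    // Persist and materialize the training set once, so every gradient-descent
    // iteration reads the labeled points from memory rather than from disk.
    points.cache()
    points.count()

    // numIterations = 100, stepSize = 1.0, miniBatchFraction = 0.05 are illustrative values.
    val model = LogisticRegressionWithSGD.train(points, 100, 1.0, 0.05)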
On Thu, Jul 3, 2014 at 10:02 AM, Bharath Ravi Kumar <reachb...@gmail.com> wrote: > I tried another run after setting the driver memory to 8G (and > spark.akka.frameSize = 500 on the executors and the driver). In addition, I > also tried to reduce the amount of data that a single task processes, by > increasing the number of partitions (of the labeled points) to 120 (instead > of 2 used earlier), and then setting max cores to 2. That made no difference > since, at the end of 120 tasks, the familiar error message appeared on a > slave: > > <snipped earlier logs> > 14/07/03 16:18:48 INFO CoarseGrainedExecutorBackend: Got assigned task 1436 > 14/07/03 16:18:48 INFO Executor: Running task ID 1436 > 14/07/03 16:18:53 INFO HadoopRDD: Input split: > file:~//2014-05-24-02/part-r-00014:0+2215337 > 14/07/03 16:18:54 INFO HadoopRDD: Input split: > file:~//2014-05-24-02/part-r-00014:2215337+2215338 > 14/07/03 16:18:54 INFO HadoopRDD: Input split: > file:~//2014-05-24-02/part-r-00003:0+2196429 > 14/07/03 16:18:54 INFO HadoopRDD: Input split: > file:~//2014-05-24-02/part-r-00003:2196429+2196430 > 14/07/03 16:18:54 INFO HadoopRDD: Input split: > file:~//2014-05-24-02/part-r-00010:0+2186751 > 14/07/03 16:18:54 INFO HadoopRDD: Input split: > file:~//2014-05-24-02/part-r-00010:2186751+2186751 > 14/07/03 16:18:54 INFO Executor: Serialized size of result for 1436 is > 5958822 > 14/07/03 16:18:54 INFO Executor: Sending result for 1436 directly to driver > 14/07/03 16:18:54 INFO Executor: Finished task ID 1436 > 14/07/03 16:18:54 INFO CoarseGrainedExecutorBackend: Got assigned task 1438 > 14/07/03 16:18:54 INFO Executor: Running task ID 1438 > 14/07/03 16:19:00 INFO HadoopRDD: Input split: > file:~//2014-05-24-02/part-r-00004:0+2209615 > 14/07/03 16:19:00 INFO HadoopRDD: Input split: > file:~//2014-05-24-02/part-r-00004:2209615+2209616 > 14/07/03 16:19:00 INFO HadoopRDD: Input split: > file:~//2014-05-24-02/part-r-00011:0+2202240 > 14/07/03 16:19:00 INFO HadoopRDD: Input split: > file:~//2014-05-24-02/part-r-00011:2202240+2202240 > 14/07/03 16:19:00 INFO HadoopRDD: Input split: > file:~//2014-05-24-02/part-r-00009:0+2194423 > 14/07/03 16:19:00 INFO HadoopRDD: Input split: > file:~//2014-05-24-02/part-r-00009:2194423+2194424 > 14/07/03 16:19:00 INFO Executor: Serialized size of result for 1438 is > 5958822 > 14/07/03 16:19:00 INFO Executor: Sending result for 1438 directly to driver > 14/07/03 16:19:00 INFO Executor: Finished task ID 1438 > 14/07/03 16:19:14 ERROR CoarseGrainedExecutorBackend: Driver Disassociated > [akka.tcp://sparkExecutor@slave1:51099] -> [akka.tcp://spark@master:58272] > disassociated! Shutting down. 
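The settings described above (8 GB of driver memory, spark.akka.frameSize = 500, 120 partitions of the labeled points, spark.cores.max = 2) would typically be applied roughly as follows. This is only a sketch: the jar, path and master names are placeholders, and labeledPoints stands for the RDD built elsewhere in the thread.

    // Driver memory is passed at submission time, e.g.:
    //   spark-submit --class LogRegExp --master spark://master:7077 \
    //     --driver-memory 8g logreg-app.jar
    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("LogRegExp")
      .set("spark.akka.frameSize", "500") // MB; raises the 10 MB default for large task results
      .set("spark.cores.max", "2")
    val sc = new SparkContext(conf)

    // Spread the labeled points across 120 smaller partitions instead of 2.
    val repartitioned = labeledPoints.repartition(120)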
> > > The corresponding master logs were: > > 4/07/03 16:02:14 INFO Master: Registering app LogRegExp > 14/07/03 16:02:14 INFO Master: Registered app LogRegExp with ID > app-20140703160214-0028 > 14/07/03 16:02:14 INFO Master: Launching executor app-20140703160214-0028/1 > on worker worker-20140630124441-slave1-40182 > 14/07/03 16:19:15 INFO Master: Removing executor app-20140703160214-0028/1 > because it is EXITED > 14/07/03 16:19:15 INFO Master: Launching executor app-20140703160214-0028/2 > on worker worker-20140630124441-slave1-40182 > 14/07/03 16:19:15 INFO Master: Removing executor app-20140703160214-0028/0 > because it is EXITED > 14/07/03 16:19:15 INFO Master: Launching executor app-20140703160214-0028/3 > on worker worker-20140630102913-slave2-44735 > 14/07/03 16:19:18 INFO Master: Removing executor app-20140703160214-0028/2 > because it is EXITED > 14/07/03 16:19:18 INFO Master: Launching executor app-20140703160214-0028/4 > on worker worker-20140630124441-slave1-40182 > 14/07/03 16:19:18 INFO Master: Removing executor app-20140703160214-0028/3 > because it is EXITED > 14/07/03 16:19:18 INFO Master: Launching executor app-20140703160214-0028/5 > on worker worker-20140630102913-slave2-44735 > 14/07/03 16:19:20 INFO Master: akka.tcp://spark@master:58272 got > disassociated, removing it. > 14/07/03 16:19:20 INFO Master: Removing app app-20140703160214-0028 > 14/07/03 16:19:20 INFO Master: akka.tcp://spark@master:58272 got > disassociated, removing it. > > > Throughout the execution, I confirmed in the UI that driver memory used was > 0.0 B / 6.9 GB and each executor's memory showed 0.0 B / 12.1 GB even when > aggregate was being executed. On a related note, I noticed in the executors > tab that just before the entire job terminated, executors on slave1, slave2 > and the driver "disappeared" momentarily from the active executors list. The > replacement executors on slave1 and slave2 were re-spawned a couple of > times and appeared on the executors list again before they too died and the > job failed. > So it appears that no matter what the task input-result size, the execution > fails at the end of the stage corresponding to GradientDescent.aggregate > (and the preceding count() in GradientDescent goes through fine). Let me > know if you need any additional information. > > > On Thu, Jul 3, 2014 at 12:27 PM, Xiangrui Meng <men...@gmail.com> wrote: >> >> Could you check the driver memory in the executor tab of the Spark UI >> when the job is running? If it is too small, please set >> --driver-memory with spark-submit, e.g. 10g. Could you also attach the >> master log under spark/logs as well? -Xiangrui >> >> On Wed, Jul 2, 2014 at 9:34 AM, Bharath Ravi Kumar <reachb...@gmail.com> >> wrote: >> > Hi Xiangrui, >> > >> > The issue with aggergating/counting over large feature vectors (as part >> > of >> > LogisticRegressionWithSGD) continues to exist, but now in another form: >> > while the execution doesn't freeze (due to SPARK-1112), it now fails at >> > the >> > second or third gradient descent iteration consistently with an error >> > level >> > log message, but no stacktrace. I'm running against 1.0.1-rc1, and have >> > tried setting spark.akka.frameSize as high as 500. 
When the execution >> > fails, >> > each of the two executors log the following message (corresponding to >> > aggregate at GradientDescent.scala:178) : >> > >> > 14/07/02 14:09:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: >> > maxBytesInFlight: 50331648, targetRequestSize: 10066329 >> > 14/07/02 14:09:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: >> > Getting 2 non-empty blocks out of 2 blocks >> > 14/07/02 14:09:09 INFO BlockFetcherIterator$BasicBlockFetcherIterator: >> > Started 1 remote fetches in 0 ms >> > 14/07/02 14:09:11 INFO Executor: Serialized size of result for 737 is >> > 5959086 >> > 14/07/02 14:09:11 INFO Executor: Sending result for 737 directly to >> > driver >> > 14/07/02 14:09:11 INFO Executor: Finished task ID 737 >> > 14/07/02 14:09:18 ERROR CoarseGrainedExecutorBackend: Driver >> > Disassociated >> > [akka.tcp://sparkExecutor@(slave1,slave2):51941] -> >> > [akka.tcp://spark@master:59487] disassociated! Shutting down. >> > >> > >> > There is no separate stacktrace on the driver side. >> > >> > Each input record is of the form p1, p2, (p1,p2) where p1, p2 & (p1,p2) >> > are >> > categorical features with large cardinality, and X is the double label >> > with >> > a continuous value. The categorical variables are converted to binary >> > variables which results in a feature vector of size 741092 (composed of >> > all >> > unique categories across p1, p2 and (p1,p2)). Thus, the labeled point >> > for >> > input record is a sparse vector of size 741092 with only 3 variables set >> > in >> > the record. The total number of records is 683233 after aggregating the >> > input data on (p1, p2). When attempting to train on the unaggregated >> > records >> > (1337907 in number spread across 455 files), the execution fails at >> > count, >> > GradientDescent.scala:161 with the following log >> > >> > >> > (Snipped lines corresponding to other input files) >> > 14/07/02 16:02:03 INFO HadoopRDD: Input split: >> > file:~/part-r-00012:2834590+2834590 >> > 14/07/02 16:02:03 INFO HadoopRDD: Input split: >> > file:~/part-r-00005:0+2845559 >> > 14/07/02 16:02:03 INFO HadoopRDD: Input split: >> > file:~/part-r-00005:2845559+2845560 >> > 14/07/02 16:02:03 INFO Executor: Serialized size of result for 726 is >> > 615 >> > 14/07/02 16:02:03 INFO Executor: Sending result for 726 directly to >> > driver >> > 14/07/02 16:02:03 INFO Executor: Finished task ID 726 >> > 14/07/02 16:02:12 ERROR CoarseGrainedExecutorBackend: Driver >> > Disassociated >> > [akka.tcp://sparkExecutor@slave1:48423] -> >> > [akka.tcp://spark@master:55792] >> > disassociated! Shutting down. 
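To make the encoding described above concrete: each record contributes exactly three non-zero indicator entries to a 741092-dimensional sparse vector. A small sketch with hypothetical index and label values (not the poster's actual code):

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint

    val numFeatures = 741092            // distinct categories across p1, p2 and (p1,p2)

    // Hypothetical positions of the three categories present in one record,
    // listed in ascending order as sparse vectors expect.
    val indices = Array(1234, 250000, 600000)
    val values  = Array(1.0, 1.0, 1.0)  // binary indicator features
    val label   = 3.7                   // hypothetical continuous label

    val point = LabeledPoint(label, Vectors.sparse(numFeatures, indices, values))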
>> > >> > A count() attempted on the input RDD before beginning training has the >> > following metrics: >> > >> > >> > Metric Min 25th Median 75th Max >> > >> > Result >> > serialization >> > time 0 ms 0 ms 0 ms 0 ms 0 ms >> > >> > Duration 33 s 33 s 35 s 35 s 35 s >> > >> > Time spent >> > fetching task >> > results 0 ms 0 ms 0 ms 0 ms 0 ms >> > >> > Scheduler >> > delay 0.1 s 0.1 s 0.3 s 0.3 s 0.3 s >> > >> > Aggregated Metrics by Executor >> > >> > ID Address Task Time Total Failed Succeeded Shuffle Read >> > Shuffle Write Shuf Spill (Mem) Shuf Spill (Disk) >> > 0 CANNOT FIND ADDRESS 34 s 1 0 1 0.0 B >> > 0.0 B 0.0 B 0.0 B >> > 1 CANNOT FIND ADDRESS 36 s 1 0 1 0.0 B >> > 0.0 B 0.0 B 0.0 B >> > >> > Tasks >> > >> > Task Index Task ID Status Locality Level Executor Launch >> > Time >> > Duration GC Time Result Ser Time Errors >> > 0 726 SUCCESS PROCESS_LOCAL slave1 >> > 2014/07/02 >> > 16:01:28 35 s 0.1 s >> > 1 727 SUCCESS PROCESS_LOCAL slave2 >> > 2014/07/02 >> > 16:01:28 33 s 99 ms >> > >> > Any pointers / diagnosis please? >> > >> > >> > >> > >> > On Thu, Jun 19, 2014 at 10:03 AM, Bharath Ravi Kumar >> > <reachb...@gmail.com> >> > wrote: >> >> >> >> Thanks. I'll await the fix to re-run my test. >> >> >> >> >> >> On Thu, Jun 19, 2014 at 8:28 AM, Xiangrui Meng <men...@gmail.com> >> >> wrote: >> >>> >> >>> Hi Bharath, >> >>> >> >>> This is related to SPARK-1112, which we already found the root cause. >> >>> I will let you know when this is fixed. >> >>> >> >>> Best, >> >>> Xiangrui >> >>> >> >>> On Tue, Jun 17, 2014 at 7:37 PM, Bharath Ravi Kumar >> >>> <reachb...@gmail.com> >> >>> wrote: >> >>> > Couple more points: >> >>> > 1)The inexplicable stalling of execution with large feature sets >> >>> > appears >> >>> > similar to that reported with the news-20 dataset: >> >>> > >> >>> > >> >>> > http://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/%3c53a03542.1010...@gmail.com%3E >> >>> > >> >>> > 2) The NPE trying to call mapToPair convert an RDD<Long, Long, >> >>> > Integer, >> >>> > Integer> into a JavaPairRDD<Tuple2<Long,Long>, >> >>> > Tuple2<Integer,Integer>> >> >>> > is >> >>> > unrelated to mllib. >> >>> > >> >>> > Thanks, >> >>> > Bharath >> >>> > >> >>> > >> >>> > >> >>> > On Wed, Jun 18, 2014 at 7:14 AM, Bharath Ravi Kumar >> >>> > <reachb...@gmail.com> >> >>> > wrote: >> >>> >> >> >>> >> Hi Xiangrui , >> >>> >> >> >>> >> I'm using 1.0.0. >> >>> >> >> >>> >> Thanks, >> >>> >> Bharath >> >>> >> >> >>> >> On 18-Jun-2014 1:43 am, "Xiangrui Meng" <men...@gmail.com> wrote: >> >>> >>> >> >>> >>> Hi Bharath, >> >>> >>> >> >>> >>> Thanks for posting the details! Which Spark version are you using? >> >>> >>> >> >>> >>> Best, >> >>> >>> Xiangrui >> >>> >>> >> >>> >>> On Tue, Jun 17, 2014 at 6:48 AM, Bharath Ravi Kumar >> >>> >>> <reachb...@gmail.com> >> >>> >>> wrote: >> >>> >>> > Hi, >> >>> >>> > >> >>> >>> > (Apologies for the long mail, but it's necessary to provide >> >>> >>> > sufficient >> >>> >>> > details considering the number of issues faced.) >> >>> >>> > >> >>> >>> > I'm running into issues testing LogisticRegressionWithSGD a two >> >>> >>> > node >> >>> >>> > cluster >> >>> >>> > (each node with 24 cores and 16G available to slaves out of 24G >> >>> >>> > on >> >>> >>> > the >> >>> >>> > system). Here's a description of the application: >> >>> >>> > >> >>> >>> > The model is being trained based on categorical features x, y, >> >>> >>> > and >> >>> >>> > (x,y). 
>> >>> >>> > The categorical features are mapped to binary features by converting each
>> >>> >>> > distinct value in the category enum into a binary feature by itself (i.e.
>> >>> >>> > presence of that value in a record implies the corresponding feature = 1,
>> >>> >>> > else feature = 0; so there'd be as many distinct features as enum values).
>> >>> >>> > The training vector is laid out as
>> >>> >>> > [x1,x2...xn,y1,y2....yn,(x1,y1),(x2,y2)...(xn,yn)]. Each record in the
>> >>> >>> > training data has only one combination (Xk,Yk) and a label appearing in
>> >>> >>> > the record. Thus, the corresponding LabeledPoint sparse vector would only
>> >>> >>> > have 3 values Xk, Yk, (Xk,Yk) set for a record. The total length of the
>> >>> >>> > vector (though sparse) would be nearly 614000. The number of records is
>> >>> >>> > about 1.33 million. The records have been coalesced into 20 partitions
>> >>> >>> > across two nodes. The input data has not been cached.
>> >>> >>> > (NOTE: I do realize the records & features may seem large for a two node
>> >>> >>> > setup, but given the memory & CPU, and the fact that I'm willing to give
>> >>> >>> > up some turnaround time, I don't see why tasks should inexplicably fail.)
>> >>> >>> >
>> >>> >>> > Additional parameters include:
>> >>> >>> >
>> >>> >>> > spark.executor.memory = 14G
>> >>> >>> > spark.default.parallelism = 1
>> >>> >>> > spark.cores.max=20
>> >>> >>> > spark.storage.memoryFraction=0.8 //No cache space required
>> >>> >>> > (Trying to set spark.akka.frameSize to a larger number, say, 20 didn't
>> >>> >>> > help either)
>> >>> >>> >
>> >>> >>> > The model training was initialized as:
>> >>> >>> > new LogisticRegressionWithSGD(1, maxIterations, 0.0, 0.05)
>> >>> >>> >
>> >>> >>> > However, after 4 iterations of gradient descent, the entire execution
>> >>> >>> > appeared to stall inexplicably.
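Read concretely, that layout puts the x block in the first nX indices, the y block in the next nY, and the (x,y) block after both. A toy sketch of the index arithmetic with hypothetical dictionaries (not the actual feature-building code):

    // Toy dictionaries standing in for the real category enums.
    val xIds  = Map("x1" -> 0, "x2" -> 1)
    val yIds  = Map("y1" -> 0, "y2" -> 1)
    val xyIds = Map(("x1", "y1") -> 0, ("x2", "y2") -> 1)

    val nX = xIds.size
    val nY = yIds.size

    // Layout [x block | y block | (x,y) block]: a record (Xk, Yk) sets one index per block.
    def featureIndices(x: String, y: String): Array[Int] = Array(
      xIds(x),                  // position within the x block
      nX + yIds(y),             // y block starts after the x block
      nX + nY + xyIds((x, y)))  // (x,y) block starts after both

    val vectorSize = nX + nY + xyIds.size // ~614000 with the real dictionaries

The three indices from featureIndices would then go into Vectors.sparse exactly as in the earlier sketch.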
The corresponding executor >> >>> >>> > details >> >>> >>> > and >> >>> >>> > details of the stalled stage (number 14) are as follows: >> >>> >>> > >> >>> >>> > Metric Min 25th Median 75th >> >>> >>> > Max >> >>> >>> > Result serialization time 12 ms 13 ms 14 ms 16 ms >> >>> >>> > 18 >> >>> >>> > ms >> >>> >>> > Duration 4 s 4 s 5 s 5 s >> >>> >>> > 5 s >> >>> >>> > Time spent fetching task 0 ms 0 ms 0 ms 0 ms 0 >> >>> >>> > ms >> >>> >>> > results >> >>> >>> > Scheduler delay 6 s 6 s 6 s >> >>> >>> > 6 s >> >>> >>> > 12 s >> >>> >>> > >> >>> >>> > >> >>> >>> > Stage Id >> >>> >>> > 14 aggregate at GradientDescent.scala:178 >> >>> >>> > >> >>> >>> > Task Index Task ID Status Locality Level Executor >> >>> >>> > Launch Time Duration GC Result Ser Time >> >>> >>> > Errors >> >>> >>> > >> >>> >>> > Time >> >>> >>> > >> >>> >>> > 0 600 RUNNING PROCESS_LOCAL >> >>> >>> > serious.dataone.foo.bar.com >> >>> >>> > 2014/06/17 10:32:27 1.1 h >> >>> >>> > 1 601 RUNNING PROCESS_LOCAL >> >>> >>> > casual.dataone.foo.bar.com >> >>> >>> > 2014/06/17 10:32:27 1.1 h >> >>> >>> > 2 602 RUNNING PROCESS_LOCAL >> >>> >>> > serious.dataone.foo.bar.com >> >>> >>> > 2014/06/17 10:32:27 1.1 h >> >>> >>> > 3 603 RUNNING PROCESS_LOCAL >> >>> >>> > casual.dataone.foo.bar.com >> >>> >>> > 2014/06/17 10:32:27 1.1 h >> >>> >>> > 4 604 RUNNING PROCESS_LOCAL >> >>> >>> > serious.dataone.foo.bar.com >> >>> >>> > 2014/06/17 10:32:27 1.1 h >> >>> >>> > 5 605 SUCCESS PROCESS_LOCAL >> >>> >>> > casual.dataone.foo.bar.com >> >>> >>> > 2014/06/17 10:32:27 4 s 2 s 12 ms >> >>> >>> > 6 606 SUCCESS PROCESS_LOCAL >> >>> >>> > serious.dataone.foo.bar.com >> >>> >>> > 2014/06/17 10:32:27 4 s 1 s 14 ms >> >>> >>> > 7 607 SUCCESS PROCESS_LOCAL >> >>> >>> > casual.dataone.foo.bar.com >> >>> >>> > 2014/06/17 10:32:27 4 s 2 s 12 ms >> >>> >>> > 8 608 SUCCESS PROCESS_LOCAL >> >>> >>> > serious.dataone.foo.bar.com >> >>> >>> > 2014/06/17 10:32:27 5 s 1 s 15 ms >> >>> >>> > 9 609 SUCCESS PROCESS_LOCAL >> >>> >>> > casual.dataone.foo.bar.com >> >>> >>> > 2014/06/17 10:32:27 5 s 1 s 14 ms >> >>> >>> > 10 610 SUCCESS PROCESS_LOCAL >> >>> >>> > serious.dataone.foo.bar.com >> >>> >>> > 2014/06/17 10:32:27 5 s 1 s 15 ms >> >>> >>> > 11 611 SUCCESS PROCESS_LOCAL >> >>> >>> > casual.dataone.foo.bar.com >> >>> >>> > 2014/06/17 10:32:27 4 s 1 s 13 ms >> >>> >>> > 12 612 SUCCESS PROCESS_LOCAL >> >>> >>> > serious.dataone.foo.bar.com >> >>> >>> > 2014/06/17 10:32:27 5 s 1 s 18 ms >> >>> >>> > 13 613 SUCCESS PROCESS_LOCAL >> >>> >>> > casual.dataone.foo.bar.com >> >>> >>> > 2014/06/17 10:32:27 5 s 1 s 13 ms >> >>> >>> > 14 614 SUCCESS PROCESS_LOCAL >> >>> >>> > serious.dataone.foo.bar.com >> >>> >>> > 2014/06/17 10:32:27 4 s 1 s 14 ms >> >>> >>> > 15 615 SUCCESS PROCESS_LOCAL >> >>> >>> > casual.dataone.foo.bar.com >> >>> >>> > 2014/06/17 10:32:27 4 s 1 s 12 ms >> >>> >>> > 16 616 SUCCESS PROCESS_LOCAL >> >>> >>> > serious.dataone.foo.bar.com >> >>> >>> > 2014/06/17 10:32:27 5 s 1 s 15 ms >> >>> >>> > 17 617 SUCCESS PROCESS_LOCAL >> >>> >>> > casual.dataone.foo.bar.com >> >>> >>> > 2014/06/17 10:32:27 5 s 1 s 18 ms >> >>> >>> > 18 618 SUCCESS PROCESS_LOCAL >> >>> >>> > serious.dataone.foo.bar.com >> >>> >>> > 2014/06/17 10:32:27 5 s 1 s 16 ms >> >>> >>> > 19 619 SUCCESS PROCESS_LOCAL >> >>> >>> > casual.dataone.foo.bar.com >> >>> >>> > 2014/06/17 10:32:27 4 s 1 s 18 ms >> >>> >>> > >> >>> >>> > Executor stats: >> >>> >>> > >> >>> >>> > RDD Blocks Memory Used Disk Used Active Tasks Failed >> >>> >>> > Tasks >> >>> >>> > Complete Tasks Total Tasks Task Time 
Shuffle Read >> >>> >>> > Shuffle >> >>> >>> > Write >> >>> >>> > 0 0.0 B / 6.7 GB 0.0 B 2 0 >> >>> >>> > 307 309 23.2 m 0.0 B 0.0 B >> >>> >>> > 0 0.0 B / 6.7 GB 0.0 B 3 0 >> >>> >>> > 308 311 22.4 m 0.0 B 0.0 B >> >>> >>> > >> >>> >>> > >> >>> >>> > Executor jmap output: >> >>> >>> > >> >>> >>> > Server compiler detected. >> >>> >>> > JVM version is 24.55-b03 >> >>> >>> > >> >>> >>> > using thread-local object allocation. >> >>> >>> > Parallel GC with 18 thread(s) >> >>> >>> > >> >>> >>> > Heap Configuration: >> >>> >>> > MinHeapFreeRatio = 40 >> >>> >>> > MaxHeapFreeRatio = 70 >> >>> >>> > MaxHeapSize = 10737418240 (10240.0MB) >> >>> >>> > NewSize = 1310720 (1.25MB) >> >>> >>> > MaxNewSize = 17592186044415 MB >> >>> >>> > OldSize = 5439488 (5.1875MB) >> >>> >>> > NewRatio = 2 >> >>> >>> > SurvivorRatio = 8 >> >>> >>> > PermSize = 21757952 (20.75MB) >> >>> >>> > MaxPermSize = 134217728 (128.0MB) >> >>> >>> > G1HeapRegionSize = 0 (0.0MB) >> >>> >>> > >> >>> >>> > Heap Usage: >> >>> >>> > PS Young Generation >> >>> >>> > Eden Space: >> >>> >>> > capacity = 2783969280 (2655.0MB) >> >>> >>> > used = 192583816 (183.66223907470703MB) >> >>> >>> > free = 2591385464 (2471.337760925293MB) >> >>> >>> > 6.917598458557704% used >> >>> >>> > From Space: >> >>> >>> > capacity = 409993216 (391.0MB) >> >>> >>> > used = 1179808 (1.125152587890625MB) >> >>> >>> > free = 408813408 (389.8748474121094MB) >> >>> >>> > 0.2877628102022059% used >> >>> >>> > To Space: >> >>> >>> > capacity = 385351680 (367.5MB) >> >>> >>> > used = 0 (0.0MB) >> >>> >>> > free = 385351680 (367.5MB) >> >>> >>> > 0.0% used >> >>> >>> > PS Old Generation >> >>> >>> > capacity = 7158628352 (6827.0MB) >> >>> >>> > used = 4455093024 (4248.707794189453MB) >> >>> >>> > free = 2703535328 (2578.292205810547MB) >> >>> >>> > 62.2338918146983% used >> >>> >>> > PS Perm Generation >> >>> >>> > capacity = 90701824 (86.5MB) >> >>> >>> > used = 45348832 (43.248016357421875MB) >> >>> >>> > free = 45352992 (43.251983642578125MB) >> >>> >>> > 49.99770677158598% used >> >>> >>> > >> >>> >>> > 8432 interned Strings occupying 714672 bytes. >> >>> >>> > >> >>> >>> > >> >>> >>> > Executor GC log snippet: >> >>> >>> > >> >>> >>> > 168.778: [GC [PSYoungGen: 2702831K->578545K(2916864K)] >> >>> >>> > 9302453K->7460857K(9907712K), 0.3193550 secs] [Times: user=5.13 >> >>> >>> > sys=0.39, >> >>> >>> > real=0.32 secs] >> >>> >>> > 169.097: [Full GC [PSYoungGen: 578545K->0K(2916864K)] >> >>> >>> > [ParOldGen: >> >>> >>> > 6882312K->1073297K(6990848K)] 7460857K->1073297K(9907712K) >> >>> >>> > [PSPermGen: >> >>> >>> > 44248K->44201K(88576K)], 4.5521090 secs] [Times: user=24.22 >> >>> >>> > sys=0.18, >> >>> >>> > real=4.55 secs] >> >>> >>> > 174.207: [GC [PSYoungGen: 2338304K->81315K(2544128K)] >> >>> >>> > 3411653K->1154665K(9534976K), 0.0966280 secs] [Times: user=1.66 >> >>> >>> > sys=0.00, >> >>> >>> > real=0.09 secs] >> >>> >>> > >> >>> >>> > I tried to map partitions to cores on the nodes. Increasing the >> >>> >>> > number >> >>> >>> > of >> >>> >>> > partitions (say to 80 or 100) would result in progress till the >> >>> >>> > 6th >> >>> >>> > iteration or so, but the next stage would stall as before with >> >>> >>> > apparent >> >>> >>> > root >> >>> >>> > cause / logs. 
With increased partitions, the last stage that >> >>> >>> > completed >> >>> >>> > had >> >>> >>> > the following task times: >> >>> >>> > >> >>> >>> > Metric Min 25th Median 75th >> >>> >>> > Max >> >>> >>> > Result serialization time 11 ms 12 ms 13 ms 15 ms >> >>> >>> > 0.4 s >> >>> >>> > Duration 0.5 s 0.9 s 1 s 3 s >> >>> >>> > 7 s >> >>> >>> > Time spent fetching 0 ms 0 ms 0 ms 0 ms 0 >> >>> >>> > ms >> >>> >>> > task results >> >>> >>> > Scheduler delay 5 s 6 s 6 s >> >>> >>> > 7 s >> >>> >>> > 12 s >> >>> >>> > >> >>> >>> > My hypothesis is that as the coefficient array becomes less >> >>> >>> > sparse >> >>> >>> > (with >> >>> >>> > successive iterations), the cost of the aggregate goes up to the >> >>> >>> > point >> >>> >>> > that >> >>> >>> > it stalls (which I failed to explain). Reducing the batch >> >>> >>> > fraction >> >>> >>> > to a >> >>> >>> > very >> >>> >>> > low number like 0.01 saw the iterations progress further, but >> >>> >>> > the >> >>> >>> > model >> >>> >>> > failed to converge in that case after a small number of >> >>> >>> > iterations. >> >>> >>> > >> >>> >>> > >> >>> >>> > I also tried reducing the number of records by aggregating on >> >>> >>> > (x,y) >> >>> >>> > as >> >>> >>> > the >> >>> >>> > key (i.e. using aggregations instead of training on every raw >> >>> >>> > record), >> >>> >>> > but >> >>> >>> > encountered by the following exception: >> >>> >>> > >> >>> >>> > Loss was due to java.lang.NullPointerException >> >>> >>> > java.lang.NullPointerException >> >>> >>> > at >> >>> >>> > >> >>> >>> > >> >>> >>> > >> >>> >>> > org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:750) >> >>> >>> > at >> >>> >>> > >> >>> >>> > >> >>> >>> > >> >>> >>> > org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:750) >> >>> >>> > at >> >>> >>> > scala.collection.Iterator$$anon$11.next(Iterator.scala:328) >> >>> >>> > at >> >>> >>> > >> >>> >>> > org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:59) >> >>> >>> > at >> >>> >>> > >> >>> >>> > >> >>> >>> > >> >>> >>> > org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:96) >> >>> >>> > at >> >>> >>> > >> >>> >>> > >> >>> >>> > >> >>> >>> > org.apache.spark.rdd.PairRDDFunctions$$anonfun$1.apply(PairRDDFunctions.scala:95) >> >>> >>> > at >> >>> >>> > org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582) >> >>> >>> > at >> >>> >>> > org.apache.spark.rdd.RDD$$anonfun$14.apply(RDD.scala:582) >> >>> >>> > at >> >>> >>> > >> >>> >>> > >> >>> >>> > >> >>> >>> > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) >> >>> >>> > at >> >>> >>> > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) >> >>> >>> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) >> >>> >>> > at >> >>> >>> > >> >>> >>> > >> >>> >>> > >> >>> >>> > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158) >> >>> >>> > at >> >>> >>> > >> >>> >>> > >> >>> >>> > >> >>> >>> > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) >> >>> >>> > at org.apache.spark.scheduler.Task.run(Task.scala:51) >> >>> >>> > at >> >>> >>> > >> >>> >>> > >> >>> >>> > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) >> >>> >>> > at >> >>> >>> > >> >>> >>> > >> >>> >>> > >> >>> >>> > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) >> >>> >>> > at >> >>> >>> > >> >>> >>> > >> >>> >>> > >> >>> >>> > 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) >> >>> >>> > at java.lang.Thread.run(Thread.java:745) >> >>> >>> > >> >>> >>> > >> >>> >>> > I'd appreciate any insights/comments about what may be causing >> >>> >>> > the >> >>> >>> > execution >> >>> >>> > to stall. >> >>> >>> > >> >>> >>> > If logs/tables appear poorly indented in the email, here's a >> >>> >>> > gist >> >>> >>> > with >> >>> >>> > relevant details: >> >>> >>> > https://gist.github.com/reachbach/a418ab2f01b639b624c1 >> >>> >>> > >> >>> >>> > Thanks, >> >>> >>> > Bharath >> >>> > >> >>> > >> >> >> >> >> > > >
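For readers following the aggregation attempt described near the end of the thread (the NPE itself, as noted earlier, came from code outside MLlib), keying the raw (Long, Long, Integer, Integer) records by the (x, y) pair and combining duplicates looks roughly like this in the Scala API; the record type and field names are hypothetical:

    import org.apache.spark.SparkContext._ // pair-RDD operations such as reduceByKey (pre-1.3 style)

    case class Raw(x: Long, y: Long, count1: Int, count2: Int) // hypothetical record type

    // rawRecords: RDD[Raw], parsed elsewhere.
    val aggregated = rawRecords
      .map(r => ((r.x, r.y), (r.count1, r.count2)))            // Tuple2 key, Tuple2 value
      .reduceByKey { case ((a1, b1), (a2, b2)) => (a1 + a2, b1 + b2) }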