Got it. Thanks, that clarifies.
On Thu, Nov 7, 2013 at 3:34 PM, Shangyu Luo <[email protected]> wrote:

> I am not sure. But the RDD paper does mention the use of broadcast
> variables. Sometimes you need a local variable in many map-reduce jobs and
> you do not want to copy it to all worker nodes multiple times; in that case
> a broadcast variable is a good choice.

2013/11/7 Walrus theCat <[email protected]>:

> Shangyu,
>
> Thanks for the tip re: the flag! Maybe the broadcast variable is only
> needed for "complex" data structures?

On Sun, Nov 3, 2013 at 7:58 PM, Shangyu Luo <[email protected]> wrote:

> I have run into the 'Too many open files' problem before. One solution is
> adding 'ulimit -n 100000' to the spark-env.sh file.
> Basically, I don't think the local variable is the problem, as I have
> written programs that pass local variables as parameters to functions and
> they work.

2013/11/3 Walrus theCat <[email protected]>:

> Hi Shangyu,
>
> I appreciate your ongoing correspondence. To clarify, my solution didn't
> work, and I didn't expect it to. I was digging through the logs, and I
> found a series of exceptions (in only one of the workers):
>
> 13/11/03 17:51:05 INFO client.DefaultHttpClient: Retrying connect
> 13/11/03 17:51:05 INFO http.AmazonHttpClient: Unable to execute HTTP request: Too many open files
> java.net.SocketException: Too many open files
> ...
> at com.amazonaws.services.s3.AmazonS3Client.getObject(AmazonS3Client.java:808)
> ...
>
> I don't know why, because I do close those streams, but I'll look into it.
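A common source of that symptom is S3 object streams that are opened but not closed on every code path, since each open stream holds a file descriptor and an HTTP connection. As a minimal sketch only (the AWS Java SDK calls are standard, but s3Client, bucket, and key are illustrative names, not taken from the program discussed here):

import com.amazonaws.services.s3.AmazonS3Client

def readObject(s3Client: AmazonS3Client, bucket: String, key: String): Array[Byte] = {
  val obj = s3Client.getObject(bucket, key)  // opens an HTTP connection per object
  val in = obj.getObjectContent              // stream backed by that connection
  try {
    // consume the stream completely (byte-by-byte here only for brevity)
    Iterator.continually(in.read()).takeWhile(_ != -1).map(_.toByte).toArray
  } finally {
    in.close()                               // always release the descriptor/connection
  }
}

Closing in a finally block (or a loan-pattern helper) releases the descriptor even when the read throws, which is usually what keeps the open-file count bounded.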
> As an aside, I make references to a spark.util.Vector from a parallelized
> context (in an RDD.map operation), as per the Logistic Regression example
> that Spark came with, and it seems to work out (the following is from the
> examples; you'll see that 'w' is not a broadcast variable, and 'points' is
> an RDD):
>
> var w = Vector(D, _ => 2 * rand.nextDouble - 1)
> println("Initial w: " + w)
>
> for (i <- 1 to ITERATIONS) {
>   println("On iteration " + i)
>   val gradient = points.map { p =>
>     (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y * p.x
>   }.reduce(_ + _)
>   w -= gradient
> }

On Sun, Nov 3, 2013 at 10:47 AM, Shangyu Luo <[email protected]> wrote:

> Hi Walrus,
>
> Thank you for sharing the solution to your problem. I think I have met a
> similar problem before (i.e., one machine is working while the others are
> idle); I just waited for a long time and the program continued processing.
> I am not sure how your program filters an RDD by a locally stored set. If
> the set is a parameter of a function, I assume it would be copied to all
> worker nodes. But it is good that you solved your problem with a broadcast
> variable, and the running time seems reasonable!

2013/11/3 Walrus theCat <[email protected]>:

> Hi Shangyu,
>
> Thanks for responding. This is a refactor of other code that isn't
> completely scalable because it pulls stuff to the driver. This code keeps
> everything on the cluster. I left it running for 7 hours, and the log just
> froze. I checked ganglia, and only one machine's CPU seemed to be doing
> anything. The last output in the log left my code at a spot where it is
> filtering an RDD by a locally stored set. No error was thrown. I thought
> that was OK based on the example code, but just in case, I changed it so
> it's a broadcast variable. The un-refactored code (that pulls all the data
> to the driver from time to time) runs in minutes. I've never had this
> problem of the log just going non-responsive before, and was wondering if
> anyone knew of any heuristics I could check.
>
> Thank you
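For reference, a minimal sketch of the "filter an RDD by a broadcast set" change described above, assuming a SparkContext sc is in scope; records, localKeys, and the key/value shape are illustrative, not taken from the original program:

// Small lookup set built on the driver.
val localKeys: Set[String] = Set("a", "b", "c")

// Shipped to each worker once and kept read-only there, instead of being
// re-serialized with every task closure that references it.
val keysBc = sc.broadcast(localKeys)

// Tasks read keysBc.value on the workers.
val kept = records.filter { case (k, _) => keysBc.value.contains(k) }

The same pattern applies to the weight vector 'w' in the SparkLR excerpt quoted above: capturing it in the closure works, but broadcasting it avoids shipping a large read-only value again with every task.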
On Sat, Nov 2, 2013 at 2:55 PM, Shangyu Luo <[email protected]> wrote:

> Yes, I think so. The running time depends on what work you are doing and
> how large it is.

2013/11/1 Walrus theCat <[email protected]>:

> That's what I thought, too. So is it not "hanging", just recalculating for
> a very long time? The log stops updating and it just gives the output I
> posted. If there are any suggestions as to parameters to change, or any
> other data to check, that would be appreciated.
>
> Thank you, Shangyu.

On Fri, Nov 1, 2013 at 11:31 AM, Shangyu Luo <[email protected]> wrote:

> I think the missing parent may not be abnormal. From my understanding, when
> a Spark task cannot find its parent, it can use metadata to find the
> parent's result or recalculate the parent's value. Imagine that in a loop,
> a Spark task tries to find some value from the last iteration's result.

2013/11/1 Walrus theCat <[email protected]>:

> Are there heuristics to check when the scheduler says it is "missing
> parents" and just hangs?

On Thu, Oct 31, 2013 at 4:56 PM, Walrus theCat <[email protected]> wrote:

> Hi,
>
> I'm not sure what's going on here. My code seems to be working thus far
> (map at SparkLR.scala:90 completed). What can I do to help the scheduler
> out here?
>
> Thanks
>
> 13/10/31 02:10:13 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(10, 211)
> 13/10/31 02:10:13 INFO scheduler.DAGScheduler: Stage 10 (map at SparkLR.scala:90) finished in 0.923 s
> 13/10/31 02:10:13 INFO scheduler.DAGScheduler: looking for newly runnable stages
> 13/10/31 02:10:13 INFO scheduler.DAGScheduler: running: Set(Stage 11)
> 13/10/31 02:10:13 INFO scheduler.DAGScheduler: waiting: Set(Stage 9, Stage 8)
> 13/10/31 02:10:13 INFO scheduler.DAGScheduler: failed: Set()
> 13/10/31 02:10:16 INFO scheduler.DAGScheduler: Missing parents for Stage 9: List(Stage 11)
> 13/10/31 02:10:16 INFO scheduler.DAGScheduler: Missing parents for Stage 8: List(Stage 9)
