There isn't a cycle in your graph: although you reuse the reference variables A and B in your code, each operation in fact creates a new RDD. You have some other problem, and you'd have to provide more detail on why you think something is deadlocked, like a thread dump.
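One thing that does stand out in your snippet: calling B.aggregate(...) inside A.map(...) invokes one RDD from within a transformation of another. That isn't supported; RDD operations can only be invoked from the driver, not from closures running on the executors, and it tends to fail in confusing ways rather than with a clean error. If B is small enough to fit in driver memory, the usual workaround is to collect it and broadcast the local copy, then look values up inside the map. A rough sketch of that pattern, with made-up names and your toy data (untested, so treat it as an outline rather than a drop-in fix):

    import org.apache.spark.{SparkConf, SparkContext}

    object CoupledUpdates {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("coupled-updates").setMaster("local[*]")
        val sc = new SparkContext(conf)

        // M(i)(j) == 1 means A(i) reads B(j); read M by column for the B update
        val M = Array(Array(0, 1, 1), Array(1, 1, 0), Array(0, 1, 0))
        val n = M.length

        var A = sc.parallelize(Seq((0, 6), (1, 7), (2, 8)))
        var B = sc.parallelize(Seq((0, 4), (1, 6), (2, 1)))

        for (_ <- 0 until 10) {
          // Ship a plain local Map of B to the executors instead of the RDD itself
          val bLocal = sc.broadcast(B.collectAsMap())
          A = A.map { case (i, _) =>
            (i, (0 until n).filter(j => M(i)(j) == 1).map(bLocal.value).sum)
          }.cache() // avoid replaying the growing lineage on the next collect

          val aLocal = sc.broadcast(A.collectAsMap())
          B = B.map { case (j, _) =>
            (j, (0 until n).filter(i => M(i)(j) == 1).map(aLocal.value).sum)
          }.cache()
        }

        println(A.collect().mkString(", "))
        println(B.collect().mkString(", "))
      }
    }

For long runs you'd also want to unpersist old copies and checkpoint occasionally to keep lineages short, and if A and B are too big to collect, a join against M's nonzero entries is the scalable alternative to broadcasting.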
On Mon, Sep 14, 2015 at 10:42 AM, petranidis <pnyfan...@gmail.com> wrote:
> Hi all,
>
> I am new to Spark and I have written a few Spark programs, mostly around
> machine learning applications.
>
> I am trying to solve a problem where two RDDs should be updated using
> elements of each other. More specifically, if the two pair RDDs are called
> A and B, then M is a matrix specifying which elements of each RDD should
> be taken into account when computing the other, with rows of M
> corresponding to elements of A and columns to elements of B, e.g.
>
> A = (0, 6), (1, 7), (2, 8)
> B = (0, 4), (1, 6), (2, 1)
>
> and
>
> M =
> 0 1 1
> 1 1 0
> 0 1 0
>
> Then
>
> for (it = 0; it < 10; it++) {
>   A(0) = B(1) + B(2)
>   A(1) = B(0) + B(1)
>   A(2) = B(1)
>   B(0) = A(1)
>   B(1) = A(0) + A(1) + A(2)
>   B(2) = A(0)
> }
>
> To do such a computation in Spark, I used
>
> A = A.map( (key, val) => { B.aggregate(...) })
> B = B.map( (key, val) => { A.aggregate(...) })
>
> where the key of each mapped element, keyA, is passed to the aggregate
> function as an initialization parameter, and then for each B element with
> key keyB, if M(keyA, keyB) == 1, the B element is taken into account in
> the summation.
>
> The calculation of A is done successfully and correctly, but then the DAG
> scheduler seems to deadlock when the calculation of B happens. This
> behaviour goes away when I remove the A.aggregate bit from my code.
> Apparently, according to the logs, the scheduler is waiting for some
> results before it can go on, but those results should already have been
> calculated.
>
> I assume this has to do with the DAG scheduling not handling cyclical
> dependencies. Is there a way I can force each iteration or update of A and
> B to be seen as a separate stage? Otherwise, how can I implement this type
> of aggregation in another way? (It could be the equivalent of mapping the
> A elements to a list of all the B elements for which the M matrix entry is
> 1 and then mapping again to their sum, but this needs a lot of space,
> especially when the problem at hand could be very large, which is
> unfeasible, so I need to avoid this.)
>
> Thanks in advance for your help!