I'm not sure I see the faster algorithm in the paper you mention. In section 6.1.2 I see this: "In what follows we give a simple labeling algorithm that computes connectivity on sparse graphs in O(log N) rounds." N here is the size of the graph, not the largest component diameter.
That is the exact same algorithm as is implemented in GraphX, I think. Or is it not?

On Fri, Nov 11, 2016 at 7:58 PM, Daniel Darabos <daniel.dara...@lynxanalytics.com> wrote:

> Hi Shreya,
> GraphFrames just calls the GraphX strongly connected components code.
> (https://github.com/graphframes/graphframes/blob/release-0.2.0/src/main/scala/org/graphframes/lib/StronglyConnectedComponents.scala#L51)
>
> For choosing the number of iterations: If the number of iterations is less
> than the diameter of the graph, you may get an incorrect result. But
> running for more iterations than that buys you nothing. The algorithm is
> basically to broadcast your ID to all your neighbors in the first round,
> and then broadcast the smallest ID that you have seen so far in the next
> rounds. So with only 1 round you will get a wrong result unless each vertex
> is connected to the vertex with the lowest ID in that component. (Unlikely
> in a real graph.)
>
> See https://github.com/apache/spark/blob/v2.0.2/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
> for the actual implementation.
>
> A better algorithm exists for this problem that only requires O(log(N))
> iterations when N is the largest component diameter. (It is described in "A
> Model of Computation for MapReduce",
> http://www.sidsuri.com/Publications_files/mrc.pdf.) This outperforms GraphX's
> implementation immensely. (See the last slide of
> http://www.slideshare.net/SparkSummit/interactive-graph-analytics-daniel-darabos#33.)
> The large advantage is due to the lower number of necessary iterations.
>
> For why this is failing even with one iteration: I would first check your
> partitioning. Too many or too few partitions could equally cause the issue.
> If you are lucky, there is no overlap between the "too many" and "too few"
> domains :).
>
> On Fri, Nov 11, 2016 at 7:39 PM, Shreya Agarwal <shrey...@microsoft.com> wrote:
>
>> Tried GraphFrames.
>> Still faced the same – job died after a few hours.
>> The errors I see (and I see tons of them) are –
>>
>> (I ran with 3 times the partitions as well, which was 12 times the number of
>> executors, but still the same.)
>>
>> -------------------------------------
>>
>> ERROR NativeAzureFileSystem: Encountered Storage Exception for write on Blob : hdp/spark2-events/application_1478717432179_0021.inprogress Exception details: null Error Code : RequestBodyTooLarge
>>
>> -------------------------------------
>>
>> 16/11/11 09:21:46 ERROR TransportResponseHandler: Still have 3 requests outstanding when connection from /10.0.0.95:43301 is closed
>> 16/11/11 09:21:46 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 2 outstanding blocks after 5000 ms
>> 16/11/11 09:21:46 INFO ShuffleBlockFetcherIterator: Getting 1500 non-empty blocks out of 1500 blocks
>> 16/11/11 09:21:46 ERROR OneForOneBlockFetcher: Failed while starting block fetches
>> java.io.IOException: Connection from /10.0.0.95:43301 closed
>>
>> -------------------------------------
>>
>> 16/11/11 09:21:46 ERROR OneForOneBlockFetcher: Failed while starting block fetches
>> java.lang.RuntimeException: java.io.FileNotFoundException: /mnt/resource/hadoop/yarn/local/usercache/shreyagrssh/appcache/application_1478717432179_0021/blockmgr-b1dde30d-359e-4932-b7a4-a5e138a52360/37/shuffle_1346_21_0.index (No such file or directory)
>>
>> -------------------------------------
>>
>> org.apache.spark.SparkException: Exception thrown in awaitResult
>>     at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
>>     at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
>>     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>>     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>>     at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>     at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
>>     at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
>>     at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518)
>>     at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547)
>>     at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>>     at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>>     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857)
>>     at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:547)
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.util.ConcurrentModificationException
>>     at java.util.ArrayList.writeObject(ArrayList.java:766)
>>     at sun.reflect.GeneratedMethodAccessor25.invoke(Unknown Source)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>
>> -------------------------------------
>>
>> 16/11/11 13:21:54 WARN Executor: Issue communicating with driver in heartbeater
>> org.apache.spark.SparkException: Error sending message [message = Heartbeat(537,[Lscala.Tuple2;@2999dae4,BlockManagerId(537, 10.0.0.103, 36162))]
>>     at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:119)
>>     at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518)
>>     at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547)
>>     at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>>     at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>>     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857)
>>     at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:547)
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
>>     at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
>>     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
>>     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>>     at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>     at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
>>     at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
>>     ... 13 more
>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
>>     at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>     at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>     at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
>>
>> *From:* Shreya Agarwal
>> *Sent:* Thursday, November 10, 2016 8:16 PM
>> *To:* 'Felix Cheung' <felixcheun...@hotmail.com>; user@spark.apache.org
>> *Subject:* RE: Strongly Connected Components
>>
>> Yesterday’s run died sometime during the night, without any errors.
>> Today, I am running it using GraphFrames instead. It is still spawning new
>> tasks, so there is progress.
>>
>> *From:* Felix Cheung [mailto:felixcheun...@hotmail.com <felixcheun...@hotmail.com>]
>> *Sent:* Thursday, November 10, 2016 7:50 PM
>> *To:* user@spark.apache.org; Shreya Agarwal <shrey...@microsoft.com>
>> *Subject:* Re: Strongly Connected Components
>>
>> It is possible it is dead. Could you check the Spark UI to see if there
>> is any progress?
>>
>> _____________________________
>> From: Shreya Agarwal <shrey...@microsoft.com>
>> Sent: Thursday, November 10, 2016 12:45 AM
>> Subject: RE: Strongly Connected Components
>> To: <user@spark.apache.org>
>>
>> Bump. Anyone? It's been running for 10 hours now. No results.
>>
>> *From:* Shreya Agarwal
>> *Sent:* Tuesday, November 8, 2016 9:05 PM
>> *To:* user@spark.apache.org
>> *Subject:* Strongly Connected Components
>>
>> Hi,
>>
>> I am running this on a graph with >5B edges and >3B edges and have 2
>> questions –
>>
>> 1. What is the optimal number of iterations?
>> 2. I am running it for 1 iteration right now on a beefy 100 node
>>    cluster, with 300 executors each having 30GB RAM and 5 cores. I have
>>    persisted the graph to MEMORY_AND_DISK. And it has been running for 3
>>    hours already. Any ideas on how to speed this up?
>>
>> Regards,
>>
>> Shreya
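
P.S. To make the iterations-versus-diameter point in Daniel's message concrete, here is a minimal sketch of the min-ID broadcast he describes. It is plain Python, not the GraphX or GraphFrames code, and it treats edges as undirected (so it finds connected components, not strongly connected ones). The function name `min_label_propagation` and the toy path graph are made up for illustration.

```python
from collections import defaultdict


def min_label_propagation(edges, max_rounds):
    """Label every vertex with the smallest vertex ID it has heard of.

    In each round every vertex takes the minimum of its own label and its
    neighbours' labels. Stops early once a round changes nothing.
    Returns (labels, number of rounds in which some label changed).
    """
    neighbours = defaultdict(set)
    for u, v in edges:
        neighbours[u].add(v)
        neighbours[v].add(u)

    labels = {v: v for v in neighbours}  # every vertex starts with its own ID
    rounds_used = 0
    for _ in range(max_rounds):
        updated = {
            v: min([labels[v]] + [labels[n] for n in neighbours[v]])
            for v in neighbours
        }
        if updated == labels:  # converged, further rounds buy nothing
            break
        labels = updated
        rounds_used += 1
    return labels, rounds_used


# A path 0 - 1 - 2 - 3 - 4: a single component with diameter 4.
path = [(0, 1), (1, 2), (2, 3), (3, 4)]

labels_one_round, _ = min_label_propagation(path, max_rounds=1)
labels_converged, rounds = min_label_propagation(path, max_rounds=100)

print(labels_one_round)   # vertices 2, 3 and 4 have not yet heard of ID 0
print(labels_converged)   # every vertex carries the component's smallest ID
print(rounds)             # number of rounds that were actually needed
```

On this path the lowest ID has to travel hop by hop from one end to the other, so a single round leaves most vertices with the wrong label, and the number of rounds needed equals the diameter. That is the behaviour a log-round algorithm would have to beat.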