Re: Spark Function setup and cleanup
Thank you, but that doesn't answer my general question. I might need to enrich my records using different data sources (or DBs). So the general use case I need to support is to have some kind of Function that has init() logic for creating a connection to the DB, queries the DB for each record and enriches my input record with data from the DB, and uses some kind of close() logic to close the connection. I have implemented this kind of use case using Map/Reduce and I want to know how I can do it with Spark. Thanks On Fri, Jul 25, 2014 at 6:24 AM, Yanbo Liang yanboha...@gmail.com wrote: You can refer to this topic: http://www.mapr.com/developercentral/code/loading-hbase-tables-spark 2014-07-24 22:32 GMT+08:00 Yosi Botzer yosi.bot...@gmail.com: In my case I want to reach HBase. For every record with a userId I want to get some extra information about the user and add it to the result record for further processing. On Thu, Jul 24, 2014 at 9:11 AM, Yanbo Liang yanboha...@gmail.com wrote: If you want to connect to a DB in your program, you can use JdbcRDD ( https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala ) 2014-07-24 18:32 GMT+08:00 Yosi Botzer yosi.bot...@gmail.com: Hi, I am using the Java API of Spark. I wanted to know if there is a way to run some code in a manner that is like the setup() and cleanup() methods of Hadoop Map/Reduce. The reason I need it is because I want to read something from the DB according to each record I scan in my Function, and I would like to open the DB connection only once (and close it only once). Thanks
Re: Spark Function setup and cleanup
Look at mapPartitions. Whereas map turns one value V1 into one value V2, mapPartitions lets you turn one entire Iterator[V1] into one whole Iterator[V2]. The function that does so can perform some initialization at its start, then process all of the values, and clean up at its end. This is how you mimic a Mapper, really. The most literal translation of Hadoop MapReduce I can think of is: Mapper: mapPartitions to turn many (K1,V1) into many (K2,V2); (shuffle) groupByKey to turn that into (K2,Iterator[V2]); Reducer: mapPartitions to turn many (K2,Iterator[V2]) into many (K3,V3). It's not necessarily optimal to do it this way -- especially the groupByKey bit. You have more expressive power here and need not fit it into this paradigm. But yes, you can get the same effect as in MapReduce, mostly from mapPartitions. On Sat, Jul 26, 2014 at 8:52 AM, Yosi Botzer yosi.bot...@gmail.com wrote: Thank you, but that doesn't answer my general question. I might need to enrich my records using different data sources (or DBs). So the general use case I need to support is to have some kind of Function that has init() logic for creating a connection to the DB, queries the DB for each record and enriches my input record with data from the DB, and uses some kind of close() logic to close the connection. I have implemented this kind of use case using Map/Reduce and I want to know how I can do it with Spark
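To make the setup/cleanup pattern concrete, here is a minimal Scala sketch of the enrichment use case from the question. The JDBC URL, the users table, and the userIds RDD[Long] are hypothetical placeholders; note that the partition has to be materialized before the connection is closed, because map over an iterator is lazy.

    import java.sql.DriverManager

    val enriched = userIds.mapPartitions { records =>
      // init(): runs once per partition, on the executor
      val conn = DriverManager.getConnection("jdbc:mysql://dbhost/appdb")  // hypothetical DB
      val stmt = conn.prepareStatement("SELECT name FROM users WHERE id = ?")
      // process every record in the partition; toList forces evaluation
      // before the connection is closed below
      val out = records.map { userId =>
        stmt.setLong(1, userId)
        val rs = stmt.executeQuery()
        val name = if (rs.next()) rs.getString(1) else ""
        rs.close()
        (userId, name)  // the enriched record
      }.toList
      // close(): runs once per partition
      stmt.close()
      conn.close()
      out.iterator
    }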
Re: Emacs Setup Anyone?
Normally any setup that has an inferior mode for the Scala REPL will also support the Spark REPL (with little or no modification). Apart from that, I personally use the Spark REPL by invoking spark-shell in a shell inside Emacs, and I keep the Scala tags (etags) for the Spark sources loaded. With this setup it is fairly fast to do things like tag prediction at point, which is not always accurate, but it's useful. In case you are working on building this (an inferior mode for the Spark REPL) for us, I can come up with a wishlist. Prashant Sharma On Sat, Jul 26, 2014 at 3:07 AM, Andrei faithlessfri...@gmail.com wrote: I have never tried the Spark REPL from within Emacs, but I remember that switching from normal Python to PySpark was as simple as changing the interpreter name at the beginning of the session. Seems like ensime [1] (together with ensime-emacs [2]) should be a good point to start. For example, take a look at ensime-sbt.el [3], which defines a number of Scala/SBT commands. [1]: https://github.com/ensime/ensime-server [2]: https://github.com/ensime/ensime-emacs [3]: https://github.com/ensime/ensime-emacs/blob/master/ensime-sbt.el On Thu, Jul 24, 2014 at 10:14 PM, Steve Nunez snu...@hortonworks.com wrote: Anyone out there have a good configuration for Emacs? Scala-mode sort of works, but I'd love to see a fully-supported spark-mode with an inferior shell. Searching didn't turn up much of anything. Any Emacs users out there? What setup are you using? Cheers, - SteveN
How can I integrate a Spark cluster into my own program without using spark-submit?
I want to use a Spark cluster from a Scala function, so I can integrate Spark into my program directly. For example: when I call the count function in my own program, the program should submit the job to the cluster, so I can get the result directly.

def count() = {
  val master = "spark://mache123:7077"
  val appName = "control_test"
  val sc = new SparkContext(master, appName)
  val rdd = sc.textFile("hdfs://123d101suse11sp3:9000/netflix/netflix.test")
  val count = rdd.count
  System.out.println("rdd.count = " + count)
  count
}
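For what it's worth, a sketch of one way to make such an embedded function self-contained (the jar path is a hypothetical placeholder): ship the application jar to the executors via setJars, and stop the context in a finally block so the embedding program releases its cluster resources.

    import org.apache.spark.{SparkConf, SparkContext}

    def count(): Long = {
      val conf = new SparkConf()
        .setMaster("spark://mache123:7077")
        .setAppName("control_test")
        .setJars(Seq("/path/to/my-app.jar"))  // hypothetical: the jar containing this class
      val sc = new SparkContext(conf)
      try {
        sc.textFile("hdfs://123d101suse11sp3:9000/netflix/netflix.test").count()
      } finally {
        sc.stop()  // release executors even if the job fails
      }
    }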
Fwd: Exception : org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch table
Hi, This is my first program with Shark 0.9.1. I am new to Spark and Shark, so I don't know where I went wrong. It would be really helpful if someone out there could troubleshoot the problem. First I will give a glimpse of my code, which was developed in IntelliJ IDEA. The code runs perfectly in the editor.

*Code:*

def main(args: Array[String]) {
  val sparkConf = new SparkConf().setAppName("SharkTest").setMaster("local")
    .set("spark.executor.memory", "8g")
    .set("spark.worker.memory", "8g")
    .set("spark.executor.uri", "http://IP/spark/spark-0.9.1.tar.gz")
    .set("spark.mesos.coarse", "true")
    .setJars(List(args(1) + "/shark-assembly-0.9.1-hadoop2.0.0-cdh4.5.0.jar"))
  val shc = SharkEnv.initWithSharkContext(sparkConf)
  val q1 = "CREATE EXTERNAL TABLE IF NOT EXISTS table1(startIpNum string, endIpNum string, locId string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '" + args(3) + "' LOCATION '" + args(2) + "'"
  val q3 = "SELECT * FROM table1"
  shc.runSql(q1)
  shc.runSql(q3)
  shc.sql2rdd(q3).map { resultSet =>
    val y = resultSet.colname2indexMap.values.map(index => resultSet(index)).reduce((a, b) => a + "," + b)
    y
  }.saveAsTextFile(args(4))
  shc.sql("DROP TABLE IF EXISTS table1")
}

*build.sbt:*

import AssemblyKeys._
assemblySettings
name := "appname"
version := "1.0"
scalaVersion := "2.10.3"
mainClass := Some("classname")
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "0.9.1",
  "edu.berkeley.cs.shark" %% "shark" % "0.9.1",
  "org.apache.hive" % "hive-anttasks" % "0.11.0",
  "org.apache.hive" % "hive-beeline" % "0.11.0",
  "org.apache.hive" % "hive-cli" % "0.11.0",
  "org.apache.hive" % "hive-common" % "0.11.0",
  "org.apache.hive" % "hive-exec" % "0.11.0",
  "org.apache.hive" % "hive-hbase-handler" % "0.11.0",
  "org.apache.hive" % "hive-hwi" % "0.11.0",
  "org.apache.hive" % "hive-jdbc" % "0.11.0",
  "org.apache.hive" % "hive-metastore" % "0.11.0",
  "org.apache.hive" % "hive-serde" % "0.11.0",
  "org.apache.hive" % "hive-service" % "0.11.0",
  "org.apache.hive" % "hive-shims" % "0.11.0",
  "org.datanucleus" % "datanucleus-core" % "3.2.2",
  "org.datanucleus" % "datanucleus-rdbms" % "3.2.1",
  "org.datanucleus" % "datanucleus-api-jdo" % "3.2.1",
  "org.datanucleus" % "datanucleus-enhancer" % "3.1.1",
  "org.apache.derby" % "derby" % "10.10.1.1",
  "org.apache.hadoop" % "hadoop-client" % "2.0.0-cdh4.5.0")
resolvers ++= Seq(
  "Akka Repository" at "http://repo.akka.io/releases/",
  "Cloudera Repository" at "https://repository.cloudera.com/artifactory/cloudera-repos/")
mergeStrategy in assembly := {
  case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
  case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard
  case "log4j.properties" => MergeStrategy.discard
  case m if m.toLowerCase.startsWith("meta-inf/services/") => MergeStrategy.filterDistinctLines
  case "reference.conf" => MergeStrategy.concat
  case _ => MergeStrategy.first
}

sbt-assembly plugin version: 0.10.2

The problem appears only when I try to create the jar of the code. Steps followed to create the jar: 1. sbt clean 2. sbt assembly. When I try to run the jar using the command java -jar jarName.jar parameters, an error comes up: invalid or corrupt jar. The same jar is accepted when executed as java -cp jarName.jar classname parameters.
But in this case a Hive exception occurs: unable to fetch the table table1.

14/07/26 12:21:39 INFO Driver: PERFLOG method=TimeToSubmit
14/07/26 12:21:39 INFO Driver: PERFLOG method=compile
14/07/26 12:21:39 INFO ParseDriver: Parsing command: CREATE EXTERNAL TABLE IF NOT EXISTS table1(startIpNum string,endIpNum string,locId string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/home/user/foldername/Input/SharkTest'
14/07/26 12:21:39 INFO ParseDriver: Parse Completed
14/07/26 12:21:40 INFO SharkSemanticAnalyzer: Starting Semantic Analysis
14/07/26 12:21:40 INFO SharkSemanticAnalyzer: Creating table table1 position=36
14/07/26 12:21:40 INFO HiveMetaStore: 0: Opening raw store with implemenation class:org.apache.hadoop.hive.metastore.ObjectStore
14/07/26 12:21:40 INFO ObjectStore: ObjectStore, initialize called
org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch table table1
at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:957)
at org.apache.hadoop.hive.ql.metadata.Hive.getTable(Hive.java:904)
at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeCreateTable(SemanticAnalyzer.java:9328)
at org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.analyzeInternal(SemanticAnalyzer.java:8647)
at shark.parse.SharkSemanticAnalyzer.analyzeInternal(SharkSemanticAnalyzer.scala:105)
at org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.analyze(BaseSemanticAnalyzer.java:279)
at shark.SharkDriver.compile(SharkDriver.scala:215)

I would appreciate any comments about the cause of the above exception. Regards, Bilna P
Help using streaming from Spark Shell
Hi, I'm starting spark-shell like this: SPARK_MEM=1g SPARK_JAVA_OPTS="-Dspark.cleaner.ttl=3600" /spark/bin/spark-shell -c 3 but when I try to create a streaming context val scc = new StreamingContext(sc, Seconds(10)) I get: org.apache.spark.SparkException: Spark Streaming cannot be used without setting spark.cleaner.ttl; set this property before creating a SparkContext (use SPARK_JAVA_OPTS for the shell) at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:121) I also tried export SPARK_JAVA_OPTS="-Dspark.cleaner.ttl=3600" before calling spark-shell, but with no luck... What am I doing wrong? This is Spark 0.9.1 -- I cannot upgrade
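One workaround worth trying, as a hedged sketch: in 0.9.x a fresh SparkConf picks up spark.* Java system properties, so stopping the shell's pre-built context and rebuilding it after setting the property may satisfy the check (the master URL and app name below are placeholders):

    // inside spark-shell: the existing sc captured its config at startup,
    // so set the property and build a new context that will read it
    sc.stop()
    System.setProperty("spark.cleaner.ttl", "3600")
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    val sc2 = new SparkContext(new SparkConf().setMaster("local[3]").setAppName("shell-streaming"))
    val scc = new StreamingContext(sc2, Seconds(10))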
Lot of object serialization even with MEMORY_ONLY
Hello, I am executing the SparkPageRank example. It uses the cache() API for persistence of RDDs, and if I am not wrong, that in turn uses the MEMORY_ONLY storage level. However, the oprofile report shows a lot of activity in the writeObject0 function. There is not even a single Spilling in-memory... message in the output/log, because I am using a huge heap size of 120GB. Can someone please tell me why I see so much serialization happening, even though the MEMORY_ONLY storage level is used? The Spark version that I am using is 1.0.1. Thanks, Lokesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Lot-of-object-serialization-even-with-MEMORY-ONLY-tp10722.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Spilling in-memory... messages in log even with MEMORY_ONLY
Hello, I am running the SparkPageRank example, which uses the cache() API for persistence. This, AFAIK, uses the MEMORY_ONLY storage level. But even in this setup, I see a lot of WARN ExternalAppendOnlyMap: Spilling in-memory map of ... messages in the log. Why is that? I thought that MEMORY_ONLY means kick out the RDD if there isn't enough memory available. Thanks, Lokesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spilling-in-memory-messages-in-log-even-with-MEMORY-ONLY-tp10723.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
graphx cached partitions won't go away
I have GraphX queries running inside a service where I collect the results to the driver and do not hold any references to the RDDs involved in the queries. My assumption was that with the references gone, Spark would remove the cached RDDs from memory (note: I did not cache them, GraphX did). Yet they hang around... Is my understanding of how the ContextCleaner works incorrect? Or could it be that GraphX holds some references internally to the RDDs, preventing garbage collection? Maybe even circular references?
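As a hedged workaround sketch while the ContextCleaner question is open: a service that still holds the Graph handle at the end of a query can release the cached vertex and edge RDDs explicitly rather than waiting for garbage collection (the helper name is hypothetical):

    import org.apache.spark.graphx.Graph

    // hypothetical helper: call after collecting a query's results
    // to the driver, while the Graph reference is still in scope
    def releaseGraph[VD, ED](g: Graph[VD, ED]): Unit = {
      g.unpersistVertices(blocking = false)
      g.edges.unpersist(blocking = false)
    }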
Re: Spilling in-memory... messages in log even with MEMORY_ONLY
These messages are actually not about spilling the RDD; they're about spilling intermediate state in a reduceByKey, groupBy or other operation whose state doesn't fit in memory. We have to do that in these cases to avoid going out of memory. You can minimize spilling by having more reduce tasks, though, which will mean less data per task. Matei On Jul 26, 2014, at 1:22 PM, lokesh.gidra lokesh.gi...@gmail.com wrote: Hello, I am running the SparkPageRank example, which uses the cache() API for persistence. This, AFAIK, uses the MEMORY_ONLY storage level. But even in this setup, I see a lot of WARN ExternalAppendOnlyMap: Spilling in-memory map of ... messages in the log. Why is that? I thought that MEMORY_ONLY means kick out the RDD if there isn't enough memory available. Thanks, Lokesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spilling-in-memory-messages-in-log-even-with-MEMORY-ONLY-tp10723.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
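A minimal sketch of this suggestion in Scala (the pairs RDD and the partition count of 200 are hypothetical; tune the count to your data size):

    // more reduce tasks => fewer records per task => smaller in-memory
    // maps on the reduce side, hence less spilling
    val counts = pairs.reduceByKey((a, b) => a + b, 200)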
Re: Spilling in-memory... messages in log even with MEMORY_ONLY
Thanks for the reply. I understand this now. But in another situation, when I use a large heap size to avoid any spilling (I confirm there are no spilling messages in the log), I still see a lot of time being spent in the writeObject0() function. Can you please tell me why there would be any serialization done? Thanks Lokesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spilling-in-memory-messages-in-log-even-with-MEMORY-ONLY-tp10723p10727.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Spilling in-memory... messages in log even with MEMORY_ONLY
Even in local mode, Spark serializes data that would be sent across the network, e.g. in a reduce operation, so that you can catch errors that would happen in distributed mode. You can make serialization much faster by using the Kryo serializer; see http://spark.apache.org/docs/latest/tuning.html. But it won't go away. Basically, the code is not optimized for the very best performance on a single node; it's designed to make it easy to build your program locally and run it on a cluster without surprises. Matei On Jul 26, 2014, at 3:08 PM, lokesh.gidra lokesh.gi...@gmail.com wrote: Thanks for the reply. I understand this now. But in another situation, when I use a large heap size to avoid any spilling (I confirm there are no spilling messages in the log), I still see a lot of time being spent in the writeObject0() function. Can you please tell me why there would be any serialization done? Thanks Lokesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spilling-in-memory-messages-in-log-even-with-MEMORY-ONLY-tp10723p10727.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
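For reference, a minimal sketch of switching to Kryo as the tuning guide describes (the app name and registrator class are placeholders; in the 1.0-era API, custom classes are registered through a KryoRegistrator named in spark.kryo.registrator):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("PageRank")  // placeholder
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // optional: .set("spark.kryo.registrator", "com.example.MyRegistrator")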
Re: streaming window not behaving as advertised (v1.0.1)
Yeah, maybe I should bump the issue to major. Now that I think about it, given my previous answer, this should be easy to fix just by doing a foreachRDD on all the input streams within the system (rather than doing it explicitly like I asked you to do). Thanks Alan, for testing this out and confirming that this was the same issue. I was worried that this was a totally new issue that we did not know of. TD On Wed, Jul 23, 2014 at 12:37 AM, Alan Ngai a...@opsclarity.com wrote: TD, it looks like your instincts were correct. I misunderstood what you meant. If I force an eval on the input stream using foreachRDD, the windowing works correctly. If I don't do that, lazy eval somehow screws with the window batches I eventually receive. Any reason the bug is categorized as minor? It seems that anyone who uses the windowing functionality would run into this bug. I imagine this would include anyone who wants to use Spark Streaming to aggregate data in fixed time batches, which seems like a fairly common use case. Alan On Jul 22, 2014, at 11:30 PM, Alan Ngai a...@opsclarity.com wrote: foreachRDD is how I extracted values in the first place, so that's not going to make a difference. I don't think it's related to SPARK-1312 because I'm generating data every second in the first place and I'm using foreachRDD right after the window operation. The code looks something like:

val batchInterval = 5
val windowInterval = 25
val slideInterval = 15
val windowedStream = inputStream.window(Seconds(windowInterval), Seconds(slideInterval))
val outputFunc = (r: RDD[MetricEvent], t: Time) => {
  println("%s".format(t.milliseconds / 1000))
  r.foreach { metric =>
    val timeKey = metric.timeStamp / batchInterval * batchInterval
    println("%s %s %s %s".format(timeKey, metric.timeStamp, metric.name, metric.value))
  }
}
testWindow.foreachRDD(outputFunc)

On Jul 22, 2014, at 10:13 PM, Tathagata Das tathagata.das1...@gmail.com wrote: It could be related to this bug that is currently open: https://issues.apache.org/jira/browse/SPARK-1312 Here is a workaround. Can you put an inputStream.foreachRDD(rdd => { }) and try these combos again? TD On Tue, Jul 22, 2014 at 6:01 PM, Alan Ngai a...@opsclarity.com wrote: I have a sample application pumping out records 1 per second. The batch interval is set to 5 seconds. Here's a list of "observed window intervals" vs what was actually set:

window=25, slide=25 : observed-window=25, overlapped-batches=0
window=25, slide=20 : observed-window=20, overlapped-batches=0
window=25, slide=15 : observed-window=15, overlapped-batches=0
window=25, slide=10 : observed-window=20, overlapped-batches=2
window=25, slide=5 : observed-window=25, overlapped-batches=3

Can someone explain this behavior to me? I'm trying to aggregate metrics by time batches, but want to skip partial batches. Therefore, I'm trying to find a combination that results in 1 overlapped batch, but no combination I tried gets me there. Alan
Re: SparkContext startup time out
I am bumping into this problem as well. I am trying to move to Akka 2.3.x from 2.2.x in order to port to Scala 2.11 -- only Akka 2.3.x is available for Scala 2.11. All 2.2.x Akka versions work fine, and all 2.3.x versions give the following exception in new SparkContext. Still investigating why...

java.util.concurrent.TimeoutException: Futures timed out after [1 milliseconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at akka.remote.Remoting.start(Remoting.scala:180)
at akka.remote.RemoteActorRefProvider.init(RemoteActorRefProvider.scala:184)
at akka.actor.ActorSystemImpl.liftedTree2$1(ActorSystem.scala:618)
at akka.actor.ActorSystemImpl._start$lzycompute(ActorSystem.scala:615)
at akka.actor.ActorSystemImpl._start(ActorSystem.scala:615)

On Fri, May 30, 2014 at 6:33 AM, Pierre B pierre.borckm...@realimpactanalytics.com wrote: I was annoyed by this as well. It appears that just permuting the order of dependency inclusion solves this problem: first Spark, then your CDH Hadoop distro. HTH, Pierre -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkContext-startup-time-out-tp1753p6582.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
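For illustration, a sketch of Pierre's ordering fix in an sbt build (the versions mirror ones mentioned elsewhere on this list and are placeholders for whatever you actually use):

    // list spark-core before the CDH hadoop-client so Spark's
    // transitive dependencies win on the classpath
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "0.9.1",
      "org.apache.hadoop" % "hadoop-client" % "2.0.0-cdh4.5.0")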
Re: java.lang.StackOverflowError when calling count()
Responses inline. On Wed, Jul 23, 2014 at 4:13 AM, lalit1303 la...@sigmoidanalytics.com wrote: Hi, Thanks TD for your reply. I am still not able to resolve the problem for my use case. I have, let's say, 1000 different RDDs, and I am applying a transformation function on each RDD and I want the output of all RDDs combined into a single output RDD. For this, I am doing the following:

*Loop Start*
tempRDD = jaRDD.rdd().repartition(1).mapPartitions().toJavaRDD(); // creating new rdd in every loop
outRDD = outRDD.union(tempRDD); // keep joining RDDs to get the output into a single RDD
// after every 10 iterations, in order to truncate the lineage
cachRDD = outRDD.cache();
cachRDD.checkpoint();
System.out.println(cachRDD.collect().size());
outRDD = new JavaRDD<String>(cachRDD.rdd(), cachRDD.classTag());
*Loop Ends*
// finally, after the whole computation
outRDD.saveAsTextFile(..)

The above operation is overall slow; it runs successfully when performed with fewer iterations, i.e. ~100. But when the number of iterations is increased to ~1000, the whole job takes more than *30 mins* and ultimately breaks down giving an OutOfMemory error. The total size of the data is around 1.4 MB. As of now, I am running the job in Spark standalone mode with 2 cores and 2.9 GB memory.

I think this is happening because of how you are caching the output RDDs that are being generated repeatedly. In every iteration, it is building this new union RDD which contains the data of the previous union RDD plus some new data. Since each of these union RDDs is cached, the underlying data is being cached repeatedly. So the cached sizes are:

Iteration 1: union RDD: X MB
Iteration 2: union RDD: 2X MB | Total size cached: 3X MB
Iteration 3: union RDD: 3X MB | Total size cached: 6X MB
Iteration 4: union RDD: 4X MB | Total size cached: 10X MB
...

If you do the math, that is a quadratic increase in the size of the data being processed and cached, wrt the # iterations. This leads to increases in both run time and memory usage.

I also observed that when the collect() operation is performed, the number of tasks keeps on increasing as the loop proceeds, like on the first collect() 22 total tasks, then ~40 total tasks ... ~300 tasks for a single collect. Does this mean that all the operations are repeatedly performed, and the RDD lineage is not broken?

Same reason as above. Each union RDD is built by appending the partitions of the previous union RDD plus the new set of partitions (~22 partitions). So the Nth union RDD has N * 22 partitions, hence that many tasks. You could change this by also repartitioning when you want to cache+checkpoint the union RDD (e.g., cachRDD = outRDD.repartition(100).cache(); cachRDD.checkpoint(); cachRDD.count()). And do you really need all the data to be collected at the driver? If you are doing the cachRDD.collect() just to force the checkpoint, then use cachRDD.count().

Can you please elaborate on the point from your last post, i.e. how to perform: *Create a modified RDD R` which has the same data as RDD R but does not have the lineage. This is done by creating a new BlockRDD using the ids of blocks of data representing the in-memory R*

Please refer to the lines in the function: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala#L74 What those lines do is save the data of the associated RDD to HDFS files, and then create a new CheckpointRDD from the same files. Then the dependency of the associated RDD is changed to use the new RDD.
This truncates the lineage because the associated RDD's parent is now the new RDD, which has a very short lineage (links to checkpoint files), and the previous dependencies (parent RDDs) are forgotten. This implementation can be modified by forcing the data of the associated RDD to be cached with StorageLevel.MEMORY_AND_DISK_2, and then, instead of a CheckpointRDD, creating a new BlockRDD (using the names of the blocks that are used to cache the RDD), which is then set as the new dependency. This is definitely a behind-the-public-API implementation, that is - Lalit Yadav la...@sigmoidanalytics.com -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-StackOverflowError-when-calling-count-tp5649p10488.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
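Putting TD's suggestions together, a minimal Scala sketch of the loop with periodic lineage truncation (the checkpoint directory, the computeNextBatch transform, the cadence of 10, and the 100 partitions are all hypothetical placeholders):

    import org.apache.spark.rdd.RDD

    sc.setCheckpointDir("hdfs://namenode:9000/checkpoints")  // hypothetical path
    var out: RDD[String] = sc.parallelize(Seq.empty[String])
    for (i <- 1 to 1000) {
      val temp: RDD[String] = computeNextBatch(i)  // hypothetical per-iteration transform
      out = out.union(temp)
      if (i % 10 == 0) {
        out = out.repartition(100).cache()  // keep the partition count from growing as N * 22
        out.checkpoint()                    // mark for checkpointing
        out.count()                         // force materialization; cheaper than collect()
      }
    }
    out.saveAsTextFile("hdfs://namenode:9000/output")  // hypothetical path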
Spark MLlib vs BIDMach Benchmark
BIDMach is a CPU- and GPU-accelerated machine learning library, also from Berkeley. https://github.com/BIDData/BIDMach/wiki/Benchmarks They benchmarked against Spark 0.9, and they claim that it's significantly faster than Spark MLlib. In Spark 1.0, a lot of performance optimization has been done, and sparse data is supported, so it will be interesting to see a new benchmark result. Is anyone familiar with BIDMach? Is it as fast as they claim? Sincerely, DB Tsai --- My Blog: https://www.dbtsai.com LinkedIn: https://www.linkedin.com/in/dbtsai