Re: Implementing a custom Spark shell
Hi, I've tried to enable debug logging, but can't figure out what might be going wrong. Can anyone assist in deciphering the log? The log of the startup and run attempts is at http://pastebin.com/XyeY92VF This uses SparkILoop, DEBUG level logging and the settings.debug.value = true option. Line 323: Spark welcome message Line 746: The NullPointerException that occurs during startup whenever I use SparkILoop instead of ILoop Lines 1973-2252: Running an RDD count, which works correctly Lines 2254-2890: Running an RDD filter + count, which fails due to a ClassNotFoundException (line 2528) Thanks. *Sampo Niskanen* *Lead developer / Wellmo* sampo.niska...@wellmo.com +358 40 820 5291 On Fri, Feb 28, 2014 at 10:46 AM, Prashant Sharma scrapco...@gmail.com wrote: You can enable debug logging for the repl; thankfully it uses Spark's logging framework. Trouble must be with the wrappers. Prashant Sharma On Fri, Feb 28, 2014 at 12:29 PM, Sampo Niskanen sampo.niska...@wellmo.com wrote: Hi, Thanks for the pointers. I did get my code working within the normal spark-shell. However, since I'm building a separate analysis service which pulls in the Spark libraries using SBT, I'd much rather have the custom shell incorporated in that, instead of having to use the default downloadable distribution. I figured out how to create a custom Scala REPL using the instructions at http://stackoverflow.com/questions/18628516/embedded-scala-repl-interpreter-example-for-2-10 (The latter answer is my helper class that I use.) I injected the SparkContext and my RDDs and, for example, rdd.count works fine. However, when I try to perform a filter operation, I get a ClassNotFoundException [1]. My guess is that the inline function I define is created only within the REPL, and does not get sent to the processors (even though I'm using a local cluster). I found out that there's a separate spark-repl library, which contains the SparkILoop class. When I replace the ILoop with SparkILoop, I get the Spark logo + version number, a NullPointerException [2] and then the Scala prompt. Still, I get exactly the same ClassNotFoundException when trying to perform a filter operation. Can anyone give any pointers on how to get this working? Best regards, Sampo N.
ClassNotFoundException [1]: scala> data.profile.filter(p => p.email == "sampo.niska...@mwsoy.com").count 14/02/28 08:49:16 ERROR Executor: Exception in task ID 1 java.lang.ClassNotFoundException: $line9.$read$$iw$$iw$$anonfun$1 at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at java.lang.Class.forName0(Native Method) at java.lang.Class.forName(Class.java:270) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40) at org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:63) at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:139) at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:62) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:195) at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) 14/02/28
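For anyone embedding the shell the same way, the following is a minimal, hedged sketch of wiring up a SparkILoop (Spark 0.9-era repl API; the object name, settings, and structure are illustrative assumptions, not the poster's actual helper class). The relevant difference from a plain scala.tools.nsc.interpreter.ILoop is that SparkILoop serves the classes it compiles for each REPL line over HTTP and advertises that location to executors, which is exactly what the ClassNotFoundException on $line9.$read$$iw$$iw$$anonfun$1 above is about:

import scala.tools.nsc.Settings
import org.apache.spark.repl.SparkILoop

object EmbeddedSparkShell {
  def main(args: Array[String]) {
    val settings = new Settings
    settings.usejavacp.value = true   // use the application's own classpath
    settings.debug.value = true       // extra compiler logging while debugging

    // SparkILoop starts a class server and points executors at it, so the
    // anonymous functions typed at the prompt can be loaded remotely.
    val repl = new SparkILoop()
    repl.process(settings)
  }
}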
Re: Kryo serialization does not compress
We are trying to use Kryo serialization, but with Kryo serialization ON the memory consumption does not change. We have tried this on multiple sets of data. We have also checked the logs of Kryo serialization and have confirmed that Kryo is being used. Can somebody please help us with this? The script used is given below. SCRIPT
import scala.collection.JavaConversions.asScalaBuffer
import scala.collection.JavaConversions.mapAsScalaMap
import scala.collection.JavaConverters.asScalaBufferConverter
import scala.collection.mutable.Buffer
import scala.Array
import scala.math.Ordering.Implicits._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.RangePartitioner
import org.apache.spark.HashPartitioner
// For Kryo logging
import com.esotericsoftware.minlog.Log
import com.esotericsoftware.minlog.Log._
Log.set(LEVEL_TRACE);
val query = "select array(level_1, level_2, level_3, level_4, level_5, level_6, level_7, level_8, level_9, level_10, level_11, level_12, level_13, level_14, level_15, level_16, level_17, level_18, level_19, level_20, level_21, level_22, level_23, level_24, level_25) as unitids, class, cuts, type, data from table1 p join table2 b on (p.UnitId = b.unit_id) where runid = 912 and b.snapshotid = 220 and p.UnitId = b.unit_id"
val rows: RDD[((Buffer[Any], String, Buffer[Any]), (String, scala.collection.mutable.Buffer[Any]))] = sc.sql2rdd(query).map(row => ((row.getList("unitids").asInstanceOf[java.util.List[Any]].asScala, row.getString("class"), row.getList("cuts").asInstanceOf[java.util.List[Any]].asScala), (row.getString("type"), row.getList("data").asInstanceOf[java.util.List[Any]].asScala)))
var rows2Array: RDD[((Buffer[Any], String, Buffer[Any]), (String, Array[Float]))] = rows.map(row => (row._1, (row._2._1, ((row._2._2.map(y => y match { case floatWritable: org.apache.hadoop.io.FloatWritable => floatWritable.get case lazyFloat: org.apache.hadoop.hive.serde2.`lazy`.LazyFloat => lazyFloat.getWritableObject().get case _ => println("unknown data type " + y + " : "); 0 }))).toArray)))
var allArrays: RDD[((Array[Long], String, Buffer[Any]), (String, Array[Float]))] = rows2Array.map(row => ((row._1._1.map(x => x match { case longWritable: org.apache.hadoop.io.LongWritable => longWritable.get case lazyLong: org.apache.hadoop.hive.serde2.`lazy`.LazyLong => lazyLong.getWritableObject().get case _ => println("unknown data type " + x + " : "); 0 }).toArray, row._1._2, row._1._3), row._2))
var dataRdd: RDD[((Array[Long], String, Array[String]), (String, Array[Float]))] = allArrays.map(row => ((row._1._1, row._1._2, row._1._3.map(x => x match { case str: String => str case _ => println("unknown data type " + x + " : "); new String() }).toArray), row._2))
dataRdd = dataRdd.partitionBy(new HashPartitioner(64)).persist(StorageLevel.MEMORY_ONLY_SER)
dataRdd.count()
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Kryo-serialization-does-not-compress-tp2042p2347.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
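A minimal, hedged sketch of the Kryo configuration that is usually checked in this situation (property and class names follow the Spark 0.9-era docs; the registered classes below are examples matching the script above, not a verified fix for this case). Most of Kryo's size advantage comes from registering the classes that actually end up inside the cached RDD; unregistered classes are written with their fully qualified class name per object, which can make the serialized data barely smaller than Java serialization even when Kryo is in use.

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    // Register the element types that appear in the persisted RDD.
    kryo.register(classOf[Array[Long]])
    kryo.register(classOf[Array[Float]])
    kryo.register(classOf[Array[String]])
  }
}

// Set before the SparkContext (or SharkContext) is created, e.g. in the driver:
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
System.setProperty("spark.kryo.registrator", "MyKryoRegistrator")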
Re: disconnected from cluster; reconnecting gives java.net.BindException
So this happened again today. As I noted before, the Spark shell starts up fine after I reconnect to the cluster, but this time around I tried opening a file and doing some processing. I get this message over and over (and can't do anything): 14/03/06 15:43:09 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory So I know that this message is related to my getting disconnected from the cluster while in the Spark shell, and after a while it should automatically clear up. But how can I resolve this directly, without waiting? Looking at the cluster UI doesn't show me anything I know how to use to resolve this. Nick On Wed, Mar 5, 2014 at 3:12 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Whoopdeedoo, after just waiting for like an hour (well, I was doing other stuff) the process holding that address seems to have died automatically and now I can start up pyspark without any warnings. Would there be a faster way to go through this than just waiting around for the orphaned process to die? Nick On Wed, Mar 5, 2014 at 1:01 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: So I was doing stuff in pyspark on a cluster in EC2. I got booted due to a network issue. I reconnect to the cluster and start up pyspark again. I get these warnings: 14/03/05 17:54:56 WARN component.AbstractLifeCycle: FAILED SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in use Is this Bad(tm)? Do I need to do anything? sc appears to be available as usual. Nick -- View this message in context: disconnected from cluster; reconnecting gives java.net.BindException http://apache-spark-user-list.1001560.n3.nabble.com/disconnected-from-cluster-reconnecting-gives-java-net-BindException-tp2309.html Sent from the Apache Spark User List mailing list archive http://apache-spark-user-list.1001560.n3.nabble.com/ at Nabble.com.
Building spark with native library support
Hi, I've successfully built 0.9.0-incubating on Solaris using sbt, following the instructions at http://spark.incubator.apache.org/docs/latest/ and it seems to work OK. However, when I start it up I get an error about missing Hadoop native libraries. I can't find any mention of how to build the native components in the instructions, how is that done? Thanks, -- Alan Burlison --
Re: PIG to SPARK
Thanks Mayur. I don't have a clear idea of how pipe works and wanted to understand more about it. But when do we use pipe() and how does it work? Can you please share some sample code if you have any (even pseudo-code is fine)? It will really help. Regards, Suman Bharadwaj S On Thu, Mar 6, 2014 at 3:46 AM, Mayur Rustagi mayur.rust...@gmail.com wrote: The real question is why you want to run a Pig script using Spark. Are you planning to use Spark as the underlying processing engine for Pig? That's not simple. If you are planning to feed Pig data to Spark for further processing, then you can write it to HDFS and trigger your Spark script. rdd.pipe is basically similar to Hadoop streaming, allowing you to run a script on each partition of the RDD and get the output as another RDD. Regards Mayur Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Wed, Mar 5, 2014 at 10:29 AM, suman bharadwaj suman@gmail.com wrote: Hi, How can I call a Pig script using Spark? Can I use rdd.pipe() here? And can anyone share a sample implementation of rdd.pipe(), and if you can explain how rdd.pipe() works, it would really really help. Regards, SB
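Since sample code was requested: a minimal, hedged sketch of rdd.pipe() (the external command and the data are made up for illustration). pipe() streams each partition's elements to the external process's stdin, one element per line, and returns the lines the process writes to stdout as a new RDD[String]. Running a Pig script this way would mean invoking the pig command as that external process and passing data through it as text, which is usually only practical for simple per-partition transformations.

import org.apache.spark.SparkContext

object PipeExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[2]", "pipe-example")

    val data = sc.parallelize(Seq("spark", "pig", "hadoop"), 2)

    // Run an external command over each partition; here the standard Unix
    // 'tr' command upper-cases whatever it reads on stdin.
    val piped = data.pipe("tr 'a-z' 'A-Z'")

    piped.collect().foreach(println)   // SPARK, PIG, HADOOP
    sc.stop()
  }
}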
Re: Building spark with native library support
Is it an error, or just a warning? In any case, you need to get those libraries from a build of Hadoop for your platform. Then add them to the SPARK_LIBRARY_PATH environment variable in conf/spark-env.sh, or to your -Djava.library.path if launching an application separately. These libraries just speed up some compression codecs BTW, so it should be fine to run without them too. Matei On Mar 6, 2014, at 9:04 AM, Alan Burlison alan.burli...@oracle.com wrote: Hi, I've successfully built 0.9.0-incubating on Solaris using sbt, following the instructions at http://spark.incubator.apache.org/docs/latest/ and it seems to work OK. However, when I start it up I get an error about missing Hadoop native libraries. I can't find any mention of how to build the native components in the instructions, how is that done? Thanks, -- Alan Burlison --
RE: Building spark with native library support
Hi, I am trying to setup Spark in windows for development environment. I get following error when I run sbt. Pl help me to resolve this issue. I am working for Verizon and am in my company network and can't access internet without proxy. C:\Userssbt Getting org.fusesource.jansi jansi 1.11 ... You probably access the destination server through a proxy server that is not we ll configured. You probably access the destination server through a proxy server that is not we ll configured. You probably access the destination server through a proxy server that is not we ll configured. :: problems summary :: WARNINGS Host repo.typesafe.com not found. url=http://repo.typesafe.com/typesafe/ ivy-releases/org.fusesource.jansi/jansi/1.11/ivys/ivy.xml Host repo1.maven.org not found. url=http://repo1.maven.org/maven2/org/fu sesource/jansi/jansi/1.11/jansi-1.11.pom Host repo1.maven.org not found. url=http://repo1.maven.org/maven2/org/fu sesource/jansi/jansi/1.11/jansi-1.11.jar module not found: org.fusesource.jansi#jansi;1.11 local: tried C:\Users\v983654\.ivy2\local\org.fusesource.jansi\jansi\1.11\ivys\ivy. xml -- artifact org.fusesource.jansi#jansi;1.11!jansi.jar: C:\Users\v983654\.ivy2\local\org.fusesource.jansi\jansi\1.11\jars\jans i.jar typesafe-ivy-releases: tried http://repo.typesafe.com/typesafe/ivy-releases/org.fusesource.jansi/ja nsi/1.11/ivys/ivy.xml Maven Central: tried http://repo1.maven.org/maven2/org/fusesource/jansi/jansi/1.11/jansi-1. 11.pom -- artifact org.fusesource.jansi#jansi;1.11!jansi.jar: http://repo1.maven.org/maven2/org/fusesource/jansi/jansi/1.11/jansi-1. 11.jar :: :: UNRESOLVED DEPENDENCIES :: :: :: org.fusesource.jansi#jansi;1.11: not found :: :: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS unresolved dependency: org.fusesource.jansi#jansi;1.11: not found Error during sbt execution: Error retrieving required libraries (see C:\Users\v983654\.sbt\boot\update.log for complete log) Error: Could not retrieve jansi 1.11 Thanks Arockia Raja -Original Message- From: Matei Zaharia [mailto:matei.zaha...@gmail.com] Sent: Thursday, March 06, 2014 11:44 AM To: user@spark.apache.org Subject: Re: Building spark with native library support Is it an error, or just a warning? In any case, you need to get those libraries from a build of Hadoop for your platform. Then add them to the SPARK_LIBRARY_PATH environment variable in conf/spark-env.sh, or to your -Djava.library.path if launching an application separately. These libraries just speed up some compression codecs BTW, so it should be fine to run without them too. Matei On Mar 6, 2014, at 9:04 AM, Alan Burlison alan.burli...@oracle.com wrote: Hi, I've successfully built 0.9.0-incubating on Solaris using sbt, following the instructions at http://spark.incubator.apache.org/docs/latest/ and it seems to work OK. However, when I start it up I get an error about missing Hadoop native libraries. I can't find any mention of how to build the native components in the instructions, how is that done? Thanks, -- Alan Burlison --
Access SBT with proxy
export JAVA_OPTS=$JAVA_OPTS -Dhttp.proxyHost=yourserver -Dhttp.proxyPort=8080 -Dhttp.proxyUser=username -Dhttp.proxyPassword=password Also please use separate thread for different questions. Regards Mayur Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Thu, Mar 6, 2014 at 10:03 AM, Jeyaraj, Arockia R (Arockia) arockia.r.jeya...@verizon.com wrote: Hi, I am trying to setup Spark in windows for development environment. I get following error when I run sbt. Pl help me to resolve this issue. I am working for Verizon and am in my company network and can't access internet without proxy. C:\Userssbt Getting org.fusesource.jansi jansi 1.11 ... You probably access the destination server through a proxy server that is not we ll configured. You probably access the destination server through a proxy server that is not we ll configured. You probably access the destination server through a proxy server that is not we ll configured. :: problems summary :: WARNINGS Host repo.typesafe.com not found. url= http://repo.typesafe.com/typesafe/ ivy-releases/org.fusesource.jansi/jansi/1.11/ivys/ivy.xml Host repo1.maven.org not found. url= http://repo1.maven.org/maven2/org/fu sesource/jansi/jansi/1.11/jansi-1.11.pom Host repo1.maven.org not found. url= http://repo1.maven.org/maven2/org/fu sesource/jansi/jansi/1.11/jansi-1.11.jar module not found: org.fusesource.jansi#jansi;1.11 local: tried C:\Users\v983654\.ivy2\local\org.fusesource.jansi\jansi\1.11\ivys\ivy. xml -- artifact org.fusesource.jansi#jansi;1.11!jansi.jar: C:\Users\v983654\.ivy2\local\org.fusesource.jansi\jansi\1.11\jars\jans i.jar typesafe-ivy-releases: tried http://repo.typesafe.com/typesafe/ivy-releases/org.fusesource.jansi/ja nsi/1.11/ivys/ivy.xml Maven Central: tried http://repo1.maven.org/maven2/org/fusesource/jansi/jansi/1.11/jansi-1. 11.pom -- artifact org.fusesource.jansi#jansi;1.11!jansi.jar: http://repo1.maven.org/maven2/org/fusesource/jansi/jansi/1.11/jansi-1. 11.jar :: :: UNRESOLVED DEPENDENCIES :: :: :: org.fusesource.jansi#jansi;1.11: not found :: :: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS unresolved dependency: org.fusesource.jansi#jansi;1.11: not found Error during sbt execution: Error retrieving required libraries (see C:\Users\v983654\.sbt\boot\update.log for complete log) Error: Could not retrieve jansi 1.11 Thanks Arockia Raja -Original Message- From: Matei Zaharia [mailto:matei.zaha...@gmail.com] Sent: Thursday, March 06, 2014 11:44 AM To: user@spark.apache.org Subject: Re: Building spark with native library support Is it an error, or just a warning? In any case, you need to get those libraries from a build of Hadoop for your platform. Then add them to the SPARK_LIBRARY_PATH environment variable in conf/spark-env.sh, or to your -Djava.library.path if launching an application separately. These libraries just speed up some compression codecs BTW, so it should be fine to run without them too. Matei On Mar 6, 2014, at 9:04 AM, Alan Burlison alan.burli...@oracle.com wrote: Hi, I've successfully built 0.9.0-incubating on Solaris using sbt, following the instructions at http://spark.incubator.apache.org/docs/latest/ and it seems to work OK. However, when I start it up I get an error about missing Hadoop native libraries. I can't find any mention of how to build the native components in the instructions, how is that done? Thanks, -- Alan Burlison --
RE: Access SBT with proxy
Thanks Alan. I am very new to Spark. I am trying to set up a Spark development environment on Windows. I added the export mentioned below as a set in the sbt.bat file and tried it, but it was not working. Where will I see .gitconfig? set JAVA_OPTS=%JAVA_OPTS% -Dhttp.proxyHost=myservername -Dhttp.proxyPort=8080 -Dhttp.proxyUser=username -Dhttp.proxyPassword=password Thanks Arockia Raja -Original Message- From: Alan Burlison [mailto:alan.burli...@oracle.com] Sent: Thursday, March 06, 2014 12:29 PM To: user@spark.apache.org Cc: Mayur Rustagi; Jeyaraj, Arockia R (Arockia) Subject: Re: Access SBT with proxy On 06/03/2014 18:08, Mayur Rustagi wrote: export JAVA_OPTS="$JAVA_OPTS -Dhttp.proxyHost=yourserver -Dhttp.proxyPort=8080 -Dhttp.proxyUser=username -Dhttp.proxyPassword=password" I had exactly the same problem and tried the above, it worked for some of the components but the problem was that stuff that was pulled in from git failed because some of the URLs in the Spark wad are git:// ones and not http:// ones. To get git to play nice through an HTTP proxy I had to add the following to my .gitconfig: -- [http] proxy = http://yourserver:8080 [url "https://github.com/"] insteadOf = git://github.com/ -- So you'll probably need to do that in addition to Mayur's suggestion. -- Alan Burlison --
Re: major Spark performance problem
Dana, When you run multiple applications under Spark, and if each application takes up the entire cluster resources, it is expected that one will block the other completely, thus you're seeing the wall times add together sequentially. In addition there is some overhead associated with starting up a new application/SparkContext. Your other mode of sharing a single SparkContext, if your use case allows it, is more promising in that workers are available to work on tasks in parallel (but ultimately still subject to maximum resource limits). Without knowing what your actual workload is, it's hard to tell in absolute terms whether 12 seconds is reasonable or not. One reason for the jump from 12s in local mode to 40s in cluster mode would be the HBase bottleneck---you apparently would have 2x10=20 clients going against the HBase data source instead of 1 (or however many local threads you have). Assuming this is an increase of useful work output by a factor of 20x, a jump from 12s to 40s wall time is actually quite attractive. NB: given my assumption that the HBase data source is not parallelized along with the Spark cluster, you would run into sublinear performance issues (HBase-perf-limited or network-bandwidth-limited) as you scale out your cluster size. -- Christopher T. Nguyen Co-founder CEO, Adatao http://adatao.com linkedin.com/in/ctnguyen On Thu, Mar 6, 2014 at 11:49 AM, Livni, Dana dana.li...@intel.com wrote: Hi all, We have a big issue and would appreciate it if someone has any insights or ideas. The problem is composed of two connected problems. 1. Run time of a single application. 2. Run time of multiple applications in parallel is almost linear with the run time of a single application. We have written a Spark application fetching its data from HBase. We are running the application using the YARN-client resource manager. The cluster has 2 nodes (both used as HBase data nodes and Spark/YARN processing nodes). We have a few Spark steps in our app; the heaviest and longest of them all is described by this flow: 1. flatMap - converting the HBase RDD to an objects RDD. 2. Group by key 3. Map making the calculations we need (checking a set of basic mathematical conditions). When running a single instance of this step, working on only 2000 records, this step takes around 13s (all records are related to one key). The HBase table we fetch the data from has 5 regions. The implementation we have made uses a REST service which creates one Spark context. Each request we make to this service runs an instance of the application (but again all use the same Spark context). Each request creates multiple threads which run all the application steps. When running one request (with 10 parallel threads) the relevant stage takes about 40s for all the threads - each one of them takes 40s itself, but they run almost completely in parallel, so the total run time of one request is also 40s. We have allocated 10 workers, each with 512M memory (no need for more, it looks like all the RDD is cached). So the first question: Does this run time make sense? For us it seems too long. Do you have an idea what we are doing wrong? The second problem, and the more serious one: We need to run multiple parallel requests of this kind. When doing so the run time spikes again, and instead of a request that runs in about 1m (40s is only the main stage) we get 2 applications, both running almost in parallel, and both running for 2m. This also happens if we use 2 different services and send each of them 1 request. These running times grow as we send more requests.
We have also monitored the CPU usage of the node, and each request makes it jump to 90%. If we reduce the number of workers to 2, the CPU usage jumps to only about 35%, but the run time increases significantly. This seems very unlikely to us. Are there any Spark parameters we should consider changing? Any other ideas? We are quite stuck on this. Thanks in advance Dana - Intel Electronics Ltd. This e-mail and any attachments may contain confidential material for the sole use of the intended recipient(s). Any review or distribution by others is strictly prohibited. If you are not the intended recipient, please contact the sender and delete all copies.
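For readers trying to picture the setup being discussed, here is a minimal, hedged sketch of the pattern (the object name, master URL, and data paths are illustrative assumptions, not the actual service): a single long-lived SparkContext owned by a REST server, with each request submitting its own actions from its own thread. Jobs submitted this way can only overlap up to the cores and memory granted to the one application, which is one reason parallel requests end up stretching each other's wall time when a single request already saturates the cluster.

import org.apache.spark.SparkContext
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object SharedContextService {
  // One SparkContext for the whole service; creating one per request would
  // add startup overhead and make separate applications block each other.
  val sc = new SparkContext("spark://master:7077", "shared-context-service")

  // Each REST request calls this from its own thread.
  def handleRequest(day: String): Long =
    sc.textFile("hdfs:///events/" + day).filter(_.contains("event")).count()

  def main(args: Array[String]) {
    // Two concurrent "requests" sharing the same context.
    val futures = Seq("2014-03-01", "2014-03-02").map(d => Future(handleRequest(d)))
    futures.foreach(f => println(Await.result(f, 10.minutes)))
    sc.stop()
  }
}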
Pig on Spark
Hi everyone, We are using Pig to build our data pipeline. I came across Spork -- Pig on Spark at: https://github.com/dvryaboy/pig and am not sure if it is still active. Can someone please let me know the status of Spork or any other effort that will let us run Pig on Spark? We can significantly benefit by using Spark, but we would like to keep using the existing Pig scripts.
Re: Pig on Spark
I had asked a similar question on the dev mailing list a while back (Jan 22nd). See the archives: http://mail-archives.apache.org/mod_mbox/spark-dev/201401.mbox/browser - look for spork. Basically Matei said: Yup, that was it, though I believe people at Twitter picked it up again recently. I’d suggest asking Dmitriy if you know him. I’ve seen interest in this from several other groups, and if there’s enough of it, maybe we can start another open source repo to track it. The work in that repo you pointed to was done over one week, and already had most of Pig’s operators working. (I helped out with this prototype over Twitter’s hack week.) That work also calls the Scala API directly, because it was done before we had a Java API; it should be easier with the Java one. Tom On Thursday, March 6, 2014 3:11 PM, Sameer Tilak ssti...@live.com wrote: Hi everyone, We are using to Pig to build our data pipeline. I came across Spork -- Pig on Spark at: https://github.com/dvryaboy/pig and not sure if it is still active. Can someone please let me know the status of Spork or any other effort that will let us run Pig on Spark? We can significantly benefit by using Spark, but we would like to keep using the existing Pig scripts.
Re: Pig on Spark
There is some work to make this work on yarn at https://github.com/aniket486/pig. (So, compile pig with ant -Dhadoopversion=23) You can look at https://github.com/aniket486/pig/blob/spork/pig-spark to find out what sort of env variables you need (sorry, I haven't been able to clean this up - in progress). There are a few known issues with this; I will work on fixing them soon. Known issues: 1. Limit does not work (spork-fix) 2. Foreach requires turning off schema-tuple-backend (should be a pig-jira) 3. Algebraic UDFs don't work (spork-fix in progress) 4. Group by rework (to avoid OOMs) 5. UDF Classloader issue (requires SPARK-1053, then you can put pig-withouthadoop.jar as SPARK_JARS in SparkContext along with udf jars) ~Aniket On Thu, Mar 6, 2014 at 1:36 PM, Tom Graves tgraves...@yahoo.com wrote: I had asked a similar question on the dev mailing list a while back (Jan 22nd). See the archives: http://mail-archives.apache.org/mod_mbox/spark-dev/201401.mbox/browser - look for spork. Basically Matei said: Yup, that was it, though I believe people at Twitter picked it up again recently. I'd suggest asking Dmitriy if you know him. I've seen interest in this from several other groups, and if there's enough of it, maybe we can start another open source repo to track it. The work in that repo you pointed to was done over one week, and already had most of Pig's operators working. (I helped out with this prototype over Twitter's hack week.) That work also calls the Scala API directly, because it was done before we had a Java API; it should be easier with the Java one. Tom On Thursday, March 6, 2014 3:11 PM, Sameer Tilak ssti...@live.com wrote: Hi everyone, We are using Pig to build our data pipeline. I came across Spork -- Pig on Spark at: https://github.com/dvryaboy/pig and am not sure if it is still active. Can someone please let me know the status of Spork or any other effort that will let us run Pig on Spark? We can significantly benefit by using Spark, but we would like to keep using the existing Pig scripts. -- ...:::Aniket:::... Quetzalco@tl
Re: Building spark with native library support
On 06/03/2014 18:55, Matei Zaharia wrote: For the native libraries, you can use an existing Hadoop build and just put them on the path. For linking to Hadoop, Spark grabs it through Maven, but you can do mvn install locally on your version of Hadoop to install it to your local Maven cache, and then configure Spark to use that version. Spark never builds Hadoop itself, it just downloads it through Maven. OK, thanks for the pointers. -- Alan Burlison --
RE: Pig on Spark
Hi Aniket, many thanks! I will check this out. Date: Thu, 6 Mar 2014 13:46:50 -0800 Subject: Re: Pig on Spark From: aniket...@gmail.com To: user@spark.apache.org; tgraves...@yahoo.com There is some work to make this work on yarn at https://github.com/aniket486/pig. (So, compile pig with ant -Dhadoopversion=23) You can look at https://github.com/aniket486/pig/blob/spork/pig-spark to find out what sort of env variables you need (sorry, I haven't been able to clean this up - in progress). There are a few known issues with this, I will work on fixing them soon. Known issues: 1. Limit does not work (spork-fix) 2. Foreach requires turning off schema-tuple-backend (should be a pig-jira) 3. Algebraic UDFs don't work (spork-fix in progress) 4. Group by rework (to avoid OOMs) 5. UDF Classloader issue (requires SPARK-1053, then you can put pig-withouthadoop.jar as SPARK_JARS in SparkContext along with udf jars) ~Aniket On Thu, Mar 6, 2014 at 1:36 PM, Tom Graves tgraves...@yahoo.com wrote: I had asked a similar question on the dev mailing list a while back (Jan 22nd). See the archives: http://mail-archives.apache.org/mod_mbox/spark-dev/201401.mbox/browser - look for spork. Basically Matei said: Yup, that was it, though I believe people at Twitter picked it up again recently. I'd suggest asking Dmitriy if you know him. I've seen interest in this from several other groups, and if there's enough of it, maybe we can start another open source repo to track it. The work in that repo you pointed to was done over one week, and already had most of Pig's operators working. (I helped out with this prototype over Twitter's hack week.) That work also calls the Scala API directly, because it was done before we had a Java API; it should be easier with the Java one. Tom On Thursday, March 6, 2014 3:11 PM, Sameer Tilak ssti...@live.com wrote: Hi everyone, We are using Pig to build our data pipeline. I came across Spork -- Pig on Spark at: https://github.com/dvryaboy/pig and am not sure if it is still active. Can someone please let me know the status of Spork or any other effort that will let us run Pig on Spark? We can significantly benefit by using Spark, but we would like to keep using the existing Pig scripts. -- ...:::Aniket:::... Quetzalco@tl
Re: Job aborted: Spark cluster looks down
Can you see your Spark web UI? Is it running? (It would run on masterurl:8080.) If so, what is the master URL shown there? MASTER=spark://URL:PORT ./bin/spark-shell should work. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi On Thu, Mar 6, 2014 at 2:22 PM, Christian chri...@gmail.com wrote: Hello, has anyone found this problem before? I am sorry to insist but I cannot guess what is happening. Should I ask on the dev mailing list? Many thanks in advance. On 05/03/2014 23:57, Christian chri...@gmail.com wrote: I have deployed a Spark cluster in standalone mode with 3 machines: node1/192.168.1.2 - master node2/192.168.1.3 - worker 20 cores 12g node3/192.168.1.4 - worker 20 cores 12g The web interface shows the workers correctly. When I launch the scala job (which only requires 256m of memory) these are the logs: 14/03/05 23:24:06 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 55 tasks 14/03/05 23:24:21 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/03/05 23:24:23 INFO client.AppClient$ClientActor: Connecting to master spark://node1:7077... 14/03/05 23:24:36 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/03/05 23:24:43 INFO client.AppClient$ClientActor: Connecting to master spark://node1:7077... 14/03/05 23:24:51 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory 14/03/05 23:25:03 ERROR client.AppClient$ClientActor: All masters are unresponsive! Giving up. 14/03/05 23:25:03 ERROR cluster.SparkDeploySchedulerBackend: Spark cluster looks dead, giving up. 14/03/05 23:25:03 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 0.0 from pool 14/03/05 23:25:03 INFO scheduler.DAGScheduler: Failed to run saveAsNewAPIHadoopFile at CondelCalc.scala:146 Exception in thread "main" org.apache.spark.SparkException: Job aborted: Spark cluster looks down at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028) ... The logs generated by the master and the 2 workers are attached, but I found something weird in the master logs: 14/03/05 23:37:43 INFO master.Master: Registering worker *node1:57297* with 20 cores, 12.0 GB RAM 14/03/05 23:37:43 INFO master.Master: Registering worker *node1:34188* with 20 cores, 12.0 GB RAM It reports that the two workers are node1:57297 and node1:34188 instead of node3 and node2 respectively. $ cat /etc/hosts ... 192.168.1.2 node1 192.168.1.3 node2 192.168.1.4 node3 ...
$ nslookup node2 Server: 192.168.1.1 Address:192.168.1.1#53 Name: node2.cluster.local Address: 192.168.1.3 $ nslookup node3 Server: 192.168.1.1 Address:192.168.1.1#53 Name: node3.cluster.local Address: 192.168.1.4 $ ssh node1 ps aux | grep spark cperez 17023 1.4 0.1 4691944 154532 pts/3 Sl 23:37 0:15 /data/users/cperez/opt/jdk/bin/java -cp :/data/users/cperez/opt/spark-0.9.0-incubating-bin-hadoop2/conf:/data/users/cperez/opt/spark-0.9.0-incubating-bin-hadoop2/assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar:/data/users/cperez/opt/hadoop-2.2.0/etc/hadoop -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip node1 --port 7077 --webui-port 8080 $ ssh node2 ps aux | grep spark cperez 17511 2.7 0.1 4625248 156304 ? Sl 23:37 0:07 /data/users/cperez/opt/jdk/bin/java -cp :/data/users/cperez/opt/spark-0.9.0-incubating-bin-hadoop2/conf:/data/users/cperez/opt/spark-0.9.0-incubating-bin-hadoop2/assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar:/data/users/cperez/opt/hadoop-2.2.0/etc/hadoop -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://node1:7077 $ ssh node2 netstat -lptun | grep 17511 tcp0 0 :::8081 :::* LISTEN 17511/java tcp0 0 :::192.168.1.3:34188:::* LISTEN 17511/java $ ssh node3 ps aux | grep spark cperez7543 1.9 0.1 4625248 158600 ? Sl 23:37 0:09 /data/users/cperez/opt/jdk/bin/java -cp :/data/users/cperez/opt/spark-0.9.0-incubating-bin-hadoop2/conf:/data/users/cperez/opt/spark-0.9.0-incubating-bin-hadoop2/assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar:/data/users/cperez/opt/hadoop-2.2.0/etc/hadoop -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://node1:7077 $ ssh node3 netstat -lptun | grep
Re: NoSuchMethodError - Akka - Props
I see the same error. I am trying a standalone example integrated into a Play Framework v2.2.2 application. The error occurs when I try to create a Spark Streaming Context. Compilation succeeds, so I am guessing it has to do with the version of Akka getting picked up at runtime. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-Akka-Props-tp2191p2375.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: NoSuchMethodError - Akka - Props
Are you launching your application using the scala or java command? The scala command brings in a version of Akka that we have found to cause conflicts with Spark's version of Akka. So it's best to launch using java. TD On Thu, Mar 6, 2014 at 3:45 PM, Deepak Nulu deepakn...@gmail.com wrote: I see the same error. I am trying a standalone example integrated into a Play Framework v2.2.2 application. The error occurs when I try to create a Spark Streaming Context. Compilation succeeds, so I am guessing it has to do with the version of Akka getting picked up at runtime. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-Akka-Props-tp2191p2375.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: NoSuchMethodError - Akka - Props
I was just able to fix this in my environment. By looking at the repository/cache in my Play Framework installation, I was able to determine that spark-0.9.0-incubating uses Akka version 2.2.3. Similarly, looking at repository/local revealed that Play Framework 2.2.2 ships with Akka version 2.2.0. So I added the following to my Play project dependencies: "com.typesafe.akka" %% "akka-actor" % "2.2.3", "com.typesafe.akka" %% "akka-slf4j" % "2.2.3", That fixed the runtime exception and my standalone spark program works fine now. -deepak -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-Akka-Props-tp2191p2377.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
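For reference, a hedged sketch of how those dependencies might look together in a Play 2.2 build definition (the Spark coordinates and the Build.scala structure here are illustrative; check which Akka version your exact Spark build was compiled against before pinning):

// project/Build.scala (or build.sbt) of a Play 2.2 application - sketch only.
val appDependencies = Seq(
  "org.apache.spark" %% "spark-core"      % "0.9.0-incubating",
  "org.apache.spark" %% "spark-streaming" % "0.9.0-incubating",
  // Pin Akka to the version Spark was built against, so the runtime does not
  // resolve Play's older 2.2.0 artifacts first and hit NoSuchMethodError.
  "com.typesafe.akka" %% "akka-actor" % "2.2.3",
  "com.typesafe.akka" %% "akka-slf4j" % "2.2.3"
)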
Re: Python 2.7 + numpy break sortByKey()
The difference between your two jobs is that take() is optimized and only runs on the machine where you are using the shell, whereas sortByKey requires using many machines. It seems like maybe python didn't get upgraded correctly on one of the slaves. I would look in the /root/spark/work/ folder (find the most recent application log) on each slave and see which slave is logging the error message. On Wed, Mar 5, 2014 at 9:02 AM, Nicholas Chammas nicholas.cham...@gmail.com wrote: Devs? Is this an issue for you that deserves a ticket? On Sun, Mar 2, 2014 at 4:32 PM, Nicholas Chammas nicholas.cham...@gmail.com wrote: So this issue appears to be related to the other Python 2.7-related issue I reported in this thread. Shall I open a bug in JIRA about this and include the wikistat repro? Nick On Sun, Mar 2, 2014 at 1:50 AM, nicholas.chammas nicholas.cham...@gmail.com wrote: Unexpected behavior. Here's the repro: Launch an EC2 cluster with spark-ec2. 1 slave; default instance type. Upgrade the cluster to Python 2.7 using the instructions here. pip2.7 install numpy Run this script in the pyspark shell: wikistat = sc.textFile('s3n://ACCESSKEY:SECRET@bigdatademo/sample/wiki/pagecounts-20100212-05.gz') wikistat = wikistat.map(lambda x: x.split(' ')).cache() wikistat.map(lambda x: (x[1], int(x[3]))).map(lambda x: (x[1],x[0])).sortByKey(False).take(5) You will see a long error output that includes a complaint about NumPy not being installed. Now remove the sortByKey() from that last line and rerun it. wikistat.map(lambda x: (x[1], int(x[3]))).map(lambda x: (x[1],x[0])).take(5) You should see your results without issue. So it's the sortByKey() that's choking. Quit the pyspark shell and pip uninstall numpy. Rerun the three lines from step 4. Enjoy your sorted results error-free. Can anyone else reproduce this issue? Is it a bug? I don't see it if I leave the cluster on the default Python 2.6.8. Installing numpy on the slave via pssh and pip2.7 (so that it's identical to the master) does not fix the issue. Dunno if installing Python packages everywhere is even necessary though. Nick View this message in context: Python 2.7 + numpy break sortByKey() Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: NoSuchMethodError in KafkaReciever
I don't have an Eclipse setup so I am not sure what is going on here. I would try to use maven on the command line with a pom to see if this compiles. Also, try to clean up your system maven cache. Who knows if it had pulled in a wrong version of kafka 0.8 and is using it all the time. Blowing away the cache and clean compiling will make sure the right kafka is loaded. Hope this helps. TD On Sat, Mar 1, 2014 at 8:26 PM, venki-kratos ve...@thekratos.com wrote: I am trying to use code similar to the following: public JavaPairDStream<String, String> openStream() { HashMap<String, String> kafkaParams = Maps.newHashMap(); kafkaParams.put(ZK_CONNECT, kafkaConfig.getString(ZK_CONNECT)); kafkaParams.put(CONSUMER_GRP_ID, kafkaConfig.getString(CONSUMER_GRP_ID)); Map<String, Integer> topicMap = Maps.newHashMap(); topicMap.put(kafkaConfig.getString(ZK_TOPIC), kafkaConfig.getInteger(CONSUMER_THREAD_COUNT, 1)); JavaPairDStream<String, String> inputStream = KafkaUtils.createStream(streamingContext, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2()); return inputStream; } I have spark-streaming_2.10-0.9.0-incubating.jar and spark-streaming-kafka_2.10-0.9.0-incubating.jar in the classpath using POM and m2e in Eclipse. JVM version is set to 1.6 I get the following error, 14/03/02 09:29:15 INFO kafka.KafkaReceiver: Connected to localhost:2181 14/03/02 09:29:15 ERROR kafka.KafkaReceiver: Error receiving data java.lang.NoSuchMethodException: java.lang.Object.<init>(kafka.utils.VerifiableProperties) at java.lang.Class.getConstructor0(Class.java:2763) at java.lang.Class.getConstructor(Class.java:1693) at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:108) at org.apache.spark.streaming.dstream.NetworkReceiver.start(NetworkInputDStream.scala:126) at org.apache.spark.streaming.scheduler.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:173) at org.apache.spark.streaming.scheduler.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:169) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109) at org.apache.spark.scheduler.Task.run(Task.scala:53) at . This is similar to code in JavaKafkaStreamSuite.testKafkaStream. I find that the kafka jar - kafka_2.10-0.8.0 does have such a constructor. What is going wrong? Can someone help solve this mystery and help with my misery? Basically stuck for the last 2 days - as I am a Java guy and would like to develop downstream code in Java -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-in-KafkaReciever-tp2209.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
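As a point of comparison (not a confirmed fix for the error above), here is a hedged sketch of the equivalent stream created through the Scala API of spark-streaming-kafka 0.9.0-incubating, using the overload that defaults to String decoders and therefore avoids passing decoder classes explicitly; the ZooKeeper address, consumer group, and topic name are made up:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaStreamSketch {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "kafka-sketch", Seconds(10))

    // (zkQuorum, groupId, topics with thread counts, storage level);
    // this overload produces a DStream[(String, String)] of (key, message).
    val stream = KafkaUtils.createStream(
      ssc, "localhost:2181", "my-consumer-group",
      Map("my-topic" -> 1), StorageLevel.MEMORY_AND_DISK_SER_2)

    stream.map(_._2).count().print()   // print message counts per batch
    ssc.start()
    ssc.awaitTermination()
  }
}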
Re: need someone to help clear some questions.
Many thanks for the guidance. 2014-03-06 23:39 GMT+08:00 Yana Kadiyska yana.kadiy...@gmail.com: Hi qingyang, 1. You do not need to install shark on every node. 2. Not really sure.. it's just a warning so I'd see if it works despite it. 3. You need to provide the actual hdfs path, e.g. hdfs://namenode/user2/vols.csv, see this thread https://groups.google.com/forum/#!topic/tachyon-users/3Da4zcHKBbY Lastly, as your questions are more shark than spark related, there is a separate shark user group that might be more helpful. Hope this helps On Thu, Mar 6, 2014 at 3:25 AM, qingyang li liqingyang1...@gmail.com wrote: just an addition for #3, i have this configuration in shark-env.sh: export HADOOP_HOME=/usr/lib/hadoop export HADOOP_CONF_DIR=/etc/hadoop/conf export HIVE_HOME=/usr/lib/hive/ #export HIVE_CONF_DIR=/etc/hive/conf export MASTER=spark://bigdata001:7077 - 2014-03-06 16:20 GMT+08:00 qingyang li liqingyang1...@gmail.com: hi, spark community, i have set up a 3-node cluster using spark 0.9 and shark 0.9. My questions are: 1. is it necessary to install shark on every node, since it is a client to use the spark service? 2. when i run shark-withinfo, i got this warning: WARN shark.SharkEnv: Hive Hadoop shims detected local mode, but Shark is not running locally. WARN shark.SharkEnv: Setting mapred.job.tracker to 'Spark_1394093746930' (was 'local') what does this log want to tell us? is it a problem to run shark? 3. i want to load data from hdfs, so i run LOAD DATA INPATH '/user/root/input/test.txt' into table b; , but i got this error: No files matching path file:/user/root/input/test.txt , but this file exists on hdfs. thanks.
Re: Job initialization performance of Spark standalone mode vs YARN
We're not using Ooyala's job server. We are holding the spark context for reuse within our own REST server (with a service to run each job). Our low-latency job now reads all its data from a memory cached RDD, instead of from HDFS seq file (upstream jobs cache resultant RDDs for downstream jobs to read). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Job-initialization-performance-of-Spark-standalone-mode-vs-YARN-tp2016p2384.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Running actions in loops
Hello, What is the general approach people take when trying to do analysis across multiple large files where the data to be extracted from a successive file depends on the data extracted from a previous file or set of files? For example: I have the following: a group of HDFS files each 20+GB in size. I need to extract event1 on day 1 from the first file and extract event2 from all remaining files in a period of successive dates, then do a calculation on the two events. I then need to move on to day2, extract event1 (with certain properties), take all following days, extract event2 and run a calculation against the previous day for all days in the period. So on and so on. I have verified that the following (very naive) approach doesn't work:
def calcSimpleRetention(start: String, end: String, event1: String, event2: String): Map[String, List[Double]] = {
  val epd = new PipelineDate(end)
  val result = for {
    dt1 <- PipelineDate.getPeriod(new PipelineDate(start), epd)
    val f1 = sc.textFile(dt1.toJsonHdfsFileName)
    val e1 = f1.filter(_.split(",")(0).split(":")(1).replace("\"", "") == event1).map(line => (line.split(",")(2).split(":")(1).replace("\"", ""), 0)).cache
    val c = e1.count.toDouble
    val intres = for {
      dt2 <- PipelineDate.getPeriod(dt1 + 1, epd)
      val f2 = sc.textFile(dt2.toJsonHdfsFileName)
      val e2 = f2.filter(_.split(",")(0).split(":")(1).replace("\"", "") == event2).map(line => (line.split(",")(2).split(":")(1).replace("\"", ""), 1))
      val e1e2 = e1.union(e2)
      val r = e1e2.groupByKey().filter(e => e._2.length > 1 && e._2.filter(_ == 0).length > 0).count.toDouble
    } yield (c / r) // get the retention rate
  } yield (dt1.toString -> intres)
  Map(result: _*)
}
I am getting the following errors: 14/03/07 03:22:25 INFO SparkContext: Starting job: count at CountActor.scala:33 14/03/07 03:22:25 INFO DAGScheduler: Got job 0 (count at CountActor.scala:33) with 140 output partitions (allowLocal=false) 14/03/07 03:22:25 INFO DAGScheduler: Final stage: Stage 0 (count at CountActor.scala:33) 14/03/07 03:22:25 INFO DAGScheduler: Parents of final stage: List() 14/03/07 03:22:25 INFO DAGScheduler: Missing parents: List() 14/03/07 03:22:25 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at map at CountActor.scala:32), which has no missing parents 14/03/07 03:22:25 INFO DAGScheduler: Failed to run count at CountActor.scala:33 14/03/07 03:22:25 ERROR OneForOneStrategy: Job aborted: Task not serializable: java.io.NotSerializableException: com.github.ognenpv.pipeline.CountActor org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: com.github.ognenpv.pipeline.CountActor at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737) at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569) at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207) at
akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) I should mention that this code is fired off from an Akka actor (which is controlled by a Scalatra servlet). Any ideas, recommendations etc.? I am fairly new to Scala and M/R principles in general, it is fair to say that at this point I am still thinking from a point of view of an imperative programmer trying to fit a square peg through a round hole ;) Ognen
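One hedged guess at the NotSerializableException itself (not a confirmed diagnosis): the closures above are defined inside the Akka actor, so when Spark ships the tasks it tries to serialize the enclosing CountActor along with them. A common workaround is to keep the functions used inside RDD transformations in a standalone serializable object (or copy any needed fields into local vals first) so the actor never ends up captured by a closure. A sketch with made-up names:

import org.apache.spark.rdd.RDD

// Functions used inside RDD transformations live outside the actor.
object RetentionFunctions extends Serializable {
  def eventName(line: String): String = line.split(",")(0).split(":")(1).replace("\"", "")
  def userId(line: String): String    = line.split(",")(2).split(":")(1).replace("\"", "")

  // Tag each matching line's user with a marker (0 for event1, 1 for event2).
  def eventUsers(rdd: RDD[String], event: String, tag: Int): RDD[(String, Int)] =
    rdd.filter(l => eventName(l) == event).map(l => (userId(l), tag))
}

// The actor then only calls, e.g., RetentionFunctions.eventUsers(sc.textFile(path), event1, 0),
// so the closures capture the serializable object rather than the actor itself.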
Re: Job initialization performance of Spark standalone mode vs YARN
Would you be the best person in the world to share some code? It's a pretty common problem. On Mar 6, 2014 6:36 PM, polkosity polkos...@gmail.com wrote: We're not using Ooyala's job server. We are holding the spark context for reuse within our own REST server (with a service to run each job). Our low-latency job now reads all its data from a memory cached RDD, instead of from HDFS seq file (upstream jobs cache resultant RDDs for downstream jobs to read). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Job-initialization-performance-of-Spark-standalone-mode-vs-YARN-tp2016p2384.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
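Since code was asked for: a minimal, hedged sketch of the pattern polkosity describes (the object name, master URL, paths, and parsing are illustrative assumptions, not their actual service) - one long-lived SparkContext, an upstream job that caches its result in memory, and low-latency downstream jobs that read the cached RDD instead of HDFS:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object JobService {
  // One long-lived context for the whole REST server, created at startup.
  val sc = new SparkContext("spark://master:7077", "job-service")

  // "Upstream" job: parse the input once and keep the result in memory so
  // later requests never go back to HDFS.
  val events: RDD[(String, String)] = sc.textFile("hdfs:///input/events")
    .map { line => val f = line.split("\t"); (f(0), f(1)) }
    .persist(StorageLevel.MEMORY_ONLY)

  // Calling events.count() once at startup forces the cache to materialize
  // before the first request arrives.

  // "Downstream" low-latency job, run once per REST request against the cache.
  def countFor(key: String): Long = events.filter(_._1 == key).count()
}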
Re: need someone to help clear some questions.
Hi, Yana, do you know if there is mailing list for shark like spark's? 2014-03-06 23:39 GMT+08:00 Yana Kadiyska yana.kadiy...@gmail.com: Hi qingyang, 1. You do not need to install shark on every node. 2. Not really sure..it's just a warning so I'd see if it works despite it 3. You need to provide the actual hdfs path, e.g. hdfs://namenode/user2/vols.csv, see this thread https://groups.google.com/forum/#!topic/tachyon-users/3Da4zcHKBbY Lastly as your questions are more shark than spark related there is a separate shark user group that might be more helpful. Hope this helps On Thu, Mar 6, 2014 at 3:25 AM, qingyang li liqingyang1...@gmail.comwrote: just a addition for #3, i have such configuration in shark-env.sh: export HADOOP_HOME=/usr/lib/hadoop export HADOOP_CONF_DIR=/etc/hadoop/conf export HIVE_HOME=/usr/lib/hive/ #export HIVE_CONF_DIR=/etc/hive/conf export MASTER=spark://bigdata001:7077 - 2014-03-06 16:20 GMT+08:00 qingyang li liqingyang1...@gmail.com: hi, spark community, i have setup 3 nodes cluster using spark 0.9 and shark 0.9, My question is : 1. is there any neccessary to install shark on every node since it is a client to use spark service ? 2. when i run shark-withinfo, i got such warning: WARN shark.SharkEnv: Hive Hadoop shims detected local mode, but Shark is not running locally. WARN shark.SharkEnv: Setting mapred.job.tracker to 'Spark_1394093746930' (was 'local') what does this log want to tell us ? is it a problem to run shark? 3. i want to load data from hdfs , so i run LOAD DATA INPATH '/user/root/input/test.txt' into table b; , but i got this error:No files matching path file:/user/root/input/test.txt , but this file exists on hdfs. thanks.
Re: NoSuchMethodError in KafkaReciever
Will give it a shot, later. BTW, this forced me to move to Scala! Decided to design our aggregation frame-work in scala for now. On 07-Mar-2014, at 6:02 AM, Tathagata Das tathagata.das1...@gmail.com wrote: I dont have a Eclipse setup so I am not sure what is going on here. I would try to use maven in the command line with a pom to see if this compiles. Also, try to cleanup your system maven cache. Who knows if it had pulled in a wrong version of kafka 0.8 and using it all the time. Blowing away the cache and clean compiling will make sure the right kafka will be loaded. Hope this helps. TD On Sat, Mar 1, 2014 at 8:26 PM, venki-kratos ve...@thekratos.com wrote: I am trying to user code similar to following : public JavaPairDStreamString, String openStream() { HashMapString, String kafkaParams = Maps.newHashMap(); kafkaParams.put(ZK_CONNECT,kafkaConfig.getString(ZK_CONNECT)); kafkaParams.put(CONSUMER_GRP_ID,kafkaConfig.getString(CONSUMER_GRP_ID)); MapString,Integer topicMap = Maps.newHashMap(); topicMap.put(kafkaConfig.getString(ZK_TOPIC), kafkaConfig.getInteger(CONSUMER_THREAD_COUNT, 1)); JavaPairDStreamString, String inputStream = KafkaUtils.createStream(streamingContext, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2()); return inputStream; } I have spark-streaming_2.10-0.9.0-incubating.jar and spark-streaming-kafka_2.10-0.9.0-incubating.jar in the classpath using POM and m2e in Eclipse. JVM version is set to 1.6 I get the following error, 14/03/02 09:29:15 INFO kafka.KafkaReceiver: Connected to localhost:2181 14/03/02 09:29:15 ERROR kafka.KafkaReceiver: Error receiving data java.lang.NoSuchMethodException: java.lang.Object.init(kafka.utils.VerifiableProperties) at java.lang.Class.getConstructor0(Class.java:2763) at java.lang.Class.getConstructor(Class.java:1693) at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:108) at org.apache.spark.streaming.dstream.NetworkReceiver.start(NetworkInputDStream.scala:126) at org.apache.spark.streaming.scheduler.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:173) at org.apache.spark.streaming.scheduler.NetworkInputTracker$ReceiverExecutor$$anonfun$8.apply(NetworkInputTracker.scala:169) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884) at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109) at org.apache.spark.scheduler.Task.run(Task.scala:53) at . This is similar to code in JavaKafkaStreamSuite.testKafkaStream. I find that the kafka jar - kafka_2.10-0.8.0 does have such a constructor. What is going wrong? Can someone help solve this mystery and help with my misery? Basically stuck for last 2 days - as I am a Java Guy and would like to develop downstream code in Java -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-in-KafkaReciever-tp2209.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Running actions in loops
It looks like the problem is in the filter task - is there anything special about filter()? I have removed the filter line from the loops just to see if things will work and they do. Anyone has any ideas? Thanks! Ognen On 3/6/14, 9:39 PM, Ognen Duzlevski wrote: Hello, What is the general approach people take when trying to do analysis across multiple large files where the data to be extracted from a successive file depends on the data extracted from a previous file or set of files? For example: I have the following: a group of HDFS files each 20+GB in size. I need to extract event1 on day 1 from first file and extract event2 from all remaining files in a period of successive dates, then do a calculation on the two events. I then need to move on to day2, extract event1 (with certain properties), take all following days, extract event2 and run a calculation against previous day for all days in period. So on and so on. I have verified that the following (very naive approach doesn't work): def calcSimpleRetention(start:String,end:String,event1:String,event2:String):Map[String,List[Double]] = { val epd = new PipelineDate(end) val result = for { dt1 - PipelineDate.getPeriod(new PipelineDate(start), epd) val f1 = sc.textFile(dt1.toJsonHdfsFileName) val e1 = f1.filter(_.split(,)(0).split(:)(1).replace(\,) == event1).map(line = (line.split(,)(2).split(:)(1).replace(\,),0)).cache val c = e1.count.toDouble val intres = for { dt2 - PipelineDate.getPeriod(dt1+1,epd) val f2 = sc.textFile(dt2.toJsonHdfsFileName) val e2 = f2.filter(_.split(,)(0).split(:)(1).replace(\,) == event2).map(line = (line.split(,)(2).split(:)(1).replace(\,),1)) val e1e2 = e1.union(e2) val r = e1e2.groupByKey().filter(e = e._2.length 1 e._2.filter(_==0).length0).count.toDouble } yield (c/r) // get the retention rate } yield (dt1.toString-intres) Map(result:_*) } I am getting the following errors: 14/03/07 03:22:25 INFO SparkContext: Starting job: count at CountActor.scala:33 14/03/07 03:22:25 INFO DAGScheduler: Got job 0 (count at CountActor.scala:33) with 140 output partitions (allowLocal=false) 14/03/07 03:22:25 INFO DAGScheduler: Final stage: Stage 0 (count at CountActor.scala:33) 14/03/07 03:22:25 INFO DAGScheduler: Parents of final stage: List() 14/03/07 03:22:25 INFO DAGScheduler: Missing parents: List() 14/03/07 03:22:25 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at map at CountActor.scala:32), which has no missing parents 14/03/07 03:22:25 INFO DAGScheduler: Failed to run count at CountActor.scala:33 14/03/07 03:22:25 ERROR OneForOneStrategy: Job aborted: Task not serializable: java.io.NotSerializableException: com.github.ognenpv.pipeline.CountActor org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: com.github.ognenpv.pipeline.CountActor at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794) at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737) at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569) at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) I should mention that this code is fired off from an Akka actor (which is controlled by a Scalatra servlet). Any ideas, recommendations etc.? I am fairly new to Scala and M/R principles in general, it is fair to say that at this point I am still thinking from a point of view of an imperative programmer trying to
Please remove me from the mail list.//Re: NoSuchMethodError - Akka - Props
Please remove me from the mail list. -Original Message- From: Deepak Nulu [mailto:deepakn...@gmail.com] Sent: March 7, 2014 7:45 To: u...@spark.incubator.apache.org Subject: Re: NoSuchMethodError - Akka - Props I see the same error. I am trying a standalone example integrated into a Play Framework v2.2.2 application. The error occurs when I try to create a Spark Streaming Context. Compilation succeeds, so I am guessing it has to do with the version of Akka getting picked up at runtime. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-Akka-Props-tp2191p2375.html Sent from the Apache Spark User List mailing list archive at Nabble.com.