Solving Systems of Linear Equations Using Spark?
Doing a quick Google search, it appears to me that a number of people have implemented algorithms for solving systems of (sparse) linear equations on Hadoop MapReduce. However, I can find no such thing for Spark. Does anyone have information on whether there have been attempts to create such an algorithm for Spark? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Solving-Systems-of-Linear-Equations-Using-Spark-tp13674.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
Hi, I'm using a cluster with 5 nodes that each have 8 cores and 10GB of RAM. Basically I'm creating a dictionary from text, i.e. giving each word that occurs more than n times across all texts a unique identifier. The essential part of the code looks like this:

var texts = ctx.sql("SELECT text FROM table LIMIT 1500").map(_.head.toString).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
var dict2 = texts.flatMap(_.split(" ").map(_.toLowerCase())).repartition(80)
dict2 = dict2.filter(s => s.startsWith("http") == false)
dict2 = dict2.filter(s => s.startsWith("@") == false)
dict2 = dict2.map(removePunctuation(_)) // removes .,?!:; in strings (single words)
dict2 = dict2.groupBy(identity).filter(_._2.size > 10).keys // only keep entries that occur more than n times
var dict3 = dict2.zipWithIndex
var dictM = dict3.collect.toMap
var count = dictM.size

If I use only 10M texts, it works. With 15M texts as above, I get the following error. It occurs after the dictM.size operation, but due to laziness there isn't any computation happening before that.
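A side note on the groupBy step: groupBy(identity) materializes every occurrence of every word before counting, whereas a reduce-style count keeps just one counter per distinct word (on an RDD, the equivalent would be map(w => (w, 1)).reduceByKey(_ + _)). A minimal plain-Scala sketch of that counting style; the word list and the threshold are made up for illustration:

```scala
// Count words with one counter per distinct word instead of grouping
// all occurrences first (what groupBy(identity) does). Memory use is
// proportional to the number of distinct words, not the total tokens.
val words = Seq("spark", "kmeans", "spark", "http://t.co/x", "spark")

val counts = words
  .filterNot(_.startsWith("http"))              // same filter idea as above
  .foldLeft(Map.empty[String, Int]) { (m, w) =>
    m.updated(w, m.getOrElse(w, 0) + 1)
  }

val frequent = counts.filter(_._2 > 2).keys.toSeq // threshold is illustrative

println(frequent.mkString(","))
```

On an RDD the fold becomes a shuffle of (word, count) pairs, so no single task ever has to hold all occurrences of a word in memory.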
14/08/27 22:36:29 INFO scheduler.TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
14/08/27 22:36:29 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 3.0 (TID 2028, idp11.foo.bar, PROCESS_LOCAL, 921 bytes)
14/08/27 22:36:29 INFO storage.BlockManagerInfo: Added broadcast_3_piece0 in memory on idp11.foo.bar:36295 (size: 9.4 KB, free: 10.4 GB)
14/08/27 22:36:30 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 2 to sp...@idp11.foo.bar:33925
14/08/27 22:36:30 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 2 is 1263 bytes
14/08/27 22:37:06 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.0 (TID 2028, idp11.foo.bar): java.lang.OutOfMemoryError: Requested array size exceeds VM limit
java.util.Arrays.copyOf(Arrays.java:3230)
java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
...

I'm fine with spilling to disk if my program runs out of memory, but is there anything to prevent this error without changing the Java memory settings? (Assume those are at the physical maximum.) Kind regards, Simon
Re: Only master is really busy at KMeans training
With a lower number of partitions, I keep losing executors during "collect at KMeans.scala:283". The error message is ExecutorLostFailure (executor lost). The program recovers by automatically repartitioning the whole dataset (126G), which takes a very long time and seems only to delay the inevitable failure. Is there a recommended solution to this issue?
Re: Only master is really busy at KMeans training
Right now, I have issues even at a far earlier point. I'm fetching data from a registered table via

var texts = ctx.sql("SELECT text FROM tweetTrainTable LIMIT 2000").map(_.head.toString).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER) // persisted because it's used again later
var dict = texts.flatMap(_.split(" ").map(_.toLowerCase())).repartition(80) // 80 = 2 * num_cpu
var count = dict.count.toInt

As far as I can see, it's the repartitioning that is causing the problems. However, without it I have only one partition for further RDD operations on dict, so it seems to be necessary. The errors given are

14/08/26 10:43:52 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.1 (TID 2300, idp11.foo.bar): java.lang.OutOfMemoryError: Requested array size exceeds VM limit
java.util.Arrays.copyOf(Arrays.java:3230)
java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
...

Then the RDD operations start again, but later I will get

14/08/26 10:47:14 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.2 (TID 2655, idp41.foo.bar): java.lang.NullPointerException:
$line39.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:26)
$line39.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:26)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:236)

and another java.lang.OutOfMemoryError.
Only master is really busy at KMeans training
When trying to use KMeans.train with some large data and 5 worker nodes, it would fail due to BlockManagers shutting down because of timeouts. I was able to prevent that by adding

spark.storage.blockManagerSlaveTimeoutMs 300

to the spark-defaults.conf. However, with 1 million feature vectors, the stage "takeSample at KMeans.scala:263" runs for about 50 minutes. In this time, about half of the tasks are done; then I lose the executors and Spark starts a new repartitioning stage. I also noticed that in the takeSample stage, a task would run for about 2.5 minutes until suddenly it was finished and its duration (previously those 2.5 min) changed to 2s, with 0.9s GC time. The training data is supplied in this form:

var vectors2 = vectors.repartition(1000).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
var broadcastVector = sc.broadcast(vectors2)

The 1000 partitions is something that could probably be optimized, but too few will cause OOM errors. Using Ganglia, I can see that the master node is the only one that is properly busy regarding CPU, and that most of the CPU (600-700 of 800 total percent) is used by the master. The workers on each node only use 1 core, i.e. 100% CPU. What would be the most likely cause for such inefficient use of the cluster, and how can I prevent it? Number of partitions, way of caching, ...? I'm trying to find out myself with tests, but ideas from someone with more experience are very welcome. Best regards, Simon
Re: Spark webUI - application details page
If I understand you correctly, setting event logging in SPARK_JAVA_OPTS should achieve what you want. I'm logging to HDFS, but according to the configuration page http://spark.apache.org/docs/latest/configuration.html a local folder should be possible as well. Example with all other settings removed:

SPARK_JAVA_OPTS="-Dspark.eventLog.enabled=true -Dspark.eventLog.dir=hdfs://idp11:9100/user/myname/logs/"

This works with the Spark shell; I haven't tested other applications though. Note that the completed applications will disappear from the list if you restart Spark completely, even though they'll still be stored in the log folder. Best regards, Simon
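The same two properties can equivalently be placed in conf/spark-defaults.conf instead of the environment variable (a sketch; the HDFS path is just the example one from above):

```
spark.eventLog.enabled   true
spark.eventLog.dir       hdfs://idp11:9100/user/myname/logs/
```

Either way, the driver writes one event log per application into that directory, which the standalone master's UI reads back for the "completed applications" list.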
Using very large files for KMeans training -- cluster centers size?
I'm trying to apply KMeans training to some text data, which consists of lines that each contain something between 3 and 20 words. For that purpose, all unique words are saved in a dictionary. This dictionary can become very large as no hashing etc. is done, but it should spill to disk in case it doesn't fit into memory anymore:

var dict = scala.collection.mutable.Map[String,Int]()
dict.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)

With the help of this dictionary, I build sparse feature vectors for each line, which are then saved in an RDD that is used as input for KMeans.train. Spark is running in standalone mode, in this case with 5 worker nodes. It appears that everything up to the actual training completes successfully with 126G of training data (logs below). The training data is provided in the form of a cached, broadcasted variable to all worker nodes:

var vectors2 = vectors.repartition(1000).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
var broadcastVector = sc.broadcast(vectors2)
println("-Start model training-")
var model = KMeans.train(broadcastVector.value, 20, 10)

The first error I get is a null pointer exception, but there is still work done after that. I think the real reason this terminates is java.lang.OutOfMemoryError: Java heap space. Is it possible that this happens because the cluster centers in the model are represented in dense instead of sparse form, thereby getting large with a large vector size? If so, how can I make sure it doesn't crash because of that? It should spill to disk if necessary. My goal would be to have the input size limited only by disk space. Sure, it would get very slow if it spills to disk all the time, but it shouldn't terminate.
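A rough back-of-envelope check of the dense-centers suspicion: MLlib stores each center as a dense array of Doubles, so the centers alone cost about 8 bytes per dimension per cluster. The dictionary size below is a made-up placeholder, not a number from the post:

```scala
// Estimate the memory held by k dense cluster centers of dimension n.
// 8 bytes per Double; JVM object/array overhead ignored.
def denseCentersBytes(k: Int, n: Int): Long = k.toLong * n * 8L

val k = 20        // clusters, as in KMeans.train(..., 20, 10) above
val n = 500000    // hypothetical dictionary size (distinct words)

val mb = denseCentersBytes(k, n) / (1024.0 * 1024.0)
println(f"~$mb%.0f MB for the centers alone")
```

Since each task touches all k centers, this footprint exists per executor and cannot spill to disk the way cached RDD partitions can, which is consistent with the heap-space error.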
Here's the console output from the model.train part:

-Start model training-
14/08/11 17:05:17 INFO spark.SparkContext: Starting job: takeSample at KMeans.scala:263
14/08/11 17:05:17 INFO scheduler.DAGScheduler: Registering RDD 64 (repartition at console:48)
14/08/11 17:05:17 INFO scheduler.DAGScheduler: Got job 6 (takeSample at KMeans.scala:263) with 1000 output partitions (allowLocal=false)
14/08/11 17:05:17 INFO scheduler.DAGScheduler: Final stage: Stage 8 (takeSample at KMeans.scala:263)
14/08/11 17:05:17 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 9)
14/08/11 17:05:17 INFO scheduler.DAGScheduler: Missing parents: List(Stage 9)
14/08/11 17:05:17 INFO scheduler.DAGScheduler: Submitting Stage 9 (MapPartitionsRDD[64] at repartition at console:48), which has no missing parents
4116.323: [GC (Allocation Failure) [PSYoungGen: 1867168K->240876K(2461696K)] 4385155K->3164592K(9452544K), 1.4455064 secs] [Times: user=11.33 sys=0.03, real=1.44 secs]
4174.512: [GC (Allocation Failure) [PSYoungGen: 1679497K->763168K(2338816K)] 4603212K->3691609K(9329664K), 0.8050508 secs] [Times: user=6.04 sys=0.01, real=0.80 secs]
4188.250: [GC (Allocation Failure) [PSYoungGen: 2071822K->986136K(2383360K)] 5000263K->4487601K(9374208K), 1.6795174 secs] [Times: user=13.23 sys=0.01, real=1.68 secs]
14/08/11 17:06:57 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 9 (MapPartitionsRDD[64] at repartition at console:48)
14/08/11 17:06:57 INFO scheduler.TaskSchedulerImpl: Adding task set 9.0 with 1 tasks
4190.947: [GC (Allocation Failure) [PSYoungGen: 2336718K->918720K(2276864K)] 5838183K->5406145K(9267712K), 1.5793066 secs] [Times: user=12.40 sys=0.02, real=1.58 secs]
14/08/11 17:07:00 WARN scheduler.TaskSetManager: Stage 9 contains a task of very large size (272484 KB). The maximum recommended task size is 100 KB.
14/08/11 17:07:00 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 9.0 (TID 3053, idp11.foo.bar, PROCESS_LOCAL, 279023993 bytes)
4193.607: [GC (Allocation Failure) [PSYoungGen: 2070046K->599908K(2330112K)] 6557472K->5393557K(9320960K), 0.3267949 secs] [Times: user=2.53 sys=0.01, real=0.33 secs]
4194.645: [GC (Allocation Failure) [PSYoungGen: 1516770K->589655K(2330112K)] 6310419K->5383352K(9320960K), 0.2566507 secs] [Times: user=1.96 sys=0.00, real=0.26 secs]
4195.815: [GC (Allocation Failure) [PSYoungGen: 1730909K->275312K(2330112K)] 6524606K->5342865K(9320960K), 0.2053884 secs] [Times: user=1.57 sys=0.00, real=0.21 secs]
14/08/11 17:08:56 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 in memory on idp11.foo.bar:46418 (size: 136.0 B, free: 10.4 GB)
14/08/11 17:08:56 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 1 to sp...@idp11.foo.bar:57072
14/08/11 17:10:09 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 9.0 (TID 3053, idp11.foo.bar): java.lang.NullPointerException:
$line86.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:36)
$line86.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(console:36)
Re: saveAsTextFile
This should work:

jobs.saveAsTextFile("file:////home/hysom/testing")

Note the 4 slashes; it's really 3 slashes + the absolute path. This should be mentioned in the docs though; I only remember it from having seen it somewhere else. The output folder, here "testing", will be created and must therefore not exist beforehand. Best regards, Simon
Executors for Spark shell take much longer to be ready
I recently moved my Spark installation from one Linux user to another, i.e. changed the folder and the ownership of the files. That was everything; no other settings were changed and no different machines were used. However, it now suddenly takes three minutes to have all executors in the Spark shell ready, instead of about 10 seconds as before. They'll be listed on the "Application: Spark shell" page, but not in the "Application Detail UI" -> "Executors" tab. Only the master driver and executor are there. Can anyone think of an explanation for this? Here's a log of the startup; you can see that the delay is exactly three minutes:

spark.logConf=true
spark.master=spark://idp19:1
spark.repl.class.uri=http://172.16.0.19:46457
spark.speculation=true
14/08/08 14:54:29 INFO spark.SecurityManager: Changing view acls to: myuser
14/08/08 14:54:29 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(myuser)
4.400: [GC (Metadata GC Threshold) [PSYoungGen: 188763K->30233K(1223168K)] 248423K->89902K(4019712K), 0.0468201 secs] [Times: user=0.26 sys=0.04, real=0.05 secs]
4.447: [Full GC (Metadata GC Threshold) [PSYoungGen: 30233K->0K(1223168K)] [ParOldGen: 59668K->77437K(2796544K)] 89902K->77437K(4019712K), [Metaspace: 34757K->34757K(1079296K)], 0.1397987 secs] [Times: user=0.77 sys=0.02, real=0.14 secs]
14/08/08 14:54:30 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/08/08 14:54:30 INFO Remoting: Starting remoting
14/08/08 14:54:30 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sp...@idp19.foo.bar:55806]
14/08/08 14:54:30 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sp...@idp19.foo.bar:55806]
14/08/08 14:54:30 INFO spark.SparkEnv: Registering MapOutputTracker
14/08/08 14:54:30 INFO spark.SparkEnv: Registering BlockManagerMaster
14/08/08 14:54:30 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20140808145430-8ede
14/08/08 14:54:30 INFO network.ConnectionManager:
Bound socket to port 51497 with id = ConnectionManagerId(idp19.foo.bar,51497)
14/08/08 14:54:30 INFO storage.MemoryStore: MemoryStore started with capacity 2.1 GB
14/08/08 14:54:30 INFO storage.BlockManagerMaster: Trying to register BlockManager
14/08/08 14:54:30 INFO storage.BlockManagerMasterActor: Registering block manager idp19.foo.bar:51497 with 2.1 GB RAM
14/08/08 14:54:30 INFO storage.BlockManagerMaster: Registered BlockManager
14/08/08 14:54:30 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-76fec37d-3812-4034-94e7-0456a4bc76dc
14/08/08 14:54:30 INFO spark.HttpServer: Starting HTTP Server
14/08/08 14:54:30 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/08/08 14:54:30 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:56276
14/08/08 14:54:30 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/08/08 14:54:30 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
14/08/08 14:54:30 INFO ui.SparkUI: Started SparkUI at http://idp19.foo.bar:4040
14/08/08 14:54:30 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/08/08 14:54:31 WARN hdfs.DomainSocketFactory: The short-circuit local reads feature is disabled because libhadoop cannot be loaded.
14/08/08 14:54:31 INFO scheduler.EventLoggingListener: Logging events to hdfs://idp11:9100/user/myuser/logs/spark-shell-1407534870777
14/08/08 14:54:31 INFO scheduler.TaskSchedulerImpl: Starting speculative execution thread
14/08/08 14:54:31 INFO client.AppClient$ClientActor: Connecting to master spark://idp19:1...
14/08/08 14:54:31 INFO repl.SparkILoop: Created spark context..
Spark context available as sc.
scala>
14/08/08 14:54:31 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20140808145431-0001
14/08/08 14:54:31 INFO client.AppClient$ClientActor: Executor added: app-20140808145431-0001/0 on worker-20140808145140-idp09.foo.bar-11000 (idp09.foo.bar:11000) with 8 cores
14/08/08 14:54:31 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20140808145431-0001/0 on hostPort idp09.foo.bar:11000 with 8 cores, 25.0 GB RAM
14/08/08 14:54:31 INFO client.AppClient$ClientActor: Executor added: app-20140808145431-0001/1 on worker-20140808145427-idp42.foo.bar-11000 (idp42.foo.bar:11000) with 8 cores
14/08/08 14:54:31 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20140808145431-0001/1 on hostPort idp42.foo.bar:11000 with 8 cores, 25.0 GB RAM
14/08/08 14:54:31 INFO client.AppClient$ClientActor: Executor added: app-20140808145431-0001/2 on worker-20140808145049-idp41.foo.bar-11000 (idp41.foo.bar:11000) with 8 cores
14/08/08 14:54:31 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20140808145431-0001/2 on hostPort idp41.foo.bar:11000 with 8 cores, 25.0 GB RAM
14/08/08 14:54:31 INFO client.AppClient$ClientActor: Executor added: app-20140808145431-0001/3 on worker-20140808144952-idp19.foo.bar-11000
Re: KMeans Input Format
Not all memory can be used for Java heap space, so maybe it does run out. Could you try repartitioning the data? To my knowledge you shouldn't be thrown out as long as a single partition fits into memory, even if the whole dataset does not. To do that, exchange

val train = parsedData.cache()

with

val train = parsedData.repartition(20).cache()

Best regards, Simon
Re: Spark Streaming fails - where is the problem?
Update: I can get it to work by disabling iptables temporarily. I can, however, not figure out on which port I have to accept traffic. 4040 and any of the Master or Worker ports mentioned in the previous post don't work. Can it be one of the randomly assigned ones in the 30k to 60k range? Those appear to change every time, making it difficult to apply any sensible rules.
Re: Spark Streaming fails - where is the problem?
Hi Andrew, for this test I only have one machine, which provides the master and the only worker. So all I'd need is communication to the Internet to access the Twitter API. I've tried assigning a specific port to the driver and creating iptables rules for this port, but that didn't work. Best regards, Simon

On Aug 6, 2014 11:37 AM, "Andrew Or-2 [via Apache Spark User List]" <ml-node+s1001560n11561...@n3.nabble.com> wrote:

> Hi Simon, the drivers and executors currently choose random ports to talk to each other, so the Spark nodes will have to have full TCP access to each other. This is changed in a very recent commit, where all of these random ports become configurable: https://github.com/apache/spark/commit/09f7e4587bbdf74207d2629e8c1314f93d865999 . This will be available in Spark 1.1, but for now you will have to open all ports among the nodes in your cluster. -Andrew
Spark Streaming fails - where is the problem?
I am using the latest Spark master and additionally, I am loading these jars:
- spark-streaming-twitter_2.10-1.1.0-SNAPSHOT.jar
- twitter4j-core-4.0.2.jar
- twitter4j-stream-4.0.2.jar

My simple test program that I execute in the shell looks as follows:

import org.apache.spark.streaming._
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.StreamingContext._
System.setProperty("twitter4j.oauth.consumerKey", "jXgXF...")
System.setProperty("twitter4j.oauth.consumerSecret", "mWPvQRl1")
System.setProperty("twitter4j.oauth.accessToken", "26176")
System.setProperty("twitter4j.oauth.accessTokenSecret", "J8Fcosm4...")
var ssc = new StreamingContext(sc, Seconds(1))
var tweets = TwitterUtils.createStream(ssc, None)
var statuses = tweets.map(_.getText)
statuses.print()
ssc.start()

However, I don't get any tweets. The main error I see is

14/08/04 10:52:35 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.NoSuchMethodError: twitter4j.TwitterStream.addListener(Ltwitter4j/StatusListener;)V
at org.apache.spark.streaming.twitter.TwitterReceiver.onStart(TwitterInputDStream.scala:72)

And then for each iteration:

INFO scheduler.ReceiverTracker: Stream 0 received 0 blocks

I'm not sure where the problem lies. How can I verify that my Twitter credentials are correctly recognized? Might there be another jar missing? Kindly, Simon
Re: Spark Streaming fails - where is the problem?
Using 3.0.3 (downloaded from http://mvnrepository.com/artifact/org.twitter4j ) changes the error to

Exception in thread "Thread-55" java.lang.NoClassDefFoundError: twitter4j/StatusListener
at org.apache.spark.streaming.twitter.TwitterInputDStream.getReceiver(TwitterInputDStream.scala:55)

It seems yet another version is required. Is there any quick way to find out which? The ScalaDoc for TwitterUtils doesn't seem to mention anything in that direction.
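For anyone building an app instead of loading jars into the shell, a hypothetical build.sbt fragment that pins twitter4j to the 3.0.x line the Spark 1.1 twitter module was built against; the exact artifact versions here are an assumption and should be checked against the spark-streaming-twitter POM:

```scala
// Hypothetical build.sbt fragment: the twitter4j version must match the
// one spark-streaming-twitter was compiled against (3.0.x for Spark 1.1).
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming-twitter" % "1.1.0",
  "org.twitter4j"     % "twitter4j-core"          % "3.0.3",
  "org.twitter4j"     % "twitter4j-stream"        % "3.0.3"
)
```

Letting sbt resolve twitter4j transitively from the Spark artifact avoids version-mismatch errors like the NoSuchMethodError/NoClassDefFoundError above.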
Re: Spark Streaming fails - where is the problem?
In the WebUI's Environment tab, the section "Classpath Entries" lists the following as part of the system classpath:

/foo/hadoop-2.0.0-cdh4.5.0/etc/hadoop
/foo/spark-master-2014-07-28/assembly/target/scala-2.10/spark-assembly-1.1.0-SNAPSHOT-hadoop2.0.0-cdh4.5.0.jar
/foo/spark-master-2014-07-28/conf
/foo/spark-master-2014-07-28/external/twitter/target/spark-streaming-twitter_2.10-1.1.0-SNAPSHOT.jar
/foo/spark-master-2014-07-28/extrajars/twitter4j-core-3.0.3.jar
/foo/spark-master-2014-07-28/extrajars/twitter4j-stream-3.0.3.jar

So I can't see where any other versions would come from.
Re: sbt package failed: wrong libraryDependencies for spark-streaming?
Hi Tathagata, I didn't mean to say this was an error. According to the other thread I linked, right now there shouldn't be any conflicts, so I wanted to use streaming in the shell for easy testing. I thought I had to create my own project in which I'd add streaming as a dependency, but if I can add it in the config, that's even simpler and gets rid of my sbt problem. I'll try that. Simon
Re: KMeans: expensiveness of large vectors
Development is really rapid here, that's a great thing. Out of curiosity, how did communication work before torrent? Did everything have to go back to the master / driver first?
Re: KMeans: expensiveness of large vectors
Hi Xiangrui, thanks for the explanation.

1. You said we have to broadcast m * k centers (with m = number of rows). I thought there were only k centers at any time, which would then have size n * k and would need to be broadcast. Is that a typo, or did I misunderstand something? And the collection of the averages is partition-wise, so more partitions = more overhead, but basically the same number of operations?

2. I have 5 executors with 8 CPU cores and 25G of memory each, and I usually split the input RDD into 80 partitions for a few gigs of input data. Is there a rule of thumb for the number of partitions in relation to the input size?

3. Assuming I wouldn't use numeric data but instead converted text data into a numeric representation using a dictionary and a featurization function: the number of columns would be the number of entries in my dictionary (i.e. the number of distinct words in my case). I'd use a sparse vector representation, of course. But even so, if I have a few hundred thousand entries and therefore columns, the broadcasting overhead will get very large, as the centers are still in a dense representation. Do you know of any way to improve performance then?

Best regards, Simon
Re: KMeans: expensiveness of large vectors
Hi Xiangrui, using the current master meant a huge improvement for my task. Something that did not even finish before (training with 120G of dense data) now completes in a reasonable time. I guess using torrent helps a lot in this case. Best regards, Simon
KMeans: expensiveness of large vectors
As a source, I have a text file with n rows that each contain m comma-separated integers. Each row is then converted into a feature vector with m features. I've noticed that, given the same total file size and number of features, a larger number of columns is much more expensive for training a KMeans model than a larger number of rows. To give an example: 10k rows x 1k columns took 21 seconds on my cluster, whereas 1k rows x 10k columns took 1min47s. Both files had a size of 238M. Can someone explain what in the implementation of KMeans causes large vectors to be so much more expensive than having many of these vectors? A pointer to the exact part of the source would be fantastic, but even a general explanation would help me. Best regards, Simon
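One hedged way to see why columns hurt more than rows: the total distance work per Lloyd iteration is roughly rows x k x cols multiply-adds, which is identical for both shapes above, but everything tied to the centers themselves (allocating, updating, and shipping k dense vectors) scales with k x cols alone. A small arithmetic sketch with the numbers from the post and an assumed k = 20:

```scala
// Per-iteration sizes for the two data shapes from the post, k = 20.
// Distance work (rows * k * cols) is identical; the dense-center
// footprint (k * cols doubles) differs by a factor of 10.
def distanceWork(rows: Long, k: Long, cols: Long): Long = rows * k * cols
def centerDoubles(k: Long, cols: Long): Long = k * cols

val k = 20L
val tallThin  = (10000L, 1000L)   // 10k rows x 1k cols
val shortWide = (1000L, 10000L)   // 1k rows x 10k cols

val sameWork = distanceWork(tallThin._1, k, tallThin._2) ==
               distanceWork(shortWide._1, k, shortWide._2)
val ratio    = centerDoubles(k, shortWide._2) / centerDoubles(k, tallThin._2)

println(s"same distance work: $sameWork, center size ratio: $ratio")
```

So with equal file sizes, the wide-row dataset does the same point-to-center arithmetic but carries 10x larger dense centers through every broadcast and reduce, which is one plausible contributor to the slowdown.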
import org.apache.spark.streaming.twitter._ in Shell
I'm using Spark 1.0.0 (a three-week-old build of the latest master). Along the lines of this tutorial http://ampcamp.berkeley.edu/big-data-mini-course/realtime-processing-with-spark-streaming.html , I want to read some tweets from Twitter. When trying to execute the import in the Spark shell, I get an error. The tutorial builds an app via sbt/sbt. Are there any special requirements for importing TwitterUtils in the shell? Best regards, Simon
Re: import org.apache.spark.streaming.twitter._ in Shell
Thanks. Can I see somewhere in the API docs that a class is not available in the shell, or do I have to find out by trial and error?
Re: KMeans for large training data
Thanks, setting the number of partitions to the number of executors helped a lot, and training with 20k entries got much faster. However, when I tried training with 1M entries, after about 45 minutes of calculations I get this: It's stuck at this point. The CPU load on the master is at 100% (so 1 of 8 cores is used), but the WebUI shows no active task, and after 30 more minutes of no visible change I had to leave for an appointment. I've never seen an error referring to this library before. Could that be due to the new partitioning? Edit: Just before sending, in a new test I realized this error also appears when the amount of test data is very low (here 500 items). This time it includes a Java stack trace, though, instead of just stopping: So, to sum it up, KMeans.train works somewhere in between 10k and 200k items, but not outside this range. Can you think of an explanation for this behavior? Best regards, Simon -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/KMeans-for-large-training-data-tp9407p9508.html
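The partitioning change mentioned above amounts to something like the following sketch, assuming an RDD[Vector] called vectors and the usual k/maxIterations parameters; the partition count is a placeholder that should match the cluster (e.g. executors times cores):

```scala
// Spread the training data over enough partitions that every
// executor core has work to do (40 is a placeholder value).
val numPartitions = 40
val training = vectors.repartition(numPartitions).cache()
val model = org.apache.spark.mllib.clustering.KMeans.train(training, k, maxIterations)
```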
KMeans for large training data
Hi, I'm trying to use org.apache.spark.mllib.clustering.KMeans to do some basic clustering with Strings. My code works great when I use a five-figure number of training elements. However, with, for example, 2 million elements, it gets extremely slow. A single stage may take up to 30 minutes. From the Web UI, I can see that it does these three things repeatedly: All of these tasks only use one executor, and on that executor only one core. I can also see a scheduler delay of about 25 seconds. I tried to use broadcast variables to speed this up, but maybe I'm using them wrong. The relevant code (where it gets slow) is this: What could I do to use more executors, and generally speed this up? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/KMeans-for-large-training-data-tp9407.html
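A minimal sketch of how a broadcast variable would typically be used in this situation, assuming a driver-side dictionary called dict; the helper toVector is hypothetical, standing in for whatever turns a string into a feature vector:

```scala
// Ship the dictionary to every executor once, instead of serializing
// it into the closure of every single task.
val dictBroadcast = sc.broadcast(dict)

// Tasks read the read-only copy via .value; toVector is a hypothetical
// helper that maps a string to a feature vector using the dictionary.
val vectors = texts.map(t => toVector(t, dictBroadcast.value))
```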
LIMIT with offset in SQL queries
Hi, in many SQL DBMSs like MySQL, you can set an offset for the LIMIT clause, such that /LIMIT 5, 10/ will return 10 rows, starting from row 5. As far as I can see, this is not possible in Spark SQL. The best workaround I have found to imitate it (using Scala) is converting the RDD into an Array via collect() and then using a for-loop to return certain elements from that Array. Is there a better solution performance-wise, and are there plans to implement an offset for LIMIT? Kind regards, Simon -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/LIMIT-with-offset-in-SQL-queries-tp8673.html
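One alternative that avoids collecting the whole result to the driver is to index the rows on the RDD side and filter. This is a plain RDD sketch, not Spark SQL syntax, and note that RDD row order is only meaningful after an explicit sort:

```scala
// Imitate MySQL's "LIMIT 5, 10": skip the first 5 rows, keep the next 10.
val offset = 5L
val count = 10L
val page = rdd.zipWithIndex()              // pair each row with its index
  .filter { case (_, i) => i >= offset && i < offset + count }
  .keys                                    // drop the indices again
```

Unlike collect(), this keeps the paging work distributed and only materializes the requested slice.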
jsonFile function in SQLContext does not work
I'm using Spark 1.0.0-SNAPSHOT (downloaded and compiled on 2014/06/23). I'm trying to execute the following code:

import org.apache.spark.SparkContext._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val table = sqlContext.jsonFile("hdfs://host:9100/user/myuser/data.json")
table.printSchema()

data.json looks like this (3 shortened lines shown here):

{"field1":"content","id":12312213,"read":false,"user":{"id":121212,"name":"E. Stark","num_heads":0},"place":"Winterfell","entities":{"weapons":[],"friends":[{"name":"R. Baratheon","id":23234,"indices":[0,16]}]},"lang":"en"}
{"field1":"content","id":56756765,"read":false,"user":{"id":121212,"name":"E. Stark","num_heads":0},"place":"Winterfell","entities":{"weapons":[],"friends":[{"name":"R. Baratheon","id":23234,"indices":[0,16]}]},"lang":"en"}
{"field1":"content","id":56765765,"read":false,"user":{"id":121212,"name":"E. Stark","num_heads":0},"place":"Winterfell","entities":{"weapons":[],"friends":[{"name":"R. Baratheon","id":23234,"indices":[0,16]}]},"lang":"en"}

The JSON object in each line is valid according to the JSON validator I use, and as jsonFile is defined as

def jsonFile(path: String): SchemaRDD
Loads a JSON file (one object per line), returning the result as a SchemaRDD.

I would assume this should work. However, executing this code returns this error:

14/06/25 10:05:09 WARN scheduler.TaskSetManager: Lost TID 11 (task 0.0:11)
14/06/25 10:05:09 WARN scheduler.TaskSetManager: Loss was due to com.fasterxml.jackson.databind.JsonMappingException
com.fasterxml.jackson.databind.JsonMappingException: No content to map due to end-of-input
 at [Source: java.io.StringReader@238df2e4; line: 1, column: 1]
 at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:164)
...

Does anyone know where the problem lies? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/jsonFile-function-in-SQLContext-does-not-work-tp8273.html
Re: jsonFile function in SQLContext does not work
Hi Zongheng Yang, thanks for your response. Reading your answer, I did some more tests and realized that analyzing very small parts of the dataset (which is ~130GB in ~4.3M lines) works fine. The error occurs when I analyze larger parts. Using 5% of the whole data, the error is the same as posted before for certain TIDs. However, I get the structure determined so far as a result when using 5%. The Spark WebUI shows the following:

Job aborted due to stage failure: Task 6.0:11 failed 4 times, most recent failure: Exception failure in TID 108 on host foo.bar.com: com.fasterxml.jackson.databind.JsonMappingException: No content to map due to end-of-input at [Source: java.io.StringReader@3697781f; line: 1, column: 1]
 com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:164)
 com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3029)
 com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:2971)
 com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2091)
 org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$5.apply(JsonRDD.scala:261)
 org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$5.apply(JsonRDD.scala:261)
 scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 scala.collection.Iterator$class.foreach(Iterator.scala:727)
 scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
 scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:172)
 scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1157)
 org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:823)
 org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:821)
 org.apache.spark.SparkContext$$anonfun$24.apply(SparkContext.scala:1132)
 org.apache.spark.SparkContext$$anonfun$24.apply(SparkContext.scala:1132)
 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:112)
 org.apache.spark.scheduler.Task.run(Task.scala:51)
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
 java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
 java.lang.Thread.run(Thread.java:662)
Driver stacktrace:

Is the only possible reason that some of these 4.3 million JSON objects are not valid JSON, or could there be another explanation? And if that is the reason, is there some way to tell the function to just skip faulty lines? Thanks, Durin -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/jsonFile-function-in-SQLContext-does-not-work-tp8273p8278.html
Re: jsonFile function in SQLContext does not work
Hi Yin and Aaron, thanks for your help, this was indeed the problem. I counted 1233 blank lines using grep, and the code snippet below works with those. From what you said, I guess that skipping faulty lines will be possible in later versions? Kind regards, Simon -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/jsonFile-function-in-SQLContext-does-not-work-tp8273p8293.html
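A sketch of the blank-line workaround, assuming the SQLContext also offers a jsonRDD method that accepts an RDD[String] (the path is a placeholder):

```scala
// Drop blank lines before handing the data to the JSON parser,
// since empty strings trigger the "No content to map" exception.
val raw = sc.textFile("hdfs://host:9100/user/myuser/data.json")
val nonEmpty = raw.filter(_.trim.nonEmpty)
val table = sqlContext.jsonRDD(nonEmpty)
table.printSchema()
```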