spark spark-ec2 credentials using aws_security_token
Hi,

I would like to ask whether it is currently possible to use the spark-ec2 script with credentials that consist not only of aws_access_key_id and aws_secret_access_key, but also include an aws_security_token. When I try to run the script I get the following error message:

ERROR:boto:Caught exception reading instance data
Traceback (most recent call last):
  File "/Users/zikes/opensource/spark/ec2/lib/boto-2.34.0/boto/utils.py", line 210, in retry_url
    r = opener.open(req, timeout=timeout)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/urllib2.py", line 404, in open
    response = self._open(req, data)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/urllib2.py", line 422, in _open
    '_open', req)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/urllib2.py", line 382, in _call_chain
    result = func(*args)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/urllib2.py", line 1214, in http_open
    return self.do_open(httplib.HTTPConnection, req)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/urllib2.py", line 1184, in do_open
    raise URLError(err)
URLError: <urlopen error [Errno 64] Host is down>
ERROR:boto:Unable to read instance data, giving up
No handler was ready to authenticate. 1 handlers were checked. ['QuerySignatureV2AuthHandler'] Check your credentials

Does anyone have an idea of what could possibly be wrong? Is aws_security_token the problem? I know it seems more like a boto problem, but I would still like to ask whether anybody has experience with this. My launch command is:

./spark-ec2 -k my_key -i my_key.pem --additional-tags mytag:tag1,mytag2:tag2 --instance-profile-name profile1 -s 1 launch test

Thank you in advance for any help.

Best regards,
Jan

Note: I have also asked at http://stackoverflow.com/questions/31583513/spark-spark-ec2-credentials-using-aws-security-token?noredirect=1#comment51151822_31583513 without any success.
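One way to check whether the token itself is the problem is to open an EC2 connection by hand with the same three values; boto 2.x connection objects do accept a session token. A minimal sketch, assuming boto 2.34.0, with a placeholder region and placeholder credentials (none of these values are from the thread):

import boto.ec2

# Hypothetical standalone credential check; region and credentials are placeholders.
conn = boto.ec2.connect_to_region(
    "us-east-1",
    aws_access_key_id="AKIA...",
    aws_secret_access_key="...",
    security_token="...",  # the aws_security_token issued by STS
)
print(conn.get_all_instances())  # fails immediately if the credentials are rejected

If this works while spark-ec2 fails, that would point at the script not passing the token through rather than at the credentials themselves.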
Where can I find logs from workers in PySpark
Hi,

I am trying to do some logging in my PySpark jobs, particularly in a map that is performed on the workers. Unfortunately I am not able to find these logs. Based on the documentation it seems that the logs should be on the master in the SPARK_HOME/work directory (http://spark.apache.org/docs/latest/spark-standalone.html#monitoring-and-logging), but unfortunately I can't see this directory. So I would like to ask: how do you do logging on workers? Is it somehow possible to also send these logs to the master, for example after the job finishes?

Thank you in advance for any suggestions and advice.

Best regards,
Jan
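One pattern that works regardless of where the work directory lives is to log to stderr inside the mapped function; each executor's stderr is collected per application (under the worker's work directory in standalone mode, or in the YARN container logs) and is viewable from the web UI. A minimal sketch, with a hypothetical process_record function:

import logging
import sys

def process_record(record):
    # basicConfig is a no-op after the first call, so this is safe to run
    # once per task; the output goes to the executor's stderr log.
    logging.basicConfig(stream=sys.stderr, level=logging.INFO)
    logging.info("processing record of length %d", len(record))
    return record.upper()  # placeholder transformation

# usage: rdd.map(process_record)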
Re: Spark on YARN, ExecutorLostFailure for long running computations in map
So it seems that this problem was related to http://apache-spark-developers-list.1001551.n3.nabble.com/Lost-executor-on-YARN-ALS-iterations-td7916.html and increasing the executor memory worked for me.

__

Hi,

I am getting ExecutorLostFailure when I run Spark on YARN and perform very long tasks (a couple of hours) in a map. The error log is below. Do you know whether there is some setting that makes it possible for Spark to perform these very long-running jobs in a map?

Thank you very much for any advice.

Best regards,
Jan

Spark log:

4533,931: [GC 394578K->20882K(1472000K), 0,0226470 secs]
Traceback (most recent call last):
  File "/home/hadoop/spark_stuff/spark_lda.py", line 112, in <module>
    models.saveAsTextFile(sys.argv[1])
  File "/home/hadoop/spark/python/pyspark/rdd.py", line 1324, in saveAsTextFile
    keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
  File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o36.saveAsTextFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 28 in stage 0.0 failed 4 times, most recent failure: Lost task 28.3 in stage 0.0 (TID 41, ip-172-16-1-90.us-west-2.compute.internal): ExecutorLostFailure (executor lost)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Yarn log:

14/11/08 08:20:34 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on ip-172-16-1-152.us-west-2.compute.internal:41091 (size: 596.9 KB, free: 775.7 MB)
14/11/08 08:20:34 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on ip-172-16-1-152.us-west-2.compute.internal:39160 (size: 596.9 KB, free: 775.7 MB)
14/11/08 08:20:34 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on ip-172-16-1-152.us-west-2.compute.internal:45058 (size:
596.9 KB, free: 775.7 MB) 14/11/08 08:20:34 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on ip-172-16-1-241.us-west-2.compute.internal:54111 (size: 596.9 KB, free: 775.7 MB) 14/11/08 08:20:34 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on ip-172-16-1-238.us-west-2.compute.internal:45772 (size: 596.9 KB, free: 775.7 MB) 14/11/08 08:20:34 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on ip-172-16-1-241.us-west-2.compute.internal:59509 (size: 596.9 KB, free: 775.7 MB) 14/11/08 08:20:34 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on ip-172-16-1-238.us-west-2.compute.internal:35720 (size: 596.9 KB, free: 775.7 MB) 14/11/08 08:21:11 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(ip-172-16-1-241.us-west-2.compute.internal,59509) 14/11/08 08:21:11 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(ip-172-16-1-241.us-west-2.compute.internal,59509) 14/11/08 08:21:11 ERROR network.ConnectionManager: Corresponding SendingConnection to
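For reference, the fix here was simply to give the executors more memory at submit time. A sketch of the command, with placeholder sizes that would need tuning to the actual cluster and a placeholder output path; the memoryOverhead setting (in MB) applies if the Spark version in use supports it:

./spark/bin/spark-submit --master yarn-client \
  --executor-memory 4g \
  --conf spark.yarn.executor.memoryOverhead=1024 \
  /home/hadoop/spark_stuff/spark_lda.py s3n://mybucket/output/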
Re: Spark on Yarn probably trying to load all the data to RAM
I have tried merging the files into one, and Spark now works with RAM as I expected. Unfortunately, after doing this another problem appears: Spark running on YARN now schedules all the work to only one worker node, as one big job. Is there some way to force Spark and YARN to spread the work uniformly across the whole cluster?

I am running the job with the following command:

./spark/bin/spark-submit --master yarn-client --py-files /home/hadoop/my_pavkage.zip /home/hadoop/preprocessor.py

I have also tried to play with the options --num-executors and --executor-cores, but unfortunately I am not able to force Spark to run the job on more than just one cluster node.

Thank you in advance for any advice,

Best regards,
Jan

__

This is a crazy case that has a few million files; the scheduler will run out of memory. By default, each file becomes a partition, so you will have more than 1M partitions and also 1M tasks. With coalesce(), you can reduce the number of tasks, but you cannot reduce the number of partitions of the original RDD. Could you pack the small files into bigger ones? Spark works much better with larger files.

On Mon, Nov 3, 2014 at 11:46 AM, jan.zi...@centrum.cz wrote:

I have 3 datasets; in all the datasets the average file size is 10-12 KB. I am able to run my code on the dataset with 70K files, but I am not able to run it on the datasets with 1.1M and 3.8M files.

__

On Sun, Nov 2, 2014 at 1:35 AM, jan.zi...@centrum.cz wrote:

Hi, I am using Spark on YARN, particularly Spark in Python. I am trying to run:

myrdd = sc.textFile("s3n://mybucket/files/*/*/*.json")

How many files do you have? And the average size of each file?

myrdd.getNumPartitions()

Unfortunately it seems that Spark tries to load everything to RAM, or at least after a while of running everything slows down and then I am getting errors with the log below. Everything works fine for datasets smaller than RAM, but I would expect Spark to do this without storing everything in RAM. So I would like to ask whether I am missing some setting in Spark on YARN? Thank you in advance for any help.
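One thing worth checking in this situation is whether the single merged file produced a single partition (one unsplittable file would); if it did, the work cannot spread no matter what --num-executors is set to. A sketch of diagnosing and forcing a spread, where the file path is hypothetical and 16 is just an assumed executors-times-cores product:

data = sc.textFile("/home/hadoop/merged_input.json")  # hypothetical merged file
print(data.getNumPartitions())  # if this prints 1, only one task can ever run
data = data.repartition(16)     # shuffle once so the heavy map runs cluster-wide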
Re: Spark on Yarn probably trying to load all the data to RAM
OK, so the problem is solved: the file was gzipped, and it looks like Spark does not support distributing a single .gz file across workers (a gzip stream is not splittable, so it becomes one partition processed by one node). Thank you very much for the suggestion to merge the files.

Best regards,
Jan
Re: Spark on Yarn probably trying to load all the data to RAM
Could you please give me an example, or send me a link, of how to use Hadoop's CombineFileInputFormat? It sounds very interesting to me and it would probably save several hours of my pipeline's computation. Merging the files is currently the bottleneck in my system.

__

Another potential option could be to use Hadoop's CombineFileInputFormat with an input split size of, say, 512 MB or 1 GB. That way you don't need a preceding step and the I/O of first combining the files together.
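For later readers, here is a rough PySpark sketch of that idea, assuming Hadoop 2.x's CombineTextInputFormat is available on the cluster and using the 512 MB split size suggested above (the input path is a placeholder):

from pyspark import SparkContext

sc = SparkContext(appName="combine-small-files")
rdd = sc.newAPIHadoopFile(
    "s3n://mybucket/files/*/*/*.json",  # placeholder input path
    "org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat",
    "org.apache.hadoop.io.LongWritable",
    "org.apache.hadoop.io.Text",
    conf={"mapreduce.input.fileinputformat.split.maxsize": "536870912"},  # 512 MB
).map(lambda kv: kv[1])  # drop the byte-offset key, keep the text line

print(rdd.getNumPartitions())  # far fewer partitions than input files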
Re: Spark on Yarn probably trying to load all the data to RAM
I have 3 datasets; in all the datasets the average file size is 10-12 KB. I am able to run my code on the dataset with 70K files, but I am not able to run it on the datasets with 1.1M and 3.8M files.

__

On Sun, Nov 2, 2014 at 1:35 AM, jan.zi...@centrum.cz wrote:

Hi, I am using Spark on YARN, particularly Spark in Python. I am trying to run:

myrdd = sc.textFile("s3n://mybucket/files/*/*/*.json")

How many files do you have? And the average size of each file?

myrdd.getNumPartitions()

Unfortunately it seems that Spark tries to load everything to RAM, or at least after a while of running everything slows down and then I am getting errors with the log below. Everything works fine for datasets smaller than RAM, but I would expect Spark to do this without storing everything in RAM. So I would like to ask whether I am missing some setting in Spark on YARN? Thank you in advance for any help.
Spark on Yarn probably trying to load all the data to RAM
Hi,

I am using Spark on YARN, particularly Spark in Python. I am trying to run:

myrdd = sc.textFile("s3n://mybucket/files/*/*/*.json")
myrdd.getNumPartitions()

Unfortunately it seems that Spark tries to load everything to RAM, or at least after a while of running everything slows down and then I am getting errors with the log below. Everything works fine for datasets smaller than RAM, but I would expect Spark to do this without storing everything in RAM. So I would like to ask whether I am missing some setting in Spark on YARN?

Thank you in advance for any help.

14/11/01 22:06:57 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-375] shutting down ActorSystem [sparkDriver] java.lang.OutOfMemoryError: GC overhead limit exceeded
14/11/01 22:06:57 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-381] shutting down ActorSystem [sparkDriver] java.lang.OutOfMemoryError: GC overhead limit exceeded
11744,575: [Full GC 1194515K->1192839K(1365504K), 2,2367150 secs]
11746,814: [Full GC 1194507K->1193186K(1365504K), 2,1788150 secs]
11748,995: [Full GC 1194507K->1193278K(1365504K), 1,3511480 secs]
11750,347: [Full GC 1194507K->1193263K(1365504K), 2,2735350 secs]
11752,622: [Full GC 1194506K->1193192K(1365504K), 1,2700110 secs]
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/hadoop/spark/python/pyspark/rdd.py", line 391, in getNumPartitions
    return self._jrdd.partitions().size()
  File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError14/11/01 22:07:07 INFO scheduler.DAGScheduler: Failed to run saveAsTextFile at NativeMethodAccessorImpl.java:-2
: An error occurred while calling o112.partitions.
: java.lang.OutOfMemoryError: GC overhead limit exceeded
11753,896: [Full GC 1194506K->947839K(1365504K), 2,1483780 secs]
14/11/01 22:07:09 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
14/11/01 22:07:09 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-381] shutting down ActorSystem [sparkDriver] java.lang.OutOfMemoryError: GC overhead limit exceeded
14/11/01 22:07:09 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread [sparkDriver-akka.actor.default-dispatcher-309] shutting down ActorSystem [sparkDriver] java.lang.OutOfMemoryError: GC overhead limit exceeded
14/11/01 22:07:09 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
14/11/01 22:07:09 INFO Remoting: Remoting shut down
14/11/01 22:07:09 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
14/11/01 22:07:09 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,55871)
14/11/01 22:07:09 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,55871)
14/11/01 22:07:09 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,55871)
14/11/01 22:07:09 INFO network.ConnectionManager: Key not valid ? sun.nio.ch.SelectionKeyImpl@5ca1c790
14/11/01 22:07:09 INFO network.ConnectionManager: key already cancelled ?
sun.nio.ch.SelectionKeyImpl@5ca1c790 java.nio.channels.CancelledKeyException at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386) at org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139) 14/11/01 22:07:09 INFO network.ConnectionManager: Removing SendingConnection to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,52768) 14/11/01 22:07:09 INFO network.ConnectionManager: Removing ReceivingConnection to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,52768) 14/11/01 22:07:09 ERROR network.ConnectionManager: Corresponding SendingConnection to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,52768) not found 14/11/01 22:07:10 ERROR cluster.YarnClientSchedulerBackend: Yarn application already ended: FINISHED 14/11/01 22:07:10 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/metrics/json,null} 14/11/01 22:07:10 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/stages/stage/kill,null} 14/11/01 22:07:10 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/,null} 14/11/01 22:07:10 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/static,null} 14/11/01 22:07:10 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors/json,null} 14/11/01 22:07:10 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/executors,null} 14/11/01
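Since the advice in this thread is to pack the small files into bigger ones before Spark sees them, here is one possible shape for that preprocessing step, as a plain Python sketch run outside Spark; the paths and the chunk size are placeholders, and it assumes each small file ends with a newline:

import glob
import os

CHUNK_BYTES = 256 * 1024 * 1024  # target packed-file size; placeholder
if not os.path.exists("packed"):
    os.makedirs("packed")

out_idx, out_size = 0, 0
out = open("packed/part-%05d.json" % out_idx, "w")
for path in glob.glob("input/*/*/*.json"):  # hypothetical local mirror of the data
    with open(path) as f:
        data = f.read()
    if out_size + len(data) > CHUNK_BYTES and out_size > 0:
        out.close()  # roll over to the next chunk
        out_idx += 1
        out_size = 0
        out = open("packed/part-%05d.json" % out_idx, "w")
    out.write(data)
    out_size += len(data)
out.close()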
Re: Spark speed performance
Thank you. I would expect it to work as you describe, but I am probably seeing it work the other way. It now seems that Spark is generally trying to fit everything into RAM. I run Spark on YARN, and I have wrapped this into another question: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-on-Yarn-probably-trying-to-load-all-the-data-to-RAM-td17908.html

__

coalesce() is a streaming operation if used without the second parameter; it does not put all the data in RAM. If used with the second parameter (shuffle = true), then it performs a shuffle, but it still does not put all the data in RAM.
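A small illustration of the two coalesce() modes being discussed, as a sketch with a placeholder path:

distData = sc.textFile("s3n://mybucket/files/*/*/*.json")

narrowed = distData.coalesce(10)                 # streaming merge of partitions, no shuffle
shuffled = distData.coalesce(10, shuffle=True)   # full shuffle; same as repartition(10)

print(narrowed.getNumPartitions(), shuffled.getNumPartitions())  # 10 10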
Re: Spark speed performance
Now I am getting into problems using:

distData = sc.textFile(sys.argv[2]).coalesce(10)

The problem is that Spark seems to be trying to put all the data into RAM first and only then perform the coalesce. Do you know if there is something that would do the coalesce on the fly, for example with a fixed partition size? Do you think that something like this is possible? Unfortunately I was not able to find anything like it in the Spark documentation.

Thank you in advance for any advice or suggestions.

Best regards,
Jan
Repartitioning by partition size, not by number of partitions.
Hi,

My input data consist of many very small files, each containing one JSON document. For performance reasons (I use PySpark) I have to do repartitioning; currently I do:

sc.textFile(files).coalesce(100)

The problem is that I have to guess the number of partitions in such a way that it is as fast as possible while I still stay on the safe side with RAM, which is quite difficult. For this reason I would like to ask if there is some way to replace coalesce(100) with something that creates N partitions of a given size. I went through the documentation, but I was not able to find a way to do that.

Thank you in advance for any help or advice.
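I am not aware of a built-in coalesce-by-size in the RDD API, but one workable heuristic is to derive the partition count from the total input size. A sketch, assuming the files sit on a locally visible path and a hypothetical 128 MB target (for S3 you would sum the object sizes via the S3 API instead):

import glob
import os

TARGET_BYTES = 128 * 1024 * 1024  # desired partition size; placeholder

paths = glob.glob("/data/files/*/*/*.json")  # hypothetical local layout
total_bytes = sum(os.path.getsize(p) for p in paths)
num_partitions = max(1, int(total_bytes // TARGET_BYTES))

rdd = sc.textFile("/data/files/*/*/*.json").coalesce(num_partitions)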
RE: Repartitioning by partition size, not by number of partitions.
Hi Ilya,

This seems to me to be quite a complicated solution. I am thinking that an easier (though not optimal) approach might be, for example, to heuristically use something like RDD.coalesce(RDD.getNumPartitions() / N), but it still makes me wonder that Spark does not have something like RDD.coalesce(partition_size).

__

Hi Jan. I've actually written a function recently to do precisely that, using the RDD.randomSplit function. You just need to calculate how big each element of your data is, then how many elements can fit in each RDD, to populate the input to randomSplit. Unfortunately, in my case I wind up with GC errors on large data doing this and am still debugging :)
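Spelled out, the heuristic mentioned above would look like this sketch (N is just an assumed shrink factor, and the path is a placeholder):

N = 10  # assumed shrink factor
rdd = sc.textFile("s3n://mybucket/files/*/*/*.json")  # placeholder path
smaller = rdd.coalesce(max(1, rdd.getNumPartitions() // N))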
Re: How to set Spark to perform only one map at once at each cluster node
Yes, I would expect it to be as you say, that setting executor-cores to 1 would work. But it seems to me that when I do use executor-cores=1, it actually performs more than one task on each of the machines at a time (at least based on what top says).

__

It's not very difficult to achieve by properly setting the application's parameters. Some basic knowledge you should know: an application can have only one executor on each machine or container (YARN). So if you just set executor-cores to 1, each executor will run only one task at once.
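For reference, the knobs under discussion are passed at submit time; a sketch with placeholder values (2 nodes, 1 core per executor, and a hypothetical script name):

./spark/bin/spark-submit --master yarn-client \
  --num-executors 2 \
  --executor-cores 1 \
  /home/hadoop/my_job.py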
How to set Spark to perform only one map at once at each cluster node
Hi,

I am currently struggling with how to properly set up Spark to perform only one map, flatMap, etc. at once. In other words, my map uses a multi-core algorithm, so I would like to have only one map running per node so that it can use all of the machine's cores.

Thank you in advance for advice and replies.

Jan
Re: How to set Spark to perform only one map at once at each cluster node
But I guess that this creates only one task across all the cluster's nodes. I would like to run several tasks, but I would like Spark not to run more than one map on each of my nodes at a time. That means I would like to, say, have 4 different tasks and 2 nodes, where each node has 2 cores. Currently Hadoop runs 2 maps in parallel on each node (all 4 tasks in parallel), but I would like to somehow force it to run only 1 task on each node and to give it another task after the first task finishes.

__

The number of tasks is decided by the number of input partitions. If you want only one map or flatMap at once, just call coalesce() or repartition() to bring the data into one partition. However, this is not recommended, because it does not execute efficiently in parallel.
Re: PySpark problem with textblob from NLTK used in map
So the problem was that Spark internally sets HOME to /home. The hack to make this work with Python is to add, before the call to textblob, the line:

os.environ['HOME'] = '/home/hadoop'
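Written out as a sketch of how that workaround sits inside the mapped function (the '/home/hadoop' value matches this EMR setup and is an assumption for anything else):

import os

def make_tokens(text):
    # Executors here start with HOME=/home, so point textblob/NLTK at the
    # account whose corpora the bootstrap script actually downloaded.
    os.environ['HOME'] = '/home/hadoop'
    from textblob import TextBlob  # import after HOME is fixed
    return list(TextBlob(text).noun_phrases)

# usage: rdd.map(make_tokens)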
Re: PySpark problem with textblob from NLTK used in map
Maybe I'll add one more question. I think that the problem is with the user, so I would like to ask: under which user are Spark jobs run on the slaves?

__

Hi,

I am trying to implement a function for text preprocessing in PySpark. I have Amazon EMR, where I am installing Python dependencies from the bootstrap script. One of these dependencies is textblob (python -m textblob.download_corpora). I can then use it locally on all the machines without any problem, but when I try to run it from Spark I get the following error:

INFO:   File "/home/hadoop/spark/python/pyspark/rdd.py", line 1324, in saveAsTextFile
INFO:     keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
INFO:   File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
INFO:   File "/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
INFO: py4j.protocol.Py4JJavaError: An error occurred while calling o54.saveAsTextFile.
INFO: : org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 1.0 failed 4 times, most recent failure: Lost task 8.3 in stage 1.0 (TID 40, ip-172-31-3-125.ec2.internal): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
INFO:   File "/home/hadoop/spark/python/pyspark/worker.py", line 79, in main
INFO:     serializer.dump_stream(func(split_index, iterator), outfile)
INFO:   File "/home/hadoop/spark/python/pyspark/serializers.py", line 127, in dump_stream
INFO:     for obj in iterator:
INFO:   File "/home/hadoop/spark/python/pyspark/rdd.py", line 1316, in func
INFO:     for x in iterator:
INFO:   File "/home/hadoop/pyckage/package_topics/package_topics/preprocessor.py", line 40, in make_tokens
INFO:   File "./package_topics.zip/package_topics/data_utils.py", line 76, in preprocess_text
INFO:     for noun_phrase in TextBlob(' '.join(tokens)).noun_phrases
INFO:   File "/usr/lib/python2.6/site-packages/textblob/decorators.py", line 24, in __get__
INFO:     value = obj.__dict__[self.func.__name__] = self.func(obj)
INFO:   File "/usr/lib/python2.6/site-packages/textblob/blob.py", line 431, in noun_phrases
INFO:     for phrase in self.np_extractor.extract(self.raw)
INFO:   File "/usr/lib/python2.6/site-packages/textblob/en/np_extractors.py", line 138, in extract
INFO:     self.train()
INFO:   File "/usr/lib/python2.6/site-packages/textblob/decorators.py", line 38, in decorated
INFO:     raise MissingCorpusError()
INFO: MissingCorpusError:
INFO: Looks like you are missing some required data for this feature.
INFO:
INFO: To download the necessary data, simply run
INFO:
INFO:     python -m textblob.download_corpora
INFO:
INFO: or use the NLTK downloader to download the missing data: http://nltk.org/data.html
INFO: If this doesn't fix the problem, file an issue at https://github.com/sloria/TextBlob/issues.
INFO: INFO: INFO: org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124) INFO: org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:154) INFO: org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87) INFO: org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) INFO: org.apache.spark.rdd.RDD.iterator(RDD.scala:229) INFO: org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) INFO: org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) INFO: org.apache.spark.rdd.RDD.iterator(RDD.scala:229) INFO: org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) INFO: org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) INFO: org.apache.spark.rdd.RDD.iterator(RDD.scala:229) INFO: org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) INFO: org.apache.spark.scheduler.Task.run(Task.scala:54) INFO: org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) INFO: java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) INFO: java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) INFO: java.lang.Thread.run(Thread.java:745) INFO: Driver stacktrace: INFO: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) INFO: at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174) INFO: at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173) INFO: at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) INFO: at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) INFO: at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173) INFO: at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) INFO: at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) INFO: at scala.Option.foreach(Option.scala:236) INFO: at
Under which user is the program run on slaves?
Hi,

I would like to ask under which user the Spark program is run on the slaves. My Spark is running on top of YARN. The reason I am asking is that I need to download data for the NLTK library, and these data are downloaded for a specific user, so I am currently struggling with this: http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-problem-with-textblob-from-NLTK-used-in-map-td17211.html

Thank you in advance for any ideas.
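One quick way to answer this empirically is to ask the executors themselves. A sketch:

import getpass
import os

def whoami(_):
    # Runs on an executor; reports the OS user and HOME as Python sees them there.
    return (getpass.getuser(), os.environ.get('HOME'))

print(sc.parallelize(range(4), 4).map(whoami).distinct().collect())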
Re: Spark speed performance
Thank you very much; lots of very small JSON files was exactly the performance problem. Using coalesce, my Spark program on a single node now runs only about twice as slowly as the single-node Python program (even including Spark startup), which is acceptable.

Jan

__

Because of the overhead between the JVM and Python, a single task will be slower than your local Python script, but it's very easy to scale to many CPUs. Even with one CPU, it's not common for PySpark to be 100 times slower. You have many small files; each file will be processed by a task, which carries about 100 ms of overhead (scheduling and execution), but a small file can be processed by your single-threaded Python script in less than 1 ms. You could pack your JSON files into larger ones, or you could try to merge the small tasks into larger ones with coalesce(N), such as:

distData = sc.textFile(sys.argv[2]).coalesce(10)  # which will have 10 partitions (tasks)

Davies
Spark speed performance
Hi, I have a program for single-computer (Python) execution and I have also implemented the same for Spark. This program basically only reads .json, from which it takes one field, and saves it back. Using Spark, my program runs approximately 100 times slower on 1 master and 1 slave. So I would like to ask where the problem might possibly be? My Spark program looks like: sc = SparkContext(appName="Json data preprocessor") distData = sc.textFile(sys.argv[2]) json_extractor = JsonExtractor(sys.argv[1]) cleanedData = distData.flatMap(json_extractor.extract_json) cleanedData.saveAsTextFile(sys.argv[3]) JsonExtractor only selects the data from the field that is given by sys.argv[1]. My data are basically many small json files, with one json per line. I have tried both reading and writing the data from/to Amazon S3 and the local disk on all the machines. I would like to ask if there is something that I am missing, or if Spark is supposed to be this slow in comparison with the local non-parallelized single-node program. Thank you in advance for any suggestions or hints. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How does reading the data from Amazon S3 work?
Hi, I have seen in a video from the Spark Summit that usually (when I use HDFS) the data are distributed across the whole cluster and the computation usually goes to the data. My question is: how does it work when I read the data from Amazon S3? Is the whole input dataset read by the master node and then distributed to the slave nodes? Or does the master node only determine which slave should read what, and then the reading is performed independently by each of the slaves? Thank you in advance for the clarification. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
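As far as I know, the second model is the one Hadoop-style input sources use: the driver only lists the objects and computes input splits, and each executor fetches its own splits directly from S3. A quick way to see the split list (a sketch; the bucket and prefix are illustrative):

    # The driver only lists the objects under the prefix and computes splits;
    # each executor then streams its own splits straight from S3.
    distData = sc.textFile("s3n://my-bucket/input")
    print(distData.getNumPartitions())  # one task per split, read on the workers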
How to ensure that only one map task runs per cluster node?
Hi, I have a cluster that has several nodes, and every node has several cores. I'd like to run a multi-core algorithm within every map, so I'd like to ensure that only one map task is performed per cluster node. Is there some way to assure this? It seems to me that it should be possible via spark.task.cpus, as described at https://spark.apache.org/docs/latest/configuration.html, but it's not clear to me whether the value is the total number of CPUs per cluster or the number of CPUs per cluster node. Thank you in advance for any help and suggestions. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
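For what it's worth, spark.task.cpus is neither of those: it is the number of cores reserved per task. Setting it equal to a worker's core count therefore effectively limits scheduling to one task per worker. A sketch (the core count of 8 is illustrative; use your own per-node count):

    from pyspark import SparkConf, SparkContext

    # spark.task.cpus reserves that many cores for each task, so if every
    # worker offers 8 cores, at most one task can run on a worker at a time.
    conf = (SparkConf()
            .setAppName("one-task-per-node")
            .set("spark.task.cpus", "8"))  # set to your per-node core count
    sc = SparkContext(conf=conf)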
Re: SparkContext.wholeTextFiles() java.io.FileNotFoundException: File does not exist:
I've tried to add / at the end of the path, but the result was exactly the same. I also guess that there is some problem on the level of the Hadoop-S3 communication. Do you know if there is some possibility of how to run scripts from Spark on, for example, a different Hadoop version than the standard EC2 installation? __ From: Sean Owen so...@cloudera.com To: jan.zi...@centrum.cz Date: 08.10.2014 18:05 Subject: Re: SparkContext.wholeTextFiles() java.io.FileNotFoundException: File does not exist: CC: user@spark.apache.org Take this as a bit of a guess, since I don't use S3 much and am only a bit aware of the Hadoop+S3 integration issues. But I know that S3's lack of proper directories causes a few issues when used with Hadoop, which wants to list directories. According to http://hadoop.apache.org/docs/r2.3.0/api/org/apache/hadoop/fs/s3native/NativeS3FileSystem.html ... I wonder if you simply need to end the path with / to make it clear you mean it as a directory. Hadoop S3 OutputFormats are going to append ..._$folder$ files to mark directories too, although I don't think it's necessarily required to read them as dirs. I still imagine there could be some problem between Hadoop and Spark in this regard, but worth trying the path thing first. You do need s3n:// for sure. On Wed, Oct 8, 2014 at 4:54 PM, jan.zi...@centrum.cz wrote: One more update: I've realized that this problem is not only Python related. I've tried it also in Scala, but I'm still getting the same error; my Scala code: val file = sc.wholeTextFiles("s3n://wiki-dump/wikiinput").first() __ My additional question is if this problem can possibly be caused by the fact that my file is bigger than the RAM memory across the whole cluster? __ Hi, I'm trying to use sc.wholeTextFiles() on a file that is stored on Amazon S3 and I'm getting the following error: 14/10/08 06:09:50 INFO input.FileInputFormat: Total input paths to process : 1 14/10/08 06:09:50 INFO input.FileInputFormat: Total input paths to process : 1 Traceback (most recent call last): File "/root/distributed_rdd_test.py", line 27, in <module> result = distData.flatMap(gensim.corpora.wikicorpus.extract_pages).take(10) File "/root/spark/python/pyspark/rdd.py", line 1126, in take totalParts = self._jrdd.partitions().size() File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o30.partitions.
: java.io.FileNotFoundException: File does not exist: /wikiinput/wiki.xml.gz at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:517) at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat$OneFileInfo.<init>(CombineFileInputFormat.java:489) at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getMoreSplits(CombineFileInputFormat.java:280) at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:240) at org.apache.spark.rdd.WholeTextFileRDD.getPartitions(NewHadoopRDD.scala:220) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:56) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:50) at org.apache.spark.api.java.JavaRDD.partitions(JavaRDD.scala:32) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745) My code is the following: sc = SparkContext(appName="Process wiki") distData = sc.wholeTextFiles('s3n://wiki-dump/wikiinput') result = distData.flatMap(gensim.corpora.wikicorpus.extract_pages).take(10)
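A compact restatement of Sean's suggestion, for anyone trying it (a sketch; the bucket and prefix are the ones from the thread):

    # Ending the S3 prefix with "/" makes it unambiguous to the s3n filesystem
    # that the path is meant as a directory to list, not a single object.
    distData = sc.wholeTextFiles("s3n://wiki-dump/wikiinput/")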
Re: Parsing one big multiple line .xml loaded in RDD using Python
Thank you, this seems to be the way to go, but unfortunately, when I'm trying to use sc.wholeTextFiles() on a file that is stored on Amazon S3, I'm getting the following error: 14/10/08 06:09:50 INFO input.FileInputFormat: Total input paths to process : 1 14/10/08 06:09:50 INFO input.FileInputFormat: Total input paths to process : 1 Traceback (most recent call last): File "/root/distributed_rdd_test.py", line 27, in <module> result = distData.flatMap(gensim.corpora.wikicorpus.extract_pages).take(10) File "/root/spark/python/pyspark/rdd.py", line 1126, in take totalParts = self._jrdd.partitions().size() File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o30.partitions. : java.io.FileNotFoundException: File does not exist: /wikiinput/wiki.xml.gz at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:517) at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat$OneFileInfo.<init>(CombineFileInputFormat.java:489) at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getMoreSplits(CombineFileInputFormat.java:280) at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:240) at org.apache.spark.rdd.WholeTextFileRDD.getPartitions(NewHadoopRDD.scala:220) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:56) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:50) at org.apache.spark.api.java.JavaRDD.partitions(JavaRDD.scala:32) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745) My code is the following: sc = SparkContext(appName="Process wiki") distData = sc.wholeTextFiles('s3n://wiki-dump/wikiinput') result = distData.flatMap(gensim.corpora.wikicorpus.extract_pages).take(10) for item in result: print item.getvalue() sc.stop() So my question is: is it possible to read whole files from S3? Based on the documentation it should be possible, but it seems that it does not work for me. __ From: Davies Liu dav...@databricks.com To: jan.zi...@centrum.cz Date: 07.10.2014 17:38 Subject: Re: Parsing one big multiple line .xml loaded in RDD using Python CC: u...@spark.incubator.apache.org Maybe sc.wholeTextFiles() is what you want; you can get the whole text and parse it by yourself.
On Tue, Oct 7, 2014 at 1:06 AM, jan.zi...@centrum.cz wrote: Hi, I have already unsuccessfully asked a quite similar question at stackoverflow, particularly here: http://stackoverflow.com/questions/26202978/spark-and-python-trying-to-parse-wikipedia-using-gensim. I've also tried a workaround, but unsuccessfully; the workaround problem can be found at http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Python-using-generator-of-data-bigger-than-RAM-as-input-to-sc-parallelize-td15789.html. Particularly, what I'm trying to do: I have an .xml dump of wikipedia as the input. The .xml is quite big and it spreads across multiple lines. You can check it out at http://dumps.wikimedia.org/enwiki/latest/enwiki-latest-pages-articles.xml.bz2. My goal is to parse this .xml in the same way as gensim.corpora.wikicorpus.extract_pages does; the implementation is at https://github.com/piskvorky/gensim/blob/develop/gensim/corpora/wikicorpus.py
Re: SparkContext.wholeTextFiles() java.io.FileNotFoundException: File does not exist:
My additional question is if this problem can possibly be caused by the fact that my file is bigger than the RAM memory across the whole cluster? __ Hi, I'm trying to use sc.wholeTextFiles() on a file that is stored on Amazon S3 and I'm getting the following error: 14/10/08 06:09:50 INFO input.FileInputFormat: Total input paths to process : 1 14/10/08 06:09:50 INFO input.FileInputFormat: Total input paths to process : 1 Traceback (most recent call last): File "/root/distributed_rdd_test.py", line 27, in <module> result = distData.flatMap(gensim.corpora.wikicorpus.extract_pages).take(10) File "/root/spark/python/pyspark/rdd.py", line 1126, in take totalParts = self._jrdd.partitions().size() File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o30.partitions. : java.io.FileNotFoundException: File does not exist: /wikiinput/wiki.xml.gz at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:517) at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat$OneFileInfo.<init>(CombineFileInputFormat.java:489) at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getMoreSplits(CombineFileInputFormat.java:280) at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:240) at org.apache.spark.rdd.WholeTextFileRDD.getPartitions(NewHadoopRDD.scala:220) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:56) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:50) at org.apache.spark.api.java.JavaRDD.partitions(JavaRDD.scala:32) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745) My code is the following: sc = SparkContext(appName="Process wiki") distData = sc.wholeTextFiles('s3n://wiki-dump/wikiinput') result = distData.flatMap(gensim.corpora.wikicorpus.extract_pages).take(10) for item in result: print item.getvalue() sc.stop() So my question is, is it possible to read whole files from S3? Based on the documentation it should be possible, but it seems that it does not work for me. When I do just: sc = SparkContext(appName="Process wiki") distData = sc.wholeTextFiles('s3n://wiki-dump/wikiinput').take(10) print distData Then the error that I'm getting is exactly the same. Thank you in advance for any advice.
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: SparkContext.wholeTextFiles() java.io.FileNotFoundException: File does not exist:
One more update: I've realized that this problem is not only Python related. I've tried it also in Scala, but I'm still getting the same error; my Scala code: val file = sc.wholeTextFiles("s3n://wiki-dump/wikiinput").first() __ My additional question is if this problem can possibly be caused by the fact that my file is bigger than the RAM memory across the whole cluster? __ Hi, I'm trying to use sc.wholeTextFiles() on a file that is stored on Amazon S3 and I'm getting the following error: 14/10/08 06:09:50 INFO input.FileInputFormat: Total input paths to process : 1 14/10/08 06:09:50 INFO input.FileInputFormat: Total input paths to process : 1 Traceback (most recent call last): File "/root/distributed_rdd_test.py", line 27, in <module> result = distData.flatMap(gensim.corpora.wikicorpus.extract_pages).take(10) File "/root/spark/python/pyspark/rdd.py", line 1126, in take totalParts = self._jrdd.partitions().size() File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o30.partitions. : java.io.FileNotFoundException: File does not exist: /wikiinput/wiki.xml.gz at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:517) at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat$OneFileInfo.<init>(CombineFileInputFormat.java:489) at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getMoreSplits(CombineFileInputFormat.java:280) at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:240) at org.apache.spark.rdd.WholeTextFileRDD.getPartitions(NewHadoopRDD.scala:220) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:56) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:50) at org.apache.spark.api.java.JavaRDD.partitions(JavaRDD.scala:32) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745) My code is the following: sc = SparkContext(appName="Process wiki") distData = sc.wholeTextFiles('s3n://wiki-dump/wikiinput') result = distData.flatMap(gensim.corpora.wikicorpus.extract_pages).take(10) for item in result: print item.getvalue() sc.stop() So my question is, is it possible to read whole files from S3? Based on the documentation it should be possible, but it seems that it does not work for me.
When I do just: sc = SparkContext(appName="Process wiki") distData = sc.wholeTextFiles('s3n://wiki-dump/wikiinput').take(10) print distData Then the error that I'm getting is exactly the same. Thank you in advance for any advice. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Parsing one big multiple line .xml loaded in RDD using Python
Hi, I have already unsuccessfully asked a quite similar question at stackoverflow, particularly here: http://stackoverflow.com/questions/26202978/spark-and-python-trying-to-parse-wikipedia-using-gensim. I've also tried a workaround, but unsuccessfully; the workaround problem can be found at http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Python-using-generator-of-data-bigger-than-RAM-as-input-to-sc-parallelize-td15789.html. Particularly, what I'm trying to do: I have an .xml dump of wikipedia as the input. The .xml is quite big and it spreads across multiple lines. You can check it out at http://dumps.wikimedia.org/enwiki/latest/enwiki-latest-pages-articles.xml.bz2. My goal is to parse this .xml in the same way as gensim.corpora.wikicorpus.extract_pages does; the implementation is at https://github.com/piskvorky/gensim/blob/develop/gensim/corpora/wikicorpus.py. Unfortunately this method does not work, because RDD.flatMap() processes the RDD line by line as strings. Does anyone have a suggestion for how to parse a wikipedia-like .xml loaded in an RDD using Python? Thank you in advance for any suggestions, advice or hints. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
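Since flatMap only ever sees individual lines, the fix suggested elsewhere in this thread is to hand each task a whole file instead. A sketch of that approach (the S3 path is illustrative, and wrapping the content in StringIO is my assumption about extract_pages expecting a file-like object, not a guarantee from gensim's docs):

    import io
    from pyspark import SparkContext
    from gensim.corpora import wikicorpus

    sc = SparkContext(appName="Process wiki")

    def parse_whole_file(name_and_content):
        _, content = name_and_content
        # extract_pages wants a file-like handle rather than a string, so wrap it
        return wikicorpus.extract_pages(io.StringIO(content))

    # wholeTextFiles yields (filename, content) pairs, so each task sees a
    # complete document instead of isolated lines.
    pages = sc.wholeTextFiles("s3n://wiki-dump/wikiinput/").flatMap(parse_whole_file)
    print(pages.take(1))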
Spark and Python using generator of data bigger than RAM as input to sc.parallelize()
Hi, I would like to ask if it is possible to use a generator that generates data bigger than the size of RAM across all the machines as the input for sc = SparkContext(), sc.parallelize(generator). I would like to create an RDD this way. When I am trying to create an RDD by sc.textFile(file), where the file is even bigger than the data generated by the generator, everything works fine, but unfortunately I need to use sc.parallelize(generator), and it makes my OS kill the Spark job. I'm getting only this log and then the job is killed: 14/10/06 13:34:16 INFO spark.SecurityManager: Changing view acls to: root, 14/10/06 13:34:16 INFO spark.SecurityManager: Changing modify acls to: root, 14/10/06 13:34:16 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root, ); users with modify permissions: Set(root, ) 14/10/06 13:34:16 INFO slf4j.Slf4jLogger: Slf4jLogger started 14/10/06 13:34:16 INFO Remoting: Starting remoting 14/10/06 13:34:17 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@ip-172-31-25-197.ec2.internal:41016] 14/10/06 13:34:17 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@ip-172-31-25-197.ec2.internal:41016] 14/10/06 13:34:17 INFO util.Utils: Successfully started service 'sparkDriver' on port 41016. 14/10/06 13:34:17 INFO spark.SparkEnv: Registering MapOutputTracker 14/10/06 13:34:17 INFO spark.SparkEnv: Registering BlockManagerMaster 14/10/06 13:34:17 INFO storage.DiskBlockManager: Created local directory at /mnt/spark/spark-local-20141006133417-821e 14/10/06 13:34:17 INFO util.Utils: Successfully started service 'Connection manager for block manager' on port 42438. 14/10/06 13:34:17 INFO network.ConnectionManager: Bound socket to port 42438 with id = ConnectionManagerId(ip-172-31-25-197.ec2.internal,42438) 14/10/06 13:34:17 INFO storage.MemoryStore: MemoryStore started with capacity 267.3 MB 14/10/06 13:34:17 INFO storage.BlockManagerMaster: Trying to register BlockManager 14/10/06 13:34:17 INFO storage.BlockManagerMasterActor: Registering block manager ip-172-31-25-197.ec2.internal:42438 with 267.3 MB RAM 14/10/06 13:34:17 INFO storage.BlockManagerMaster: Registered BlockManager 14/10/06 13:34:17 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-c4edda1c-0949-490d-8ff3-10993727c523 14/10/06 13:34:17 INFO spark.HttpServer: Starting HTTP Server 14/10/06 13:34:17 INFO server.Server: jetty-8.y.z-SNAPSHOT 14/10/06 13:34:17 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:44768 14/10/06 13:34:17 INFO util.Utils: Successfully started service 'HTTP file server' on port 44768. 14/10/06 13:34:17 INFO server.Server: jetty-8.y.z-SNAPSHOT 14/10/06 13:34:17 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040 14/10/06 13:34:17 INFO util.Utils: Successfully started service 'SparkUI' on port 4040. 14/10/06 13:34:17 INFO ui.SparkUI: Started SparkUI at http://ec2-54-164-72-236.compute-1.amazonaws.com:4040 14/10/06 13:34:18 INFO util.Utils: Copying /root/generator_test.py to /tmp/spark-0bafac0c-6779-4910-b095-0ede226fa3ce/generator_test.py 14/10/06 13:34:18 INFO spark.SparkContext: Added file file:/root/generator_test.py at http://172.31.25.197:44768/files/generator_test.py with timestamp 1412602458065 14/10/06 13:34:18 INFO client.AppClient$ClientActor: Connecting to master spark://ec2-54-164-72-236.compute-1.amazonaws.com:7077...
14/10/06 13:34:18 INFO cluster.SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0 14/10/06 13:34:18 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20141006133418-0046 14/10/06 13:34:18 INFO client.AppClient$ClientActor: Executor added: app-20141006133418-0046/0 on worker-20141005074620-ip-172-31-30-40.ec2.internal-49979 (ip-172-31-30-40.ec2.internal:49979) with 1 cores 14/10/06 13:34:18 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20141006133418-0046/0 on hostPort ip-172-31-30-40.ec2.internal:49979 with 1 cores, 512.0 MB RAM 14/10/06 13:34:18 INFO client.AppClient$ClientActor: Executor updated: app-20141006133418-0046/0 is now RUNNING 14/10/06 13:34:21 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@ip-172-31-30-40.ec2.internal:50877/user/Executor#-1621441852] with ID 0 14/10/06 13:34:21 INFO storage.BlockManagerMasterActor: Registering block manager ip-172-31-30-40.ec2.internal:34460 with 267.3 MB RAM Thank you in advance for any advice or suggestion. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark and Python using generator of data bigger than RAM as input to sc.parallelize()
Hi, thank you for your advice. It really might work, but to specify my problem a bit more: think of my data more like one generated item being one parsed wikipedia page. I am getting this generator from the parser and I don't want to save it to storage, but directly apply parallelize and create an RDD. Based on your advice, I'm now thinking that something like batching, creating several RDDs, and then applying union on them might be the way to go (see the sketch after the quoted log below). Originally I was thinking of calling the parsing function in flatMap on the RDD loaded from the xml file, but then I unfortunately had this problem http://stackoverflow.com/questions/26202978/spark-and-python-trying-to-parse-wikipedia-using-gensim so now I am trying to parse the xml on the master node and directly put it into the RDD. __ From: Davies Liu dav...@databricks.com To: jan.zi...@centrum.cz Date: 06.10.2014 18:09 Subject: Re: Spark and Python using generator of data bigger than RAM as input to sc.parallelize() sc.parallelize() distributes a list of data into a number of partitions, but a generator cannot be cut and serialized automatically. If you can partition your generator, then you can try this: sc.parallelize(range(N), N).flatMap(lambda x: generate_partition(x)) For example, if you want to generate xrange(M) and M is huge: sc.parallelize(range(N), N).flatMap(lambda x: xrange(M / N * x, M / N * (x + 1))) On Mon, Oct 6, 2014 at 7:16 AM, jan.zi...@centrum.cz wrote: Hi, I would like to ask if it is possible to use a generator that generates data bigger than the size of RAM across all the machines as the input for sc = SparkContext(), sc.parallelize(generator). I would like to create an RDD this way. When I am trying to create an RDD by sc.textFile(file), where the file is even bigger than the data generated by the generator, everything works fine, but unfortunately I need to use sc.parallelize(generator), and it makes my OS kill the Spark job. I'm getting only this log and then the job is killed: 14/10/06 13:34:16 INFO spark.SecurityManager: Changing view acls to: root, 14/10/06 13:34:16 INFO spark.SecurityManager: Changing modify acls to: root, 14/10/06 13:34:16 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root, ); users with modify permissions: Set(root, ) 14/10/06 13:34:16 INFO slf4j.Slf4jLogger: Slf4jLogger started 14/10/06 13:34:16 INFO Remoting: Starting remoting 14/10/06 13:34:17 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@ip-172-31-25-197.ec2.internal:41016] 14/10/06 13:34:17 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@ip-172-31-25-197.ec2.internal:41016] 14/10/06 13:34:17 INFO util.Utils: Successfully started service 'sparkDriver' on port 41016. 14/10/06 13:34:17 INFO spark.SparkEnv: Registering MapOutputTracker 14/10/06 13:34:17 INFO spark.SparkEnv: Registering BlockManagerMaster 14/10/06 13:34:17 INFO storage.DiskBlockManager: Created local directory at /mnt/spark/spark-local-20141006133417-821e 14/10/06 13:34:17 INFO util.Utils: Successfully started service 'Connection manager for block manager' on port 42438.
14/10/06 13:34:17 INFO network.ConnectionManager: Bound socket to port 42438 with id = ConnectionManagerId(ip-172-31-25-197.ec2.internal,42438) 14/10/06 13:34:17 INFO storage.MemoryStore: MemoryStore started with capacity 267.3 MB 14/10/06 13:34:17 INFO storage.BlockManagerMaster: Trying to register BlockManager 14/10/06 13:34:17 INFO storage.BlockManagerMasterActor: Registering block manager ip-172-31-25-197.ec2.internal:42438 with 267.3 MB RAM 14/10/06 13:34:17 INFO storage.BlockManagerMaster: Registered BlockManager 14/10/06 13:34:17 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-c4edda1c-0949-490d-8ff3-10993727c523 14/10/06 13:34:17 INFO spark.HttpServer: Starting HTTP Server 14/10/06 13:34:17 INFO server.Server: jetty-8.y.z-SNAPSHOT 14/10/06 13:34:17 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:44768 14/10/06 13:34:17 INFO util.Utils: Successfully started service 'HTTP file server' on port 44768. 14/10/06 13:34:17 INFO server.Server: jetty-8.y.z-SNAPSHOT 14/10/06 13:34:17 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040 14/10/06 13:34:17 INFO util.Utils: Successfully started service 'SparkUI' on port 4040. 14/10/06 13:34:17 INFO ui.SparkUI: Started SparkUI at http://ec2-54-164-72-236.compute-1.amazonaws.com:4040 14/10/06 13:34:18 INFO util.Utils: Copying /root/generator_test.py to /tmp/spark-0bafac0c-6779-4910-b095-0ede226fa3ce/generator_test.py 14/10/06 13:34:18 INFO spark.SparkContext: Added file file:/root/generator_test.py at http://172.31.25.197:44768/files/generator_test.py with timestamp 1412602458065 14/10/06 13:34:18
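Following up on the batching idea in the reply above, a minimal sketch (the batch size is illustrative, and each batch still has to fit in driver memory while it is being shipped):

    from itertools import islice

    def rdd_from_generator(sc, gen, batch_size=10000):
        # Build an RDD from a generator without materializing it all at once:
        # parallelize fixed-size chunks and union them together.
        result = None
        while True:
            batch = list(islice(gen, batch_size))
            if not batch:
                break
            chunk = sc.parallelize(batch)
            result = chunk if result is None else result.union(chunk)
        # For very many batches, collecting the chunks in a list and calling
        # sc.union(chunks) once keeps the lineage flatter.
        return result

    # pages = rdd_from_generator(sc, wiki_page_generator)  # the generator is yours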
Re: Spark and Python using generator of data bigger than RAM as input to sc.parallelize()
@Davies I know that gensim.corpora.wikicorpus.extract_pages will for sure be the bottleneck on the master node. Unfortunately I am using Spark on EC2 and I don't have enough space on my nodes to store the whole data that needs to be parsed by extract_pages. I have my data on S3, and I kind of hoped that after reading the data from S3 into an RDD (sc.textFile(file_on_s3)) it would be possible to pass the RDD to extract_pages; this unfortunately does not work for me. If it worked, it would be by far the best way to go for me. @Steve I can try a Hadoop custom InputFormat. It'd be great if you could send me some samples. But if I understand it correctly, then I'm afraid that it won't work for me, because I actually don't have any url to wikipedia; I have only a file that is opened, parsed and returned as a generator that generates the parsed pagename and text from wikipedia (it can also be some non-public wikipedia-like site). __ From: Steve Lewis lordjoe2...@gmail.com To: Davies Liu dav...@databricks.com Date: 06.10.2014 22:39 Subject: Re: Spark and Python using generator of data bigger than RAM as input to sc.parallelize() CC: user Try a Hadoop custom InputFormat - I can give you some samples. While I have not tried this, an input split has only a length (which could be ignored if the format treats the input as non-splittable) and a String for a location. If the location is a URL into wikipedia, the whole thing should work. Hadoop InputFormats seem to be the best way to get large (say multi-gigabyte) files into RDDs. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org