spark spark-ec2 credentials using aws_security_token

2015-07-27 Thread jan.zikes

Hi,
 
I would like to ask whether it is currently possible to use the spark-ec2 script together with credentials that consist not only of aws_access_key_id and aws_secret_access_key, but also include aws_security_token.
 
When I try to run the script I get the following error message:
 
ERROR:boto:Caught exception reading instance data
Traceback (most recent call last):
  File /Users/zikes/opensource/spark/ec2/lib/boto-2.34.0/boto/utils.py, line 
210, in retry_url
    r = opener.open(req, timeout=timeout)
  File 
/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/urllib2.py,
 line 404, in open
    response = self._open(req, data)
  File 
/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/urllib2.py,
 line 422, in _open
    '_open', req)
  File 
/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/urllib2.py,
 line 382, in _call_chain
    result = func(*args)
  File 
/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/urllib2.py,
 line 1214, in http_open
    return self.do_open(httplib.HTTPConnection, req)
  File 
/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/urllib2.py,
 line 1184, in do_open
    raise URLError(err)
URLError: urlopen error [Errno 64] Host is down
ERROR:boto:Unable to read instance data, giving up
No handler was ready to authenticate. 1 handlers were checked. 
['QuerySignatureV2AuthHandler'] Check your credentials
 
Does anyone have an idea what could possibly be wrong? Is aws_security_token the problem?
I know this looks more like a boto problem, but I would still like to ask whether anybody has experience with this.
 
My launch command is:
./spark-ec2 -k my_key -i my_key.pem --additional-tags mytag:tag1,mytag2:tag2 
--instance-profile-name profile1 -s 1 launch test
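As a sanity check, a minimal sketch (assuming boto 2.x; the region and credential values below are placeholders) of verifying the temporary credentials on their own, outside of spark-ec2:

import boto.ec2

conn = boto.ec2.connect_to_region(
    'us-west-2',
    aws_access_key_id='AKIA...',
    aws_secret_access_key='...',
    security_token='...')  # the aws_security_token / session token

# If this listing succeeds, the temporary credentials themselves are fine and
# the issue is in how spark-ec2/boto picks them up from the environment.
print(conn.get_all_instances())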
 
Thank you in advance for any help.
Best regards,
 
Jan
 
Note:
I have also asked this at 
http://stackoverflow.com/questions/31583513/spark-spark-ec2-credentials-using-aws-security-token?noredirect=1#comment51151822_31583513
but without any success.

 



Where can I find logs from workers PySpark

2014-11-11 Thread jan.zikes
Hi, 

I am trying to do some logging in my PySpark jobs, particularly in a map that is performed on the workers. Unfortunately I am not able to find these logs. Based on the documentation it seems that the logs should be on the master in SPARK_HOME, in the work directory (http://spark.apache.org/docs/latest/spark-standalone.html#monitoring-and-logging). But unfortunately I can't see this directory.

So I would like to ask: how do you do logging on workers? Is it somehow possible to send these logs to the master as well, for example after the job finishes?
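For illustration, a minimal sketch of what I mean (assuming an existing SparkContext sc; the paths are placeholders). Anything the Python worker writes to stderr should end up in the per-executor stderr files on the worker machines; the exact location depends on the cluster manager:

import sys

def process(record):
    # this goes to the stderr log of the executor that runs the task
    sys.stderr.write("processing record: %s\n" % record)
    return record.upper()

result = sc.textFile("s3n://mybucket/input/*.txt").map(process)
result.saveAsTextFile("s3n://mybucket/output")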

Thank you in advance for any suggestions and advice.

Best regards,
Jan 
 




Re: Spark on YARN, ExecutorLostFailure for long running computations in map

2014-11-08 Thread jan.zikes
So it seems that this problem was related to 
http://apache-spark-developers-list.1001551.n3.nabble.com/Lost-executor-on-YARN-ALS-iterations-td7916.html
 and increasing the executor memory worked for me.
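For reference, a minimal sketch of the settings involved (assuming yarn-client mode and Spark 1.x; the memory values are only examples and need tuning per cluster). The executor memory can equally be passed to spark-submit with --executor-memory:

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("long-running-map")
        .set("spark.executor.memory", "4g")                  # heap per executor
        .set("spark.yarn.executor.memoryOverhead", "1024"))  # extra off-heap headroom for YARN, in MB
sc = SparkContext(conf=conf)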
__


Hi,

I am getting ExecutorLostFailure when I run Spark on YARN and my map performs very long tasks (a couple of hours each). The error log is below.

Do you know whether it is possible to configure Spark so that it can run these very long map tasks?

Thank you very much for any advice.

Best regards,
Jan 
 
Spark log:
4533,931: [GC 394578K-20882K(1472000K), 0,0226470 secs]
Traceback (most recent call last):
  File /home/hadoop/spark_stuff/spark_lda.py, line 112, in module
    models.saveAsTextFile(sys.argv[1])
  File /home/hadoop/spark/python/pyspark/rdd.py, line 1324, in saveAsTextFile
    keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
  File 
/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 
538, in __call__
  File /home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, 
line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o36.saveAsTextFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 28 in 
stage 0.0 failed 4 times, most recent failure: Lost task 28.3 in stage 0.0 (TID 
41, ip-172-16-1-90.us-west-2.compute.internal): ExecutorLostFailure (executor 
lost)
Driver stacktrace:
        at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
        at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
        at scala.Option.foreach(Option.scala:236)
        at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 
 
 
Yarn log:
14/11/08 08:20:34 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in 
memory on ip-172-16-1-152.us-west-2.compute.internal:41091 (size: 596.9 KB, 
free: 775.7 MB)
14/11/08 08:20:34 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in 
memory on ip-172-16-1-152.us-west-2.compute.internal:39160 (size: 596.9 KB, 
free: 775.7 MB)
14/11/08 08:20:34 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in 
memory on ip-172-16-1-152.us-west-2.compute.internal:45058 (size: 596.9 KB, 
free: 775.7 MB)
14/11/08 08:20:34 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in 
memory on ip-172-16-1-241.us-west-2.compute.internal:54111 (size: 596.9 KB, 
free: 775.7 MB)
14/11/08 08:20:34 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in 
memory on ip-172-16-1-238.us-west-2.compute.internal:45772 (size: 596.9 KB, 
free: 775.7 MB)
14/11/08 08:20:34 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in 
memory on ip-172-16-1-241.us-west-2.compute.internal:59509 (size: 596.9 KB, 
free: 775.7 MB)
14/11/08 08:20:34 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in 
memory on ip-172-16-1-238.us-west-2.compute.internal:35720 (size: 596.9 KB, 
free: 775.7 MB)
14/11/08 08:21:11 INFO network.ConnectionManager: Removing SendingConnection to 
ConnectionManagerId(ip-172-16-1-241.us-west-2.compute.internal,59509)
14/11/08 08:21:11 INFO network.ConnectionManager: Removing ReceivingConnection 
to ConnectionManagerId(ip-172-16-1-241.us-west-2.compute.internal,59509)
14/11/08 08:21:11 ERROR network.ConnectionManager: Corresponding 
SendingConnection to 

Re: Spark on Yarn probably trying to load all the data to RAM

2014-11-05 Thread jan.zikes

I have tried merging the files into one, and Spark now works with RAM as I expected.

Unfortunately, after doing this another problem appears. Spark running on YARN now schedules all the work to only one worker node, as one big job. Is there some way to force Spark and YARN to schedule the work uniformly across the whole cluster?
 
I am running the job with the following command:
./spark/bin/spark-submit --master yarn-client --py-files 
/home/hadoop/my_pavkage.zip  /home/hadoop/preprocessor.py 
 
I have also tried to play with the options --num-executors and --executor-cores, but unfortunately I am not able to force Spark to run tasks on more than one cluster node.
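In case it helps, a minimal sketch of the usual workaround (assuming the single merged file comes in as too few partitions; the path and partition count are placeholders):

lines = sc.textFile("s3n://mybucket/merged/input.txt", minPartitions=64)
# or, if the RDD already exists with too few partitions:
lines = lines.repartition(64)   # shuffles the data so partitions spread across all executors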
 
Thank you in advance for any advice,
Best regards,
Jan 
__
This is a crazy case: with a few million files the scheduler will run out of memory. By default, each file becomes a partition, so you will have more than 1M partitions and also 1M tasks. coalesce() will reduce the number of tasks, but it cannot reduce the number of partitions of the original RDD.

Could you pack the small files into bigger ones? Spark works much better with that than with many small files.
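A rough sketch of what packing could look like as a one-off PySpark job (bucket names and the output file count are placeholders). Note that this only works while the driver can still enumerate the input files; for millions of S3 objects an external tool such as s3distcp may be the better route:

small = sc.textFile("s3n://mybucket/files/*/*/*.json")
packed = small.coalesce(200)                        # a few hundred larger files instead of millions of tiny ones
packed.saveAsTextFile("s3n://mybucket/files-packed/")
# later jobs then read s3n://mybucket/files-packed/ instead of the original tree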

On Mon, Nov 3, 2014 at 11:46 AM,  jan.zi...@centrum.cz wrote:

I have 3 datasets; in all of them the average file size is 10-12 KB.
I am able to run my code on the dataset with 70K files, but I am not able to run it on the datasets with 1.1M and 3.8M files.

__

On Sun, Nov 2, 2014 at 1:35 AM,  jan.zi...@centrum.cz wrote:

Hi,

I am using Spark on Yarn, particularly Spark in Python. I am trying to
run:

myrdd = sc.textFile("s3n://mybucket/files/*/*/*.json")


How many files do you have? and the average size of each file?


myrdd.getNumPartitions()

Unfortunately it seems that Spark tries to load everything into RAM, or at least after a while of running this everything slows down and then I get errors with the log below. Everything works fine for datasets smaller than RAM, but I would expect Spark to handle this without storing everything in RAM. So I would like to ask whether I'm missing some settings in Spark on Yarn?


Thank you in advance for any help.


14/11/01 22:06:57 ERROR actor.ActorSystemImpl: Uncaught fatal error from
thread [sparkDriver-akka.actor.default-dispatcher-375] shutting down
ActorSystem [sparkDriver]

java.lang.OutOfMemoryError: GC overhead limit exceeded

14/11/01 22:06:57 ERROR actor.ActorSystemImpl: Uncaught fatal error from
thread [sparkDriver-akka.actor.default-dispatcher-381] shutting down
ActorSystem [sparkDriver]

java.lang.OutOfMemoryError: GC overhead limit exceeded

11744,575: [Full GC 1194515K-1192839K(1365504K), 2,2367150 secs]

11746,814: [Full GC 1194507K-1193186K(1365504K), 2,1788150 secs]

11748,995: [Full GC 1194507K-1193278K(1365504K), 1,3511480 secs]

11750,347: [Full GC 1194507K-1193263K(1365504K), 2,2735350 secs]

11752,622: [Full GC 1194506K-1193192K(1365504K), 1,2700110 secs]

Traceback (most recent call last):

  File stdin, line 1, in module

  File /home/hadoop/spark/python/pyspark/rdd.py, line 391, in
getNumPartitions

    return self._jrdd.partitions().size()

  File
/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py,
line 538, in __call__

  File
/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py,
line
300, in get_return_value

py4j.protocol.Py4JJavaError14/11/01 22:07:07 INFO scheduler.DAGScheduler:
Failed to run saveAsTextFile at NativeMethodAccessorImpl.java:-2

: An error occurred while calling o112.partitions.

: java.lang.OutOfMemoryError: GC overhead limit exceeded




11753,896: [Full GC 1194506K-947839K(1365504K), 2,1483780 secs]


14/11/01 22:07:09 INFO remote.RemoteActorRefProvider$RemotingTerminator:
Shutting down remote daemon.

14/11/01 22:07:09 ERROR actor.ActorSystemImpl: Uncaught fatal error from
thread [sparkDriver-akka.actor.default-dispatcher-381] shutting down
ActorSystem [sparkDriver]

java.lang.OutOfMemoryError: GC overhead limit exceeded

14/11/01 22:07:09 ERROR actor.ActorSystemImpl: Uncaught fatal error from
thread [sparkDriver-akka.actor.default-dispatcher-309] shutting down
ActorSystem [sparkDriver]

java.lang.OutOfMemoryError: GC overhead limit exceeded

14/11/01 22:07:09 INFO remote.RemoteActorRefProvider$RemotingTerminator:
Remote daemon shut down; proceeding with flushing remote transports.

14/11/01 22:07:09 INFO Remoting: Remoting shut down

14/11/01 22:07:09 INFO remote.RemoteActorRefProvider$RemotingTerminator:
Remoting shut down.

14/11/01 22:07:09 INFO network.ConnectionManager: Removing
ReceivingConnection to
ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,55871)

14/11/01 22:07:09 INFO network.ConnectionManager: Removing
SendingConnection
to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,55871)

14/11/01 22:07:09 INFO network.ConnectionManager: Removing
SendingConnection
to 

Re: Spark on Yarn probably trying to load all the data to RAM

2014-11-05 Thread jan.zikes

OK, so the problem is solved: it was that the file was gzipped, and it looks like Spark does not support distributing a single .gz file directly across workers.

Thank you very much for the suggestion to merge the files.

Best regards,
Jan 
__


I have tried merging the files into one, and Spark now works with RAM as I expected.

Unfortunately, after doing this another problem appears. Spark running on YARN now schedules all the work to only one worker node, as one big job. Is there some way to force Spark and YARN to schedule the work uniformly across the whole cluster?
 
I am running the job with the following command:
./spark/bin/spark-submit --master yarn-client --py-files 
/home/hadoop/my_pavkage.zip  /home/hadoop/preprocessor.py 
 
I have also tried to play with the options --num-executors and --executor-cores, but unfortunately I am not able to force Spark to run tasks on more than one cluster node.
 
Thank you in advance for any advice,
Best regards,
Jan 
__
This is a crazy case: with a few million files the scheduler will run out of memory. By default, each file becomes a partition, so you will have more than 1M partitions and also 1M tasks. coalesce() will reduce the number of tasks, but it cannot reduce the number of partitions of the original RDD.

Could you pack the small files into bigger ones? Spark works much better with that than with many small files.

On Mon, Nov 3, 2014 at 11:46 AM,  jan.zi...@centrum.cz wrote:

I have 3 datasets; in all of them the average file size is 10-12 KB.
I am able to run my code on the dataset with 70K files, but I am not able to run it on the datasets with 1.1M and 3.8M files.

__

On Sun, Nov 2, 2014 at 1:35 AM,  jan.zi...@centrum.cz wrote:

Hi,

I am using Spark on Yarn, particularly Spark in Python. I am trying to
run:

myrdd = sc.textFile("s3n://mybucket/files/*/*/*.json")


How many files do you have? and the average size of each file?


myrdd.getNumPartitions()

Unfortunately it seems that Spark tries to load everything into RAM, or at least after a while of running this everything slows down and then I get errors with the log below. Everything works fine for datasets smaller than RAM, but I would expect Spark to handle this without storing everything in RAM. So I would like to ask whether I'm missing some settings in Spark on Yarn?


Thank you in advance for any help.


14/11/01 22:06:57 ERROR actor.ActorSystemImpl: Uncaught fatal error from
thread [sparkDriver-akka.actor.default-dispatcher-375] shutting down
ActorSystem [sparkDriver]

java.lang.OutOfMemoryError: GC overhead limit exceeded

14/11/01 22:06:57 ERROR actor.ActorSystemImpl: Uncaught fatal error from
thread [sparkDriver-akka.actor.default-dispatcher-381] shutting down
ActorSystem [sparkDriver]

java.lang.OutOfMemoryError: GC overhead limit exceeded

11744,575: [Full GC 1194515K-1192839K(1365504K), 2,2367150 secs]

11746,814: [Full GC 1194507K-1193186K(1365504K), 2,1788150 secs]

11748,995: [Full GC 1194507K-1193278K(1365504K), 1,3511480 secs]

11750,347: [Full GC 1194507K-1193263K(1365504K), 2,2735350 secs]

11752,622: [Full GC 1194506K-1193192K(1365504K), 1,2700110 secs]

Traceback (most recent call last):

  File stdin, line 1, in module

  File /home/hadoop/spark/python/pyspark/rdd.py, line 391, in
getNumPartitions

    return self._jrdd.partitions().size()

  File
/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py,
line 538, in __call__

  File
/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py,
line
300, in get_return_value

py4j.protocol.Py4JJavaError14/11/01 22:07:07 INFO scheduler.DAGScheduler:
Failed to run saveAsTextFile at NativeMethodAccessorImpl.java:-2

: An error occurred while calling o112.partitions.

: java.lang.OutOfMemoryError: GC overhead limit exceeded




11753,896: [Full GC 1194506K-947839K(1365504K), 2,1483780 secs]


14/11/01 22:07:09 INFO remote.RemoteActorRefProvider$RemotingTerminator:
Shutting down remote daemon.

14/11/01 22:07:09 ERROR actor.ActorSystemImpl: Uncaught fatal error from
thread [sparkDriver-akka.actor.default-dispatcher-381] shutting down
ActorSystem [sparkDriver]

java.lang.OutOfMemoryError: GC overhead limit exceeded

14/11/01 22:07:09 ERROR actor.ActorSystemImpl: Uncaught fatal error from
thread [sparkDriver-akka.actor.default-dispatcher-309] shutting down
ActorSystem [sparkDriver]

java.lang.OutOfMemoryError: GC overhead limit exceeded

14/11/01 22:07:09 INFO remote.RemoteActorRefProvider$RemotingTerminator:
Remote daemon shut down; proceeding with flushing remote transports.

14/11/01 22:07:09 INFO Remoting: Remoting shut down

14/11/01 22:07:09 INFO remote.RemoteActorRefProvider$RemotingTerminator:
Remoting shut down.

14/11/01 22:07:09 INFO network.ConnectionManager: Removing
ReceivingConnection to

Re: Spark on Yarn probably trying to load all the data to RAM

2014-11-05 Thread jan.zikes

Could you please give me an example, or send me a link, of how to use Hadoop's CombineFileInputFormat? It sounds very interesting to me and would probably save several hours of my pipeline's computation. Merging the files is currently the bottleneck in my system.
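A minimal sketch of what this might look like from PySpark (assuming Spark 1.1+, where sc.newAPIHadoopFile is available, the standard Hadoop 2.x class names, and a 512 MB maximum split size; the path and size are placeholders):

conf = {"mapreduce.input.fileinputformat.split.maxsize": str(512 * 1024 * 1024)}
rdd = sc.newAPIHadoopFile(
    "s3n://mybucket/files/*/*/*.json",
    "org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat",
    "org.apache.hadoop.io.LongWritable",
    "org.apache.hadoop.io.Text",
    conf=conf)
lines = rdd.map(lambda kv: kv[1])   # drop the byte-offset key, keep the text line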
__


Another potential option could be to use Hadoop's CombineFileInputFormat with an input split size of, say, 512 MB or 1 GB. That way you don't need the preceding step, and the I/O, of first combining the files together.
On Nov 5, 2014 8:23 AM, jan.zi...@centrum.cz wrote:
OK, so the problem is solved: it was that the file was gzipped, and it looks like Spark does not support distributing a single .gz file directly across workers.

Thank you very much for the suggestion to merge the files.

Best regards,
Jan 
__


I have tried merging the files into one, and Spark now works with RAM as I expected.

Unfortunately, after doing this another problem appears. Spark running on YARN now schedules all the work to only one worker node, as one big job. Is there some way to force Spark and YARN to schedule the work uniformly across the whole cluster?
 
I am running the job with the following command:
./spark/bin/spark-submit --master yarn-client --py-files 
/home/hadoop/my_pavkage.zip  /home/hadoop/preprocessor.py 
 
I have also tried to play with the options --num-executors and --executor-cores, but unfortunately I am not able to force Spark to run tasks on more than one cluster node.
 
Thank you in advance for any advice,
Best regards,
Jan 
__
This is a crazy case: with a few million files the scheduler will run out of memory. By default, each file becomes a partition, so you will have more than 1M partitions and also 1M tasks. coalesce() will reduce the number of tasks, but it cannot reduce the number of partitions of the original RDD.

Could you pack the small files into bigger ones? Spark works much better with that than with many small files.

On Mon, Nov 3, 2014 at 11:46 AM, jan.zi...@centrum.cz wrote:

I have 3 datasets; in all of them the average file size is 10-12 KB.
I am able to run my code on the dataset with 70K files, but I am not able to run it on the datasets with 1.1M and 3.8M files.

__

On Sun, Nov 2, 2014 at 1:35 AM, jan.zi...@centrum.cz wrote:

Hi,

I am using Spark on Yarn, particularly Spark in Python. I am trying to
run:

myrdd = sc.textFile("s3n://mybucket/files/*/*/*.json")


How many files do you have? and the average size of each file?


myrdd.getNumPartitions()

Unfortunately it seems that Spark tries to load everything into RAM, or at least after a while of running this everything slows down and then I get errors with the log below. Everything works fine for datasets smaller than RAM, but I would expect Spark to handle this without storing everything in RAM. So I would like to ask whether I'm missing some settings in Spark on Yarn?


Thank you in advance for any help.


14/11/01 22:06:57 ERROR actor.ActorSystemImpl: Uncaught fatal error from
thread [sparkDriver-akka.actor.default-dispatcher-375] shutting down
ActorSystem [sparkDriver]

java.lang.OutOfMemoryError: GC overhead limit exceeded

14/11/01 22:06:57 ERROR actor.ActorSystemImpl: Uncaught fatal error from
thread [sparkDriver-akka.actor.default-dispatcher-381] shutting down
ActorSystem [sparkDriver]

java.lang.OutOfMemoryError: GC overhead limit exceeded

11744,575: [Full GC 1194515K-1192839K(1365504K), 2,2367150 secs]

11746,814: [Full GC 1194507K-1193186K(1365504K), 2,1788150 secs]

11748,995: [Full GC 1194507K-1193278K(1365504K), 1,3511480 secs]

11750,347: [Full GC 1194507K-1193263K(1365504K), 2,2735350 secs]

11752,622: [Full GC 1194506K-1193192K(1365504K), 1,2700110 secs]

Traceback (most recent call last):

  File stdin, line 1, in module

  File /home/hadoop/spark/python/pyspark/rdd.py, line 391, in
getNumPartitions

    return self._jrdd.partitions().size()

  File
/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py,
line 538, in __call__

  File
/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py,
line
300, in get_return_value

py4j.protocol.Py4JJavaError14/11/01 22:07:07 INFO scheduler.DAGScheduler:
Failed to run saveAsTextFile at NativeMethodAccessorImpl.java:-2

: An error occurred while calling o112.partitions.

: java.lang.OutOfMemoryError: GC overhead limit exceeded




11753,896: [Full GC 1194506K-947839K(1365504K), 2,1483780 secs]


14/11/01 22:07:09 INFO remote.RemoteActorRefProvider$RemotingTerminator:
Shutting down remote daemon.

14/11/01 22:07:09 ERROR actor.ActorSystemImpl: Uncaught fatal error from
thread [sparkDriver-akka.actor.default-dispatcher-381] shutting down
ActorSystem [sparkDriver]


Re: Spark on Yarn probably trying to load all the data to RAM

2014-11-03 Thread jan.zikes

I have 3 datasets; in all of them the average file size is 10-12 KB.
I am able to run my code on the dataset with 70K files, but I am not able to run it on the datasets with 1.1M and 3.8M files.
__


On Sun, Nov 2, 2014 at 1:35 AM,  jan.zi...@centrum.cz wrote:

Hi,

I am using Spark on Yarn, particularly Spark in Python. I am trying to run:

myrdd = sc.textFile("s3n://mybucket/files/*/*/*.json")


How many files do you have? and the average size of each file?


myrdd.getNumPartitions()

Unfortunately it seems that Spark tries to load everything into RAM, or at least after a while of running this everything slows down and then I get errors with the log below. Everything works fine for datasets smaller than RAM, but I would expect Spark to handle this without storing everything in RAM. So I would like to ask whether I'm missing some settings in Spark on Yarn?


Thank you in advance for any help.


14/11/01 22:06:57 ERROR actor.ActorSystemImpl: Uncaught fatal error from
thread [sparkDriver-akka.actor.default-dispatcher-375] shutting down
ActorSystem [sparkDriver]

java.lang.OutOfMemoryError: GC overhead limit exceeded

14/11/01 22:06:57 ERROR actor.ActorSystemImpl: Uncaught fatal error from
thread [sparkDriver-akka.actor.default-dispatcher-381] shutting down
ActorSystem [sparkDriver]

java.lang.OutOfMemoryError: GC overhead limit exceeded

11744,575: [Full GC 1194515K-1192839K(1365504K), 2,2367150 secs]

11746,814: [Full GC 1194507K-1193186K(1365504K), 2,1788150 secs]

11748,995: [Full GC 1194507K-1193278K(1365504K), 1,3511480 secs]

11750,347: [Full GC 1194507K-1193263K(1365504K), 2,2735350 secs]

11752,622: [Full GC 1194506K-1193192K(1365504K), 1,2700110 secs]

Traceback (most recent call last):

  File stdin, line 1, in module

  File /home/hadoop/spark/python/pyspark/rdd.py, line 391, in
getNumPartitions

    return self._jrdd.partitions().size()

  File
/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py,
line 538, in __call__

  File
/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line
300, in get_return_value

py4j.protocol.Py4JJavaError14/11/01 22:07:07 INFO scheduler.DAGScheduler:
Failed to run saveAsTextFile at NativeMethodAccessorImpl.java:-2

: An error occurred while calling o112.partitions.

: java.lang.OutOfMemoryError: GC overhead limit exceeded




11753,896: [Full GC 1194506K-947839K(1365504K), 2,1483780 secs]


14/11/01 22:07:09 INFO remote.RemoteActorRefProvider$RemotingTerminator:
Shutting down remote daemon.

14/11/01 22:07:09 ERROR actor.ActorSystemImpl: Uncaught fatal error from
thread [sparkDriver-akka.actor.default-dispatcher-381] shutting down
ActorSystem [sparkDriver]

java.lang.OutOfMemoryError: GC overhead limit exceeded

14/11/01 22:07:09 ERROR actor.ActorSystemImpl: Uncaught fatal error from
thread [sparkDriver-akka.actor.default-dispatcher-309] shutting down
ActorSystem [sparkDriver]

java.lang.OutOfMemoryError: GC overhead limit exceeded

14/11/01 22:07:09 INFO remote.RemoteActorRefProvider$RemotingTerminator:
Remote daemon shut down; proceeding with flushing remote transports.

14/11/01 22:07:09 INFO Remoting: Remoting shut down

14/11/01 22:07:09 INFO remote.RemoteActorRefProvider$RemotingTerminator:
Remoting shut down.

14/11/01 22:07:09 INFO network.ConnectionManager: Removing
ReceivingConnection to
ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,55871)

14/11/01 22:07:09 INFO network.ConnectionManager: Removing SendingConnection
to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,55871)

14/11/01 22:07:09 INFO network.ConnectionManager: Removing SendingConnection
to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,55871)

14/11/01 22:07:09 INFO network.ConnectionManager: Key not valid ?
sun.nio.ch.SelectionKeyImpl@5ca1c790

14/11/01 22:07:09 INFO network.ConnectionManager: key already cancelled ?
sun.nio.ch.SelectionKeyImpl@5ca1c790

java.nio.channels.CancelledKeyException

at
org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)

at
org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)

14/11/01 22:07:09 INFO network.ConnectionManager: Removing SendingConnection
to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,52768)

14/11/01 22:07:09 INFO network.ConnectionManager: Removing
ReceivingConnection to
ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,52768)

14/11/01 22:07:09 ERROR network.ConnectionManager: Corresponding
SendingConnection to
ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,52768) not
found

14/11/01 22:07:10 ERROR cluster.YarnClientSchedulerBackend: Yarn application
already ended: FINISHED

14/11/01 22:07:10 INFO handler.ContextHandler: stopped
o.e.j.s.ServletContextHandler{/metrics/json,null}

14/11/01 22:07:10 INFO handler.ContextHandler: stopped
o.e.j.s.ServletContextHandler{/stages/stage/kill,null}

14/11/01 22:07:10 

Spark on Yarn probably trying to load all the data to RAM

2014-11-02 Thread jan.zikes

Hi,

I am using Spark on Yarn, particularly Spark in Python. I am trying to run:

myrdd = sc.textFile("s3n://mybucket/files/*/*/*.json")
myrdd.getNumPartitions()

Unfortunately it seems that Spark tries to load everything into RAM, or at least after a while of running this everything slows down and then I get errors with the log below. Everything works fine for datasets smaller than RAM, but I would expect Spark to handle this without storing everything in RAM. So I would like to ask whether I'm missing some settings in Spark on Yarn?


Thank you in advance for any help.


14/11/01 22:06:57 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread 
[sparkDriver-akka.actor.default-dispatcher-375] shutting down ActorSystem 
[sparkDriver]
java.lang.OutOfMemoryError: GC overhead limit exceeded
14/11/01 22:06:57 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread 
[sparkDriver-akka.actor.default-dispatcher-381] shutting down ActorSystem 
[sparkDriver]
java.lang.OutOfMemoryError: GC overhead limit exceeded
11744,575: [Full GC 1194515K-1192839K(1365504K), 2,2367150 secs]
11746,814: [Full GC 1194507K-1193186K(1365504K), 2,1788150 secs]
11748,995: [Full GC 1194507K-1193278K(1365504K), 1,3511480 secs]
11750,347: [Full GC 1194507K-1193263K(1365504K), 2,2735350 secs]
11752,622: [Full GC 1194506K-1193192K(1365504K), 1,2700110 secs]
Traceback (most recent call last):
  File stdin, line 1, in module
  File /home/hadoop/spark/python/pyspark/rdd.py, line 391, in getNumPartitions
    return self._jrdd.partitions().size()
  File 
/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 
538, in __call__
  File /home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, 
line 300, in get_return_value
py4j.protocol.Py4JJavaError14/11/01 22:07:07 INFO scheduler.DAGScheduler: 
Failed to run saveAsTextFile at NativeMethodAccessorImpl.java:-2
: An error occurred while calling o112.partitions.
: java.lang.OutOfMemoryError: GC overhead limit exceeded
 

11753,896: [Full GC 1194506K-947839K(1365504K), 2,1483780 secs]

14/11/01 22:07:09 INFO remote.RemoteActorRefProvider$RemotingTerminator: 
Shutting down remote daemon.
14/11/01 22:07:09 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread 
[sparkDriver-akka.actor.default-dispatcher-381] shutting down ActorSystem 
[sparkDriver]
java.lang.OutOfMemoryError: GC overhead limit exceeded
14/11/01 22:07:09 ERROR actor.ActorSystemImpl: Uncaught fatal error from thread 
[sparkDriver-akka.actor.default-dispatcher-309] shutting down ActorSystem 
[sparkDriver]
java.lang.OutOfMemoryError: GC overhead limit exceeded
14/11/01 22:07:09 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote 
daemon shut down; proceeding with flushing remote transports.
14/11/01 22:07:09 INFO Remoting: Remoting shut down
14/11/01 22:07:09 INFO remote.RemoteActorRefProvider$RemotingTerminator: 
Remoting shut down.
14/11/01 22:07:09 INFO network.ConnectionManager: Removing ReceivingConnection 
to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,55871)
14/11/01 22:07:09 INFO network.ConnectionManager: Removing SendingConnection to 
ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,55871)
14/11/01 22:07:09 INFO network.ConnectionManager: Removing SendingConnection to 
ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,55871)
14/11/01 22:07:09 INFO network.ConnectionManager: Key not valid ? 
sun.nio.ch.SelectionKeyImpl@5ca1c790
14/11/01 22:07:09 INFO network.ConnectionManager: key already cancelled ? 
sun.nio.ch.SelectionKeyImpl@5ca1c790
java.nio.channels.CancelledKeyException
at org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
at 
org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)
14/11/01 22:07:09 INFO network.ConnectionManager: Removing SendingConnection to 
ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,52768)
14/11/01 22:07:09 INFO network.ConnectionManager: Removing ReceivingConnection 
to ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,52768)
14/11/01 22:07:09 ERROR network.ConnectionManager: Corresponding 
SendingConnection to 
ConnectionManagerId(ip-172-31-18-35.us-west-2.compute.internal,52768) not found
14/11/01 22:07:10 ERROR cluster.YarnClientSchedulerBackend: Yarn application 
already ended: FINISHED
14/11/01 22:07:10 INFO handler.ContextHandler: stopped 
o.e.j.s.ServletContextHandler{/metrics/json,null}
14/11/01 22:07:10 INFO handler.ContextHandler: stopped 
o.e.j.s.ServletContextHandler{/stages/stage/kill,null}
14/11/01 22:07:10 INFO handler.ContextHandler: stopped 
o.e.j.s.ServletContextHandler{/,null}
14/11/01 22:07:10 INFO handler.ContextHandler: stopped 
o.e.j.s.ServletContextHandler{/static,null}
14/11/01 22:07:10 INFO handler.ContextHandler: stopped 
o.e.j.s.ServletContextHandler{/executors/json,null}
14/11/01 22:07:10 INFO handler.ContextHandler: stopped 
o.e.j.s.ServletContextHandler{/executors,null}
14/11/01 

Re: Spark speed performance

2014-11-02 Thread jan.zikes

Thank you, I would expect it to work as you write, but I am probably experiencing it working the other way. It now seems that Spark is generally trying to fit everything into RAM. I run Spark on YARN and I have wrapped this into another question: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-on-Yarn-probably-trying-to-load-all-the-data-to-RAM-td17908.html
__


coalesce() is a streaming operation if used without the second parameter; it does not put all the data in RAM. If used with the second parameter (shuffle = true), it performs a shuffle, but it still does not put all the data in RAM.
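A small illustration of the difference being described (partition counts are arbitrary):

rdd = sc.textFile("s3n://mybucket/files/*/*/*.json")   # roughly one partition per small file
fewer = rdd.coalesce(10)            # no shuffle: partitions are merged as the data streams through
shuffled = rdd.coalesce(10, True)   # shuffle=True redistributes the data, same as rdd.repartition(10)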
On Sat, Nov 1, 2014 at 12:09 PM, jan.zi...@centrum.cz wrote:
Now I am getting to problems using:

distData = sc.textFile(sys.argv[2]).coalesce(10)
 
The problem is that it seems Spark is trying to put all the data into RAM first and only then perform the coalesce. Do you know if there is something that would do the coalesce on the fly, for example with a fixed partition size? Do you think something like this is possible? Unfortunately I am not able to find anything like this in the Spark documentation.

Thank you in advance for any advice or suggestions.

Best regards,
Jan 
__



Thank you very much; a lot of very small JSON files was exactly the speed performance problem. Using coalesce, my Spark program now runs on a single node only twice as slow (even including starting Spark) as the single-node Python program, which is acceptable.

Jan 
__

Because of the overhead between the JVM and Python, a single task will be slower than your local Python scripts, but it's very easy to scale to many CPUs.

Even on one CPU, it's not common for PySpark to be 100 times slower. You have many small files; each file will be processed by a task, which has about 100 ms of overhead (scheduling and execution), but a small file can be processed by your single-threaded Python script in less than 1 ms.

You could pack your json files into larger ones, or you could try to merge the small tasks into larger ones with coalesce(N), such as:

distData = sc.textFile(sys.argv[2]).coalesce(10)  # which will have 10 partitions (tasks)

Davies

On Sat, Oct 18, 2014 at 12:07 PM, jan.zi...@centrum.cz wrote:

Hi,

I have a program for single-computer (Python) execution, and I have also implemented the same thing for Spark. This program basically only reads .json files, from which it takes one field, and saves it back. Using Spark, my program runs approximately 100 times slower on 1 master and 1 slave. So I would like to ask where the problem might be?

My Spark program looks like:



sc = SparkContext(appName="Json data preprocessor")

distData = sc.textFile(sys.argv[2])

json_extractor = JsonExtractor(sys.argv[1])

cleanedData = distData.flatMap(json_extractor.extract_json)

cleanedData.saveAsTextFile(sys.argv[3])

JsonExtractor only selects the data from field that is given by sys.argv[1].



My data are basically many small JSON files, with one JSON object per line.

I have tried both reading and writing the data from/to Amazon S3 and the local disk on all the machines.

I would like to ask if there is something I am missing, or if Spark is supposed to be this slow in comparison with the local, non-parallelized single-node program.



Thank you in advance for any suggestions or hints.




Re: Spark speed performance

2014-11-01 Thread jan.zikes

Now I am getting to problems using:

distData = sc.textFile(sys.argv[2]).coalesce(10)
 
The problem is that it seems Spark is trying to put all the data into RAM first and only then perform the coalesce. Do you know if there is something that would do the coalesce on the fly, for example with a fixed partition size? Do you think something like this is possible? Unfortunately I am not able to find anything like this in the Spark documentation.

Thank you in advance for any advice or suggestions.

Best regards,
Jan 
__



Thank you very much; a lot of very small JSON files was exactly the speed performance problem. Using coalesce, my Spark program now runs on a single node only twice as slow (even including starting Spark) as the single-node Python program, which is acceptable.

Jan 
__

Because of the overhead between the JVM and Python, a single task will be slower than your local Python scripts, but it's very easy to scale to many CPUs.

Even on one CPU, it's not common for PySpark to be 100 times slower. You have many small files; each file will be processed by a task, which has about 100 ms of overhead (scheduling and execution), but a small file can be processed by your single-threaded Python script in less than 1 ms.

You could pack your json files into larger ones, or you could try to merge the small tasks into larger ones with coalesce(N), such as:

distData = sc.textFile(sys.argv[2]).coalesce(10)  # which will have 10 partitions (tasks)

Davies

On Sat, Oct 18, 2014 at 12:07 PM,  jan.zi...@centrum.cz wrote:

Hi,

I have a program for single-computer (Python) execution, and I have also implemented the same thing for Spark. This program basically only reads .json files, from which it takes one field, and saves it back. Using Spark, my program runs approximately 100 times slower on 1 master and 1 slave. So I would like to ask where the problem might be?

My Spark program looks like:



sc = SparkContext(appName="Json data preprocessor")

distData = sc.textFile(sys.argv[2])

json_extractor = JsonExtractor(sys.argv[1])

cleanedData = distData.flatMap(json_extractor.extract_json)

cleanedData.saveAsTextFile(sys.argv[3])

JsonExtractor only selects the data from field that is given by sys.argv[1].



My data are basically many small JSON files, with one JSON object per line.

I have tried both reading and writing the data from/to Amazon S3 and the local disk on all the machines.

I would like to ask if there is something I am missing, or if Spark is supposed to be this slow in comparison with the local, non-parallelized single-node program.



Thank you in advance for any suggestions or hints.




Repartitioning by partition size, not by number of partitions.

2014-10-31 Thread jan.zikes

Hi,

I have input data that consist of many very small files, each containing one JSON object.


For performance reasons (I use PySpark) I have to do repartitioning; currently I do:

sc.textFile(files).coalesce(100)

The problem is that I have to guess the number of partitions in such a way that it is as fast as possible while I still stay on the safe side with RAM. So this is quite difficult.

For this reason I would like to ask if there is some way to replace coalesce(100) with something that creates N partitions of a given size? I went through the documentation, but I was not able to find a way to do that.
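Lacking a built-in coalesce-by-size, one possible workaround is sketched below, assuming the total input size can be obtained separately (for example from an S3 listing); the numbers are placeholders:

def coalesce_by_size(rdd, total_size_bytes, target_partition_bytes=128 * 1024 * 1024):
    # derive the partition count from the data size instead of hard-coding it
    num_partitions = max(1, int(total_size_bytes // target_partition_bytes))
    return rdd.coalesce(num_partitions)

rdd = coalesce_by_size(sc.textFile(files), total_size_bytes=50 * 1024 ** 3)  # e.g. ~50 GB of input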

Thank you in advance for any help or advice.
 



RE: Repartitioning by partition size, not by number of partitions.

2014-10-31 Thread jan.zikes

Hi Ilya,

This seems to me to be quite a complicated solution. I'm thinking that an easier (though not optimal) approach might be, for example, to heuristically use something like RDD.coalesce(RDD.getNumPartitions() / N), but it keeps me wondering that Spark does not have something like RDD.coalesce(partition_size).
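Spelled out, that heuristic would look something like this (N, the number of small input files merged into one partition, is just a guess):

N = 1000
rdd = sc.textFile(files)
rdd = rdd.coalesce(max(1, rdd.getNumPartitions() // N))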
 
__


Hi Jan. I've actually written a function recently to do precisely that using the RDD.randomSplit function. You just need to calculate how big each element of your data is, and then how many elements can fit in each RDD, to populate the input to randomSplit. Unfortunately, in my case I wind up with GC errors on large data doing this and am still debugging :)

-Original Message-
From: jan.zi...@centrum.cz
Sent: Friday, October 31, 2014 06:27 AM Eastern Standard Time
To: user@spark.apache.org
Subject: Repartitioning by partition size, not by number of partitions.


Hi,

I have input data that consist of many very small files, each containing one JSON object.

For performance reasons (I use PySpark) I have to do repartitioning; currently I do:

sc.textFile(files).coalesce(100)

The problem is that I have to guess the number of partitions in such a way that it is as fast as possible while I still stay on the safe side with RAM. So this is quite difficult.

For this reason I would like to ask if there is some way to replace coalesce(100) with something that creates N partitions of a given size? I went through the documentation, but I was not able to find a way to do that.

Thank you in advance for any help or advice.
 
 
The information contained in this e-mail is confidential and/or proprietary to 
Capital One and/or its affiliates. The information transmitted herewith is 
intended only for use by the individual or entity to which it is addressed.  If 
the reader of this message is not the intended recipient, you are hereby 
notified that any review, retransmission, dissemination, distribution, copying 
or other use of, or taking of any action in reliance upon this information is 
strictly prohibited. If you have received this communication in error, please 
contact the sender and delete the material from your computer.



Re: How to set Spark to perform only one map at once at each cluster node

2014-10-31 Thread jan.zikes

Yes, I would expect it to work as you say by setting executor-cores to 1, but it seems to me that when I use executor-cores=1 it actually runs more than one task on each of the machines at the same time (at least based on what top says).
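For completeness, a possible alternative (only a sketch, not verified here): declare via spark.task.cpus that every task needs all of an executor's cores, so each executor runs one task at a time. The core counts below are placeholders and should match the node size:

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("one-map-per-node")
        .set("spark.executor.cores", "8")   # cores each executor owns
        .set("spark.task.cpus", "8"))       # cores each task reserves => one task per executor at a time
sc = SparkContext(conf=conf)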
__



CC: user@spark.apache.org

It's not very difficult to implement by properly setting the application's parameters. Some basic knowledge you should know: an application can have only one executor on each machine or container (YARN). So you just set executor-cores to 1, and then each executor will run only one task at a time.
2014-10-28 19:00 GMT+08:00 jan.zi...@centrum.cz:
But I guess that this makes only one task across all the cluster's nodes. I would like to run several tasks, but I would like Spark not to run more than one map on each of my nodes at a time. That means I would like to, let's say, have 4 different tasks and 2 nodes where each node has 2 cores. Currently Hadoop runs 2 maps in parallel on each node (all 4 tasks in parallel), but I would like to somehow force it to run only 1 task on each node and give it another task after the first one finishes.
 
__


The number of tasks is decided by the number of input partitions. If you want only one map or flatMap at once, just call coalesce() or repartition() to put the data into one partition. However, this is not recommended because it will not execute efficiently in parallel.
2014-10-28 17:27 GMT+08:00 jan.zi...@centrum.cz:
Hi,

I am currently struggling with how to properly set up Spark to perform only one map, flatMap, etc. at a time. In other words, my map uses a multi-core algorithm, so I would like to have only one map running per machine in order to use all of the machine's cores.

Thank you in advance for advice and replies.

Jan 
 


How to set Spark to perform only one map at once at each cluster node

2014-10-28 Thread jan.zikes
Hi,

I am currently struggling with how to properly set up Spark to perform only one map, flatMap, etc. at a time. In other words, my map uses a multi-core algorithm, so I would like to have only one map running per machine in order to use all of the machine's cores.

Thank you in advance for advice and replies.

Jan 
 




Re: How to set Spark to perform only one map at once at each cluster node

2014-10-28 Thread jan.zikes

But I guess that this makes only one task across all the cluster's nodes. I would like to run several tasks, but I would like Spark not to run more than one map on each of my nodes at a time. That means I would like to, let's say, have 4 different tasks and 2 nodes where each node has 2 cores. Currently Hadoop runs 2 maps in parallel on each node (all 4 tasks in parallel), but I would like to somehow force it to run only 1 task on each node and give it another task after the first one finishes.
 
__


The number of tasks is decided by the number of input partitions. If you want only one map or flatMap at once, just call coalesce() or repartition() to put the data into one partition. However, this is not recommended because it will not execute efficiently in parallel.
2014-10-28 17:27 GMT+08:00 jan.zi...@centrum.cz:
Hi,

I am currently struggling with how to properly set up Spark to perform only one map, flatMap, etc. at a time. In other words, my map uses a multi-core algorithm, so I would like to have only one map running per machine in order to use all of the machine's cores.

Thank you in advance for advice and replies.

Jan 
 


Re: PySpark problem with textblob from NLTK used in map

2014-10-27 Thread jan.zikes

So the problem was that Spark had internally set HOME to /home. The hack to make this work with Python is to add the following line before the call to textblob:
os.environ['HOME'] = '/home/hadoop'
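A minimal sketch of where the hack has to go: HOME must be set in the worker's Python process before textblob/NLTK look for their corpora, i.e. inside the function that is shipped to the executors (the input path is a placeholder):

import os

def noun_phrases(text):
    os.environ['HOME'] = '/home/hadoop'   # so NLTK/textblob find /home/hadoop/nltk_data
    from textblob import TextBlob         # import only after HOME is set
    return list(TextBlob(text).noun_phrases)

phrases = sc.textFile("s3n://mybucket/docs/*.txt").map(noun_phrases)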
__


Maybe I'll add one more question. I think that the problem is with the user, so I would like to ask: under which user are Spark jobs run on the slaves?
__


Hi,

I am trying to implement a function for text preprocessing in PySpark. I have Amazon EMR, where I install Python dependencies from the bootstrap script. One of these dependencies is textblob (python -m textblob.download_corpora). I can then use it locally on all the machines without any problem.

But when I try to run it from Spark, I get the following error:


INFO: File /home/hadoop/spark/python/pyspark/rdd.py, line 1324, in 
saveAsTextFile
INFO: keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
INFO: File 
/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 
538, in __call__
INFO: File 
/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 
300, in get_return_value
INFO: py4j.protocol.Py4JJavaError: An error occurred while calling 
o54.saveAsTextFile.
INFO: : org.apache.spark.SparkException: Job aborted due to stage failure: Task 
8 in stage 1.0 failed 4 times, most recent failure: Lost task 8.3 in stage 1.0 
(TID 40, ip-172-31-3-125.ec2.internal): 
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
INFO: File /home/hadoop/spark/python/pyspark/worker.py, line 79, in main
INFO: serializer.dump_stream(func(split_index, iterator), outfile)
INFO: File /home/hadoop/spark/python/pyspark/serializers.py, line 127, in 
dump_stream
INFO: for obj in iterator:
INFO: File /home/hadoop/spark/python/pyspark/rdd.py, line 1316, in func
INFO: for x in iterator:
INFO: File 
/home/hadoop/pyckage/package_topics/package_topics/preprocessor.py, line 40, 
in make_tokens
INFO: File ./package_topics.zip/package_topics/data_utils.py, line 76, in 
preprocess_text
INFO: for noun_phrase in TextBlob(' '.join(tokens)).noun_phrases
INFO: File /usr/lib/python2.6/site-packages/textblob/decorators.py, line 24, 
in __get__
INFO: value = obj.__dict__[self.func.__name__] = self.func(obj)
INFO: File /usr/lib/python2.6/site-packages/textblob/blob.py, line 431, in 
noun_phrases
INFO: for phrase in self.np_extractor.extract(self.raw)
INFO: File /usr/lib/python2.6/site-packages/textblob/en/np_extractors.py, 
line 138, in extract
INFO: self.train()
INFO: File /usr/lib/python2.6/site-packages/textblob/decorators.py, line 38, 
in decorated
INFO: raise MissingCorpusError()
INFO: MissingCorpusError:
INFO: Looks like you are missing some required data for this feature.
INFO: 
INFO: To download the necessary data, simply run
INFO: 
INFO: python -m textblob.download_corpora
INFO: 
INFO: or use the NLTK downloader to download the missing data: 
http://nltk.org/data.html
INFO: If this doesn't fix the problem, file an issue at 
https://github.com/sloria/TextBlob/issues.
INFO: 
INFO: 
INFO: org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
INFO: org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:154)
INFO: org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
INFO: org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
INFO: org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
INFO: org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
INFO: org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
INFO: org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
INFO: org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
INFO: org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
INFO: org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
INFO: org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
INFO: org.apache.spark.scheduler.Task.run(Task.scala:54)
INFO: org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
INFO: 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
INFO: 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
INFO: java.lang.Thread.run(Thread.java:745)
INFO: Driver stacktrace:
INFO: at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
INFO: at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
INFO: at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
INFO: at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
INFO: at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
INFO: at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
INFO: at 

Re: PySpark problem with textblob from NLTK used in map

2014-10-24 Thread jan.zikes

Maybe I'll add one more question. I think that the problem is with the user, so I would like to ask: under which user are Spark jobs run on the slaves?
__


Hi,

I am trying to implement a function for text preprocessing in PySpark. I have Amazon EMR, where I install Python dependencies from the bootstrap script. One of these dependencies is textblob (python -m textblob.download_corpora). I can then use it locally on all the machines without any problem.

But when I try to run it from Spark, I get the following error:


INFO: File /home/hadoop/spark/python/pyspark/rdd.py, line 1324, in 
saveAsTextFile
INFO: keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
INFO: File 
/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 
538, in __call__
INFO: File 
/home/hadoop/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 
300, in get_return_value
INFO: py4j.protocol.Py4JJavaError: An error occurred while calling 
o54.saveAsTextFile.
INFO: : org.apache.spark.SparkException: Job aborted due to stage failure: Task 
8 in stage 1.0 failed 4 times, most recent failure: Lost task 8.3 in stage 1.0 
(TID 40, ip-172-31-3-125.ec2.internal): 
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
INFO: File /home/hadoop/spark/python/pyspark/worker.py, line 79, in main
INFO: serializer.dump_stream(func(split_index, iterator), outfile)
INFO: File /home/hadoop/spark/python/pyspark/serializers.py, line 127, in 
dump_stream
INFO: for obj in iterator:
INFO: File /home/hadoop/spark/python/pyspark/rdd.py, line 1316, in func
INFO: for x in iterator:
INFO: File 
/home/hadoop/pyckage/package_topics/package_topics/preprocessor.py, line 40, 
in make_tokens
INFO: File ./package_topics.zip/package_topics/data_utils.py, line 76, in 
preprocess_text
INFO: for noun_phrase in TextBlob(' '.join(tokens)).noun_phrases
INFO: File /usr/lib/python2.6/site-packages/textblob/decorators.py, line 24, 
in __get__
INFO: value = obj.__dict__[self.func.__name__] = self.func(obj)
INFO: File /usr/lib/python2.6/site-packages/textblob/blob.py, line 431, in 
noun_phrases
INFO: for phrase in self.np_extractor.extract(self.raw)
INFO: File /usr/lib/python2.6/site-packages/textblob/en/np_extractors.py, 
line 138, in extract
INFO: self.train()
INFO: File /usr/lib/python2.6/site-packages/textblob/decorators.py, line 38, 
in decorated
INFO: raise MissingCorpusError()
INFO: MissingCorpusError:
INFO: Looks like you are missing some required data for this feature.
INFO: 
INFO: To download the necessary data, simply run
INFO: 
INFO: python -m textblob.download_corpora
INFO: 
INFO: or use the NLTK downloader to download the missing data: 
http://nltk.org/data.html
INFO: If this doesn't fix the problem, file an issue at 
https://github.com/sloria/TextBlob/issues.
INFO: 
INFO: 
INFO: org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
INFO: org.apache.spark.api.python.PythonRDD$$anon$1.init(PythonRDD.scala:154)
INFO: org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
INFO: org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
INFO: org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
INFO: org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
INFO: org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
INFO: org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
INFO: org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
INFO: org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
INFO: org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
INFO: org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
INFO: org.apache.spark.scheduler.Task.run(Task.scala:54)
INFO: org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
INFO: 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
INFO: 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
INFO: java.lang.Thread.run(Thread.java:745)
INFO: Driver stacktrace:
INFO: at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
INFO: at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
INFO: at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
INFO: at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
INFO: at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
INFO: at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
INFO: at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
INFO: at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
INFO: at scala.Option.foreach(Option.scala:236)
INFO: at 

Under which user is the program run on slaves?

2014-10-24 Thread jan.zikes
Hi,

I would like to ask under which user the Spark program is run on the slaves? My 
Spark is running on top of YARN.

The reason I am asking is that I need to download data for the NLTK library; 
these data are downloaded for a specific Python user and I am currently 
struggling with this. 
http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-problem-with-textblob-from-NLTK-used-in-map-td17211.html

Thank you in advance for any ideas. 
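
A minimal sketch of one possible workaround (the input path is a placeholder and
the approach is untested here): run the corpora download from inside the job
itself, so it happens under whatever user the executor's Python process uses,
and report that user back to the driver:

import getpass
import subprocess

from pyspark import SparkContext

def download_corpora_and_report_user(records):
    # Runs on the worker: 'python -m textblob.download_corpora' is the command
    # suggested by the MissingCorpusError, executed as the executor's own user.
    subprocess.check_call(["python", "-m", "textblob.download_corpora"])
    user = getpass.getuser()
    for record in records:
        yield (user, record)

sc = SparkContext(appName="worker user check")
data = sc.textFile("s3n://my-bucket/my-input")  # placeholder input path
tagged = data.mapPartitions(download_corpora_and_report_user)
print(tagged.keys().distinct().collect())  # shows the user(s) the workers run as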
 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark speed performance

2014-10-19 Thread jan.zikes
Thank you very much, the lot of very small json files was exactly the speed 
performance problem. Using coalesce, my Spark program now runs on a single 
node only twice as slow (even including starting Spark) as the single-node 
Python program, which is acceptable.

Jan 
__

Because of the overhead between the JVM and Python, a single task will be
slower than your local Python script, but it's very easy to scale to
many CPUs.

Even on one CPU, it's not common for PySpark to be 100 times slower. You
have many small files; each file will be processed by a task, which
will have about 100ms of overhead (being scheduled and executed), but the small
file can be processed by your single-thread Python script in less than
1ms.

You could pack your json files into larger ones, or you could try to
merge the small tasks into larger ones with coalesce(N), such as:

distData = sc.textFile(sys.argv[2]).coalesce(10)  # will have 10 partitions (tasks)
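
If you would rather pack the small files up front instead of coalescing on
every read, a rough one-off repacking job could look like the sketch below
(both bucket paths are placeholders):

from pyspark import SparkContext

sc = SparkContext(appName="repack small json files")

# Read all the small JSON-lines files and write them back as ~10 larger
# part files; later jobs then read the repacked directory instead.
small = sc.textFile("s3n://my-bucket/many-small-json/")               # placeholder input
small.coalesce(10).saveAsTextFile("s3n://my-bucket/repacked-json/")   # placeholder output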

Davies

On Sat, Oct 18, 2014 at 12:07 PM,  jan.zi...@centrum.cz wrote:
 Hi,

 I have a program for single-computer (Python) execution and I have also
 implemented the same for Spark. This program basically only reads .json, from
 which it takes one field and saves it back. Using Spark my program runs
 approximately 100 times slower on 1 master and 1 slave. So I would like to
 ask where the problem might possibly be?

 My Spark program looks like:



 sc = SparkContext(appName="Json data preprocessor")

 distData = sc.textFile(sys.argv[2])

 json_extractor = JsonExtractor(sys.argv[1])

 cleanedData = distData.flatMap(json_extractor.extract_json)

 cleanedData.saveAsTextFile(sys.argv[3])

 JsonExtractor only selects the data from the field that is given by sys.argv[1].



 My data are basically many small json files, with one json object per line.

 I have tried both, reading and writing the data from/to Amazon S3, local
 disc on all the machines.

 I would like to ask if there is something that I am missing or if Spark is
 supposed to be so slow in comparison with the local, non-parallelized
 single-node program.



 Thank you in advance for any suggestions or hints.



 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark speed performance

2014-10-18 Thread jan.zikes

Hi,

I have a program for single-computer (Python) execution and I have also 
implemented the same for Spark. This program basically only reads .json, from 
which it takes one field and saves it back. Using Spark my program runs 
approximately 100 times slower on 1 master and 1 slave. So I would like to ask 
where the problem might possibly be?

My Spark program looks like:
 
sc = SparkContext(appName="Json data preprocessor")
distData = sc.textFile(sys.argv[2])
json_extractor = JsonExtractor(sys.argv[1])
cleanedData = distData.flatMap(json_extractor.extract_json) 
cleanedData.saveAsTextFile(sys.argv[3])

JsonExtractor only selects the data from the field that is given by sys.argv[1].
 
My data are basically many small json files, with one json object per line.

I have tried both, reading and writing the data from/to Amazon S3, local disc 
on all the machines.

I would like to ask if there is something that I am missing or if Spark is 
supposed to be so slow in comparison with the local, non-parallelized 
single-node program. 
 
Thank you in advance for any suggestions or hints.


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Re: How does reading the data from Amazon S3 works?

2014-10-17 Thread jan.zikes

Hi, 

I have seen in the video from the Spark Summit that usually (when I use HDFS) 
the data are distributed across the whole cluster and the computation usually 
goes to the data.

My question is how does it work when I read the data from Amazon S3? Is the 
whole input dataset read by the master node and then distributed to the slave 
nodes? Or does the master node only determine which slave should read what, 
with the reading then performed independently by each of the slaves? 

Thank you in advance for the clarification. 
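
For what it's worth, a small sketch (the bucket path is a placeholder) that
tags every partition with the host that actually processed it, which should
show the splits being read on the slaves rather than on the master:

import socket

from pyspark import SparkContext

def tag_partition_with_host(index, records):
    # This runs on the executor that reads the given split from S3.
    count = sum(1 for _ in records)
    yield (index, socket.gethostname(), count)

sc = SparkContext(appName="who reads what from S3")
data = sc.textFile("s3n://my-bucket/my-input/")  # placeholder path
print(data.mapPartitionsWithIndex(tag_partition_with_host).collect())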
 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

How to assure that there will be run only one map per cluster node?

2014-10-17 Thread jan.zikes
Hi,

I have a cluster that has several nodes and every node has several cores. I'd 
like to run a multi-core algorithm within every map, so I'd like to ensure that 
only one map is performed per cluster node. Is there some way to ensure this? 
It seems to me that it should be possible via spark.task.cpus as described at 
https://spark.apache.org/docs/latest/configuration.html, but it's not clear to 
me whether the value is the total number of CPUs per cluster or the number of 
CPUs per cluster node?

Thank you in advance for any help and suggestions. 
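
For reference, spark.task.cpus is the number of cores reserved per task, not
per cluster, so setting it to the number of cores available on one worker
should leave room for only a single task on each node; a hedged sketch (the 8
is an assumed per-node core count, and one executor per node is assumed):

from pyspark import SparkConf, SparkContext

# Assumption: every worker node exposes 8 cores and runs one executor.
conf = (SparkConf()
        .setAppName("one task per node")
        .set("spark.task.cpus", "8"))  # reserve all 8 cores for each task

sc = SparkContext(conf=conf)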
 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SparkContext.wholeTextFiles() java.io.FileNotFoundException: File does not exist:

2014-10-09 Thread jan.zikes

I've tried to add / at the end of the path, but the result was exactly the 
same. I also guess that there is some problem at the level of the Hadoop - S3 
communication. Do you know if there is some way to run Spark scripts against, 
for example, a different Hadoop version than the standard EC2 installation?
__

From: Sean Owen so...@cloudera.com
To: jan.zi...@centrum.cz
Date: 08.10.2014 18:05
Subject: Re: SparkContext.wholeTextFiles() java.io.FileNotFoundException: File 
does not exist:

CC: user@spark.apache.org

Take this as a bit of a guess, since I don't use S3 much and am only a
bit aware of the Hadoop+S3 integration issues. But I know that S3's
lack of proper directories causes a few issues when used with Hadoop,
which wants to list directories.

According to 
http://hadoop.apache.org/docs/r2.3.0/api/org/apache/hadoop/fs/s3native/NativeS3FileSystem.html
 
... I wonder if you simply need to end the path with / to make it
clear you mean it as a directory. Hadoop S3 OutputFormats are going to
append ..._$folder$ files to mark directories too, although I don't
think it's necessarily required in order to read them as dirs.

I still imagine there could be some problem between Hadoop and Spark in
this regard, but it's worth trying the path thing first. You do need s3n://
for sure.
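
In PySpark terms, the suggestion amounts to something like the following
(the bucket name is just the one from the thread):

from pyspark import SparkContext

sc = SparkContext(appName="wholeTextFiles from S3")
# Trailing slash to make it explicit that the key is meant as a directory,
# and s3n:// rather than s3:// as the scheme.
pages = sc.wholeTextFiles("s3n://wiki-dump/wikiinput/")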

On Wed, Oct 8, 2014 at 4:54 PM,  jan.zi...@centrum.cz wrote:

One more update: I've realized that this problem is not only Python related.
I've tried it also in Scala, but I'm still getting the same error. My Scala
code: val file = sc.wholeTextFiles("s3n://wiki-dump/wikiinput").first()

__


My additional question is whether this problem can possibly be caused by the
fact that my file is bigger than the total RAM across the whole cluster?



__

Hi

I'm trying to use sc.wholeTextFiles() on a file that is stored on Amazon S3,
and I'm getting the following error:



14/10/08 06:09:50 INFO input.FileInputFormat: Total input paths to process :
1

14/10/08 06:09:50 INFO input.FileInputFormat: Total input paths to process :
1

Traceback (most recent call last):

  File /root/distributed_rdd_test.py, line 27, in module

    result =
distData.flatMap(gensim.corpora.wikicorpus.extract_pages).take(10)

  File /root/spark/python/pyspark/rdd.py, line 1126, in take

    totalParts = self._jrdd.partitions().size()

  File /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py,
line 538, in __call__

  File /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line
300, in get_return_value

py4j.protocol.Py4JJavaError: An error occurred while calling o30.partitions.

: java.io.FileNotFoundException: File does not exist: /wikiinput/wiki.xml.gz

at
org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:517)

at
org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat$OneFileInfo.init(CombineFileInputFormat.java:489)

at
org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getMoreSplits(CombineFileInputFormat.java:280)

at
org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:240)

at
org.apache.spark.rdd.WholeTextFileRDD.getPartitions(NewHadoopRDD.scala:220)

at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)

at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)

at scala.Option.getOrElse(Option.scala:120)

at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)

at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:56)

at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)

at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)

at scala.Option.getOrElse(Option.scala:120)

at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)

at
org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:50)

at org.apache.spark.api.java.JavaRDD.partitions(JavaRDD.scala:32)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)

at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)

at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)

at py4j.Gateway.invoke(Gateway.java:259)

at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)

at py4j.commands.CallCommand.execute(CallCommand.java:79)

at py4j.GatewayConnection.run(GatewayConnection.java:207)



at java.lang.Thread.run(Thread.java:745)



My code is following:



sc = SparkContext(appName="Process wiki")

distData = sc.wholeTextFiles('s3n://wiki-dump/wikiinput')

result = 

Re: Parsing one big multiple line .xml loaded in RDD using Python

2014-10-08 Thread jan.zikes

Thank you, this seems to be the way to go, but unfortunately, when I'm trying 
to use sc.wholeTextFiles() on a file that is stored on Amazon S3, I'm getting 
the following error:
 
14/10/08 06:09:50 INFO input.FileInputFormat: Total input paths to process : 1
14/10/08 06:09:50 INFO input.FileInputFormat: Total input paths to process : 1
Traceback (most recent call last):
  File /root/distributed_rdd_test.py, line 27, in module
    result = distData.flatMap(gensim.corpora.wikicorpus.extract_pages).take(10)
  File /root/spark/python/pyspark/rdd.py, line 1126, in take
    totalParts = self._jrdd.partitions().size()
  File /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 
538, in __call__
  File /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 
300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o30.partitions.
: java.io.FileNotFoundException: File does not exist: /wikiinput/wiki.xml.gz
at 
org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:517)
at 
org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat$OneFileInfo.init(CombineFileInputFormat.java:489)
at 
org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getMoreSplits(CombineFileInputFormat.java:280)
at 
org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:240)
at org.apache.spark.rdd.WholeTextFileRDD.getPartitions(NewHadoopRDD.scala:220)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:56)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:50)
at org.apache.spark.api.java.JavaRDD.partitions(JavaRDD.scala:32)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
 
at java.lang.Thread.run(Thread.java:745)
 
My code is following:
 
sc = SparkContext(appName="Process wiki")
distData = sc.wholeTextFiles('s3n://wiki-dump/wikiinput') 
result = distData.flatMap(gensim.corpora.wikicorpus.extract_pages).take(10)
for item in result:
        print item.getvalue()
sc.stop()
 
So my question is: is it possible to read whole files from S3? Based on the 
documentation it should be possible, but it seems that it does not work for me.
 
__

From: Davies Liu dav...@databricks.com
To: jan.zi...@centrum.cz
Date: 07.10.2014 17:38
Subject: Re: Parsing one big multiple line .xml loaded in RDD using Python

CC: u...@spark.incubator.apache.org

Maybe sc.wholeTextFiles() is what you want; you can get the whole text
and parse it by yourself.
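
A minimal sketch of that approach (assuming gensim's extract_pages accepts a
file-like object wrapped around the text; untested here, and the bucket path
is the one from the thread):

from StringIO import StringIO  # Python 2, as in the rest of the thread

import gensim.corpora.wikicorpus as wikicorpus
from pyspark import SparkContext

sc = SparkContext(appName="Process wiki")

# Each element is (file_path, whole_file_content), so the multi-line XML
# is no longer split up line by line.
files = sc.wholeTextFiles("s3n://wiki-dump/wikiinput/")

pages = files.flatMap(lambda kv: wikicorpus.extract_pages(StringIO(kv[1])))
print(pages.take(10))

sc.stop()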

On Tue, Oct 7, 2014 at 1:06 AM,  jan.zi...@centrum.cz wrote:

Hi,

I have already unsuccessfully asked a quite similar question at stackoverflow,
particularly here:
http://stackoverflow.com/questions/26202978/spark-and-python-trying-to-parse-wikipedia-using-gensim.
I've also unsuccessfully tried a workaround; the workaround problem can be
found at
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Python-using-generator-of-data-bigger-than-RAM-as-input-to-sc-parallelize-td15789.html.

Specifically, what I'm trying to do: I have the .xml dump of Wikipedia as the
input. The .xml is quite big and it spreads across multiple lines. You can
check it out at
http://dumps.wikimedia.org/enwiki/latest/enwiki-latest-pages-articles.xml.bz2.

My goal is to parse this .xml in the same way as
gensim.corpora.wikicorpus.extract_pages does; the implementation is at
https://github.com/piskvorky/gensim/blob/develop/gensim/corpora/wikicorpus.py 

Re: SparkContext.wholeTextFiles() java.io.FileNotFoundException: File does not exist:

2014-10-08 Thread jan.zikes

My additional question is whether this problem can possibly be caused by the 
fact that my file is bigger than the total RAM across the whole cluster?
 
__
Hi
I'm trying to use sc.wholeTextFiles() on a file that is stored on Amazon S3, 
and I'm getting the following error:
 
14/10/08 06:09:50 INFO input.FileInputFormat: Total input paths to process : 1
14/10/08 06:09:50 INFO input.FileInputFormat: Total input paths to process : 1
Traceback (most recent call last):
  File /root/distributed_rdd_test.py, line 27, in module
    result = distData.flatMap(gensim.corpora.wikicorpus.extract_pages).take(10)
  File /root/spark/python/pyspark/rdd.py, line 1126, in take
    totalParts = self._jrdd.partitions().size()
  File /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 
538, in __call__
  File /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 
300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o30.partitions.
: java.io.FileNotFoundException: File does not exist: /wikiinput/wiki.xml.gz
at 
org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:517)
at 
org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat$OneFileInfo.init(CombineFileInputFormat.java:489)
at 
org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getMoreSplits(CombineFileInputFormat.java:280)
at 
org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:240)
at org.apache.spark.rdd.WholeTextFileRDD.getPartitions(NewHadoopRDD.scala:220)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:56)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:50)
at org.apache.spark.api.java.JavaRDD.partitions(JavaRDD.scala:32)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
 
at java.lang.Thread.run(Thread.java:745)
 
My code is following:
 
sc = SparkContext(appName="Process wiki")
distData = sc.wholeTextFiles('s3n://wiki-dump/wikiinput') 
result = distData.flatMap(gensim.corpora.wikicorpus.extract_pages).take(10)
for item in result:
        print item.getvalue()
sc.stop()
 
So my question is: is it possible to read whole files from S3? Based on the 
documentation it should be possible, but it seems that it does not work for me.
 
When I do just:
 
sc = SparkContext(appName="Process wiki")
distData = sc.wholeTextFiles('s3n://wiki-dump/wikiinput').take(10)
print distData
 
Then the error that I'm getting is exactly the same.
 
Thank you in advance for any advice.


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Re: SparkContext.wholeTextFiles() java.io.FileNotFoundException: File does not exist:

2014-10-08 Thread jan.zikes

One more update: I've realized that this problem is not only Python related. I've tried 
it also in Scala, but I'm still getting the same error. My Scala code: val file = 
sc.wholeTextFiles("s3n://wiki-dump/wikiinput").first()
__
My additional question is whether this problem can possibly be caused by the 
fact that my file is bigger than the total RAM across the whole cluster?
 
__
Hi
I'm trying to use sc.wholeTextFiles() on a file that is stored on Amazon S3, 
and I'm getting the following error:
 
14/10/08 06:09:50 INFO input.FileInputFormat: Total input paths to process : 1
14/10/08 06:09:50 INFO input.FileInputFormat: Total input paths to process : 1
Traceback (most recent call last):
  File /root/distributed_rdd_test.py, line 27, in module
    result = distData.flatMap(gensim.corpora.wikicorpus.extract_pages).take(10)
  File /root/spark/python/pyspark/rdd.py, line 1126, in take
    totalParts = self._jrdd.partitions().size()
  File /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 
538, in __call__
  File /root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 
300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o30.partitions.
: java.io.FileNotFoundException: File does not exist: /wikiinput/wiki.xml.gz
at 
org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:517)
at 
org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat$OneFileInfo.init(CombineFileInputFormat.java:489)
at 
org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getMoreSplits(CombineFileInputFormat.java:280)
at 
org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:240)
at org.apache.spark.rdd.WholeTextFileRDD.getPartitions(NewHadoopRDD.scala:220)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:56)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:50)
at org.apache.spark.api.java.JavaRDD.partitions(JavaRDD.scala:32)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
 
at java.lang.Thread.run(Thread.java:745)
 
My code is following:
 
sc = SparkContext(appName="Process wiki")
distData = sc.wholeTextFiles('s3n://wiki-dump/wikiinput') 
result = distData.flatMap(gensim.corpora.wikicorpus.extract_pages).take(10)
for item in result:
        print item.getvalue()
sc.stop()
 
So my question is: is it possible to read whole files from S3? Based on the 
documentation it should be possible, but it seems that it does not work for me.
 
When I do just:
 
sc = SparkContext(appName="Process wiki")
distData = sc.wholeTextFiles('s3n://wiki-dump/wikiinput').take(10)
print distData
 
Then the error that I'm getting is exactly the same.
 
Thank you in advance for any advice.


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Parsing one big multiple line .xml loaded in RDD using Python

2014-10-07 Thread jan.zikes

Hi,

I have already unsuccessfully asked a quite similar question at stackoverflow, 
particularly here: 
http://stackoverflow.com/questions/26202978/spark-and-python-trying-to-parse-wikipedia-using-gensim.
 I've also unsuccessfully tried a workaround; the workaround 
problem can be found at 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-Python-using-generator-of-data-bigger-than-RAM-as-input-to-sc-parallelize-td15789.html.

Specifically, what I'm trying to do: I have the .xml dump of Wikipedia as the input. 
The .xml is quite big and it spreads across multiple lines. You can check it 
out at 
http://dumps.wikimedia.org/enwiki/latest/enwiki-latest-pages-articles.xml.bz2. 
My goal is to parse this .xml in the same way as 
gensim.corpora.wikicorpus.extract_pages does; the implementation is at 
https://github.com/piskvorky/gensim/blob/develop/gensim/corpora/wikicorpus.py. 
Unfortunately this method does not work, because RDD.flatMap() processes the 
RDD line by line as strings.

Does anyone have a suggestion for how to parse the Wikipedia-like .xml loaded 
in an RDD using Python? 

Thank you in advance for any suggestions, advices or hints. 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Spark and Python using generator of data bigger than RAM as input to sc.parallelize()

2014-10-06 Thread jan.zikes

Hi,

I would like to ask if it is possible to use a generator that generates data 
bigger than the total RAM across all the machines as the input for sc = 
SparkContext(), sc.parallelize(generator). I would like to create an RDD this 
way. When I try to create an RDD by sc.textFile(file), where the file is even 
bigger than the data generated by the generator, everything works fine, but 
unfortunately I need to use sc.parallelize(generator) and it makes my OS 
kill the Spark job.
 
I'm getting only this log and then the job is killed:
 
14/10/06 13:34:16 INFO spark.SecurityManager: Changing view acls to: root,
14/10/06 13:34:16 INFO spark.SecurityManager: Changing modify acls to: root,
14/10/06 13:34:16 INFO spark.SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(root, ); users 
with modify permissions: Set(root, )
14/10/06 13:34:16 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/10/06 13:34:16 INFO Remoting: Starting remoting
14/10/06 13:34:17 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://sparkDriver@ip-172-31-25-197.ec2.internal:41016]
14/10/06 13:34:17 INFO Remoting: Remoting now listens on addresses: 
[akka.tcp://sparkDriver@ip-172-31-25-197.ec2.internal:41016]
14/10/06 13:34:17 INFO util.Utils: Successfully started service 'sparkDriver' 
on port 41016.
14/10/06 13:34:17 INFO spark.SparkEnv: Registering MapOutputTracker
14/10/06 13:34:17 INFO spark.SparkEnv: Registering BlockManagerMaster
14/10/06 13:34:17 INFO storage.DiskBlockManager: Created local directory at 
/mnt/spark/spark-local-20141006133417-821e
14/10/06 13:34:17 INFO util.Utils: Successfully started service 'Connection 
manager for block manager' on port 42438.
14/10/06 13:34:17 INFO network.ConnectionManager: Bound socket to port 42438 
with id = ConnectionManagerId(ip-172-31-25-197.ec2.internal,42438)
14/10/06 13:34:17 INFO storage.MemoryStore: MemoryStore started with capacity 
267.3 MB
14/10/06 13:34:17 INFO storage.BlockManagerMaster: Trying to register 
BlockManager
14/10/06 13:34:17 INFO storage.BlockManagerMasterActor: Registering block 
manager ip-172-31-25-197.ec2.internal:42438 with 267.3 MB RAM
14/10/06 13:34:17 INFO storage.BlockManagerMaster: Registered BlockManager
14/10/06 13:34:17 INFO spark.HttpFileServer: HTTP File server directory is 
/tmp/spark-c4edda1c-0949-490d-8ff3-10993727c523
14/10/06 13:34:17 INFO spark.HttpServer: Starting HTTP Server
14/10/06 13:34:17 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/10/06 13:34:17 INFO server.AbstractConnector: Started 
SocketConnector@0.0.0.0:44768
14/10/06 13:34:17 INFO util.Utils: Successfully started service 'HTTP file 
server' on port 44768.
14/10/06 13:34:17 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/10/06 13:34:17 INFO server.AbstractConnector: Started 
SelectChannelConnector@0.0.0.0:4040
14/10/06 13:34:17 INFO util.Utils: Successfully started service 'SparkUI' on 
port 4040.
14/10/06 13:34:17 INFO ui.SparkUI: Started SparkUI at 
http://ec2-54-164-72-236.compute-1.amazonaws.com:4040
14/10/06 13:34:18 INFO util.Utils: Copying /root/generator_test.py to 
/tmp/spark-0bafac0c-6779-4910-b095-0ede226fa3ce/generator_test.py
14/10/06 13:34:18 INFO spark.SparkContext: Added file 
file:/root/generator_test.py at 
http://172.31.25.197:44768/files/generator_test.py with timestamp 1412602458065
14/10/06 13:34:18 INFO client.AppClient$ClientActor: Connecting to master 
spark://ec2-54-164-72-236.compute-1.amazonaws.com:7077...
14/10/06 13:34:18 INFO cluster.SparkDeploySchedulerBackend: SchedulerBackend is 
ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
14/10/06 13:34:18 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark 
cluster with app ID app-20141006133418-0046
14/10/06 13:34:18 INFO client.AppClient$ClientActor: Executor added: 
app-20141006133418-0046/0 on 
worker-20141005074620-ip-172-31-30-40.ec2.internal-49979 
(ip-172-31-30-40.ec2.internal:49979) with 1 cores
14/10/06 13:34:18 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID 
app-20141006133418-0046/0 on hostPort ip-172-31-30-40.ec2.internal:49979 with 1 
cores, 512.0 MB RAM
14/10/06 13:34:18 INFO client.AppClient$ClientActor: Executor updated: 
app-20141006133418-0046/0 is now RUNNING
14/10/06 13:34:21 INFO cluster.SparkDeploySchedulerBackend: Registered 
executor: 
Actor[akka.tcp://sparkExecutor@ip-172-31-30-40.ec2.internal:50877/user/Executor#-1621441852]
 with ID 0
14/10/06 13:34:21 INFO storage.BlockManagerMasterActor: Registering block 
manager ip-172-31-30-40.ec2.internal:34460 with 267.3 MB RAM

Thank you in advance for any advice or suggestion. 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Re: Spark and Python using generator of data bigger than RAM as input to sc.parallelize()

2014-10-06 Thread jan.zikes

Hi,

Thank you for your advice. It really might work, but to specify my problem a 
bit more: think of my data as one generated item being one parsed Wikipedia 
page. I am getting this generator from the parser and I don't want to save its 
output to storage, but directly apply parallelize and create an RDD. Based on 
your advice I'm now thinking that something like batching, creating several 
RDDs and then applying union on them might possibly be the way to go.
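
A rough sketch of that batching idea (the parser and the output path are
placeholders; only one batch is held in driver memory at a time, and union is
lazy):

from itertools import islice

from pyspark import SparkContext

def batches(generator, batch_size=1000):
    # Yield lists of at most batch_size items pulled from the generator.
    while True:
        batch = list(islice(generator, batch_size))
        if not batch:
            break
        yield batch

sc = SparkContext(appName="rdd from a generator")

pages = my_wiki_parser()  # hypothetical generator of parsed pages
rdd = None
for batch in batches(pages):
    part = sc.parallelize(batch)
    rdd = part if rdd is None else rdd.union(part)

rdd.saveAsTextFile("s3n://my-bucket/parsed-pages/")  # placeholder output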

Originally I was thinking of calling the parsing function in flatMap on the RDD 
loaded from the xml file, but then I unfortunately had this problem: 
http://stackoverflow.com/questions/26202978/spark-and-python-trying-to-parse-wikipedia-using-gensim
so now I am trying to parse the xml on the master node and put it directly 
into the RDD. 
__

From: Davies Liu dav...@databricks.com
To: jan.zi...@centrum.cz
Date: 06.10.2014 18:09
Subject: Re: Spark and Python using generator of data bigger than RAM as input 
to sc.parallelize()


sc.parallelize() distributes a list of data into a number of partitions, but
a generator cannot be cut up and serialized automatically.

If you can partition your generator, then you can try this:

sc.parallelize(range(N), N).flatMap(lambda x: generate_partition(x))

for example, if you want to generate xrange(M) where M is huge:
sc.parallelize(range(N), N).flatMap(lambda x: xrange(M / N * x, M / N * (x + 1)))
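
Spelled out a bit more as a runnable sketch (generate_partition here is just a
stand-in for whatever produces one slice of the data on a worker):

from pyspark import SparkContext

N = 100       # number of partitions / generator slices
M = 10 ** 8   # total number of items to generate

def generate_partition(i):
    # Stand-in: produce the i-th slice on the worker, so the full dataset
    # never has to exist on the driver.
    return xrange(M / N * i, M / N * (i + 1))

sc = SparkContext(appName="generator per partition")
rdd = sc.parallelize(range(N), N).flatMap(generate_partition)
print(rdd.count())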

On Mon, Oct 6, 2014 at 7:16 AM,  jan.zi...@centrum.cz wrote:

Hi,


I would like to ask if it is possible to use a generator that generates data
bigger than the total RAM across all the machines as the input for sc =
SparkContext(), sc.parallelize(generator). I would like to create an RDD this
way. When I try to create an RDD by sc.textFile(file), where the file is even
bigger than the data generated by the generator, everything works fine, but
unfortunately I need to use sc.parallelize(generator) and it makes my OS
kill the Spark job.



I'm getting only this log and then the job is killed:



14/10/06 13:34:16 INFO spark.SecurityManager: Changing view acls to: root,

14/10/06 13:34:16 INFO spark.SecurityManager: Changing modify acls to: root,

14/10/06 13:34:16 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view permissions:
Set(root, ); users with modify permissions: Set(root, )

14/10/06 13:34:16 INFO slf4j.Slf4jLogger: Slf4jLogger started

14/10/06 13:34:16 INFO Remoting: Starting remoting

14/10/06 13:34:17 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://sparkDriver@ip-172-31-25-197.ec2.internal:41016]

14/10/06 13:34:17 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://sparkDriver@ip-172-31-25-197.ec2.internal:41016]

14/10/06 13:34:17 INFO util.Utils: Successfully started service
'sparkDriver' on port 41016.

14/10/06 13:34:17 INFO spark.SparkEnv: Registering MapOutputTracker

14/10/06 13:34:17 INFO spark.SparkEnv: Registering BlockManagerMaster

14/10/06 13:34:17 INFO storage.DiskBlockManager: Created local directory at
/mnt/spark/spark-local-20141006133417-821e

14/10/06 13:34:17 INFO util.Utils: Successfully started service 'Connection
manager for block manager' on port 42438.

14/10/06 13:34:17 INFO network.ConnectionManager: Bound socket to port 42438
with id = ConnectionManagerId(ip-172-31-25-197.ec2.internal,42438)

14/10/06 13:34:17 INFO storage.MemoryStore: MemoryStore started with
capacity 267.3 MB

14/10/06 13:34:17 INFO storage.BlockManagerMaster: Trying to register
BlockManager

14/10/06 13:34:17 INFO storage.BlockManagerMasterActor: Registering block
manager ip-172-31-25-197.ec2.internal:42438 with 267.3 MB RAM

14/10/06 13:34:17 INFO storage.BlockManagerMaster: Registered BlockManager

14/10/06 13:34:17 INFO spark.HttpFileServer: HTTP File server directory is
/tmp/spark-c4edda1c-0949-490d-8ff3-10993727c523

14/10/06 13:34:17 INFO spark.HttpServer: Starting HTTP Server

14/10/06 13:34:17 INFO server.Server: jetty-8.y.z-SNAPSHOT

14/10/06 13:34:17 INFO server.AbstractConnector: Started
SocketConnector@0.0.0.0:44768

14/10/06 13:34:17 INFO util.Utils: Successfully started service 'HTTP file
server' on port 44768.

14/10/06 13:34:17 INFO server.Server: jetty-8.y.z-SNAPSHOT

14/10/06 13:34:17 INFO server.AbstractConnector: Started
SelectChannelConnector@0.0.0.0:4040

14/10/06 13:34:17 INFO util.Utils: Successfully started service 'SparkUI' on
port 4040.

14/10/06 13:34:17 INFO ui.SparkUI: Started SparkUI at
http://ec2-54-164-72-236.compute-1.amazonaws.com:4040 

14/10/06 13:34:18 INFO util.Utils: Copying /root/generator_test.py to
/tmp/spark-0bafac0c-6779-4910-b095-0ede226fa3ce/generator_test.py

14/10/06 13:34:18 INFO spark.SparkContext: Added file
file:/root/generator_test.py at
http://172.31.25.197:44768/files/generator_test.py with timestamp
1412602458065

14/10/06 13:34:18 

Re: Spark and Python using generator of data bigger than RAM as input to sc.parallelize()

2014-10-06 Thread jan.zikes

@Davies
I know that gensim.corpora.wikicorpus.extract_pages will for sure be the 
bottleneck on the master node.
Unfortunately I am using Spark on EC2 and I don't have enough space on my nodes 
to store the whole data that needs to be parsed by extract_pages. I have my 
data on S3 and I kind of hoped that after reading the data from S3 into an RDD 
(sc.textFile(file_on_s3)) it would be possible to pass the RDD to 
extract_pages; this unfortunately does not work for me. If it worked, it would 
be by far the best way to go for me.
 
@Steve
I can try a Hadoop custom InputFormat. It'd be great if you could send me some 
samples. But if I understand it correctly, then I'm afraid that it won't work 
for me, because I don't actually have any URL to Wikipedia; I only have a file 
that is opened, parsed and returned as a generator that yields the parsed page 
name and text from Wikipedia (it can also be some non-public Wikipedia-like 
site).
__

From: Steve Lewis lordjoe2...@gmail.com
To: Davies Liu dav...@databricks.com
Date: 06.10.2014 22:39
Subject: Re: Spark and Python using generator of data bigger than RAM as input 
to sc.parallelize()

CC: user

Try a Hadoop custom InputFormat - I can give you some samples. While I have 
not tried this, an input split has only a length (which can be ignored if the 
format treats the input as non-splittable) and a String for a location. If the 
location is a URL into Wikipedia the whole thing should work. Hadoop 
InputFormats seem to be the best way to get large (say multi-gigabyte) files 
into RDDs.
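
In PySpark that might look roughly like the sketch below; the InputFormat
class name and the xmlinput.* configuration keys are assumptions (e.g. the
streaming XML InputFormat shipped with Mahout), so they need to match whatever
jar is actually put on the executors' classpath:

from pyspark import SparkContext

sc = SparkContext(appName="wiki pages via a custom InputFormat")

# Assumed class and configuration key names for a start-tag/end-tag style
# XML InputFormat; verify them against the jar you actually deploy.
pages = sc.newAPIHadoopFile(
    "s3n://wiki-dump/wikiinput/",
    "org.apache.mahout.text.wikipedia.XmlInputFormat",
    "org.apache.hadoop.io.LongWritable",
    "org.apache.hadoop.io.Text",
    conf={"xmlinput.start": "<page>", "xmlinput.end": "</page>"})

print(pages.map(lambda kv: kv[1][:80]).take(3))  # peek at the first few pages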

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org