Re: EMR 4.3.0 spark 1.6 shell problem
Here is my command:

aws emr create-cluster --release-label emr-4.3.0 --name "ClusterJava8" --use-default-roles --applications Name=Ganglia Name=Hive Name=Hue Name=Mahout Name=Pig Name=Spark --ec2-attributes KeyName=CC-ES-Demo --instance-count 3 --instance-type m3.xlarge --use-default-roles --bootstrap-action Path=s3://crayon-emr-scripts/emr_java_8.sh

I am using a bootstrap script to install Java 8. When I choose the applications (Name=Ganglia Name=Hive Name=Hue Name=Mahout Name=Pig Name=Spark) the problem is gone. Along the way I fixed an "Lzo not found" exception. Now I have another problem that I have no idea why it happens: I tried to copy a file to HDFS (the file is very small, just a couple of KB) and got this exception:

org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /input/test.txt._COPYING_ could only be replicated to 0 nodes instead of minReplication (=1). There are 0 datanode(s) running and no node(s) are excluded in this operation.
at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1550)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3110)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3034)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:723)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:492)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:632)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)
at org.apache.hadoop.ipc.Client.call(Client.java:1476)
at org.apache.hadoop.ipc.Client.call(Client.java:1407)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:238)
at com.sun.proxy.$Proxy9.addBlock(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:418)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy10.addBlock(Unknown Source)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1441)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1237)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:454)

put: File /input/test.txt._COPYING_ could only be replicated to 0 nodes instead of minReplication (=1).
There are 0 datanode(s) running and no node(s) are excluded in this operation. On Wed, Mar 2, 2016 at 4:09 AM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote: > Hi, > > which region are you using the EMR clusters from? Is there any tweaking of > the HADOOP parameters that you are doing before starting the clusters? > > If you are using AWS CLI to start the cluster just send across the command. > > I have, never till date, faced any such issues in the Ireland region. > > > Regards, > Gourav Sengupta > > On Tue, Mar 1, 2016 at 9:15 AM, Oleg Ruchovets <oruchov...@gmail.com> > wrote: > >> Hi , I am installed EMR 4.3.0 with spark. I tries to enter spark shell >> but it looks it does't work and throws exceptions. >> Please advice: >> >> [hadoop@ip-172-31-39-37 conf]$ cd /usr/bin/ >> [hadoop@ip-172-31-39-37 bin]$ ./spark-shell >> OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=512M; >> support was removed in 8.0 >> 16/03/01 09:11:48 INFO SecurityManager: Changing view acls to: hadoop >> 16/03/01 09:11:48 INFO SecurityManager: Changing modify acls to: hadoop >> 16/03/01 09:11:48 INFO SecurityManager: SecurityManager: authentication >> disabled; ui acls disabled; users with view permissions: Set(hadoop); users >> with modify permissions: Set(hadoop) >> 16/03/01 09:
EMR 4.3.0 spark 1.6 shell problem
Hi, I installed EMR 4.3.0 with Spark. I tried to enter the Spark shell, but it doesn't seem to work and throws exceptions. Please advise:

[hadoop@ip-172-31-39-37 conf]$ cd /usr/bin/
[hadoop@ip-172-31-39-37 bin]$ ./spark-shell
OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=512M; support was removed in 8.0
16/03/01 09:11:48 INFO SecurityManager: Changing view acls to: hadoop
16/03/01 09:11:48 INFO SecurityManager: Changing modify acls to: hadoop
16/03/01 09:11:48 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
16/03/01 09:11:49 INFO HttpServer: Starting HTTP Server
16/03/01 09:11:49 INFO Utils: Successfully started service 'HTTP class server' on port 47223.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/

Using Scala version 2.10.5 (OpenJDK 64-Bit Server VM, Java 1.8.0_71)
Type in expressions to have them evaluated.
Type :help for more information.
16/03/01 09:11:53 INFO SparkContext: Running Spark version 1.6.0
16/03/01 09:11:53 INFO SecurityManager: Changing view acls to: hadoop
16/03/01 09:11:53 INFO SecurityManager: Changing modify acls to: hadoop
16/03/01 09:11:53 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
16/03/01 09:11:54 INFO Utils: Successfully started service 'sparkDriver' on port 52143.
16/03/01 09:11:54 INFO Slf4jLogger: Slf4jLogger started
16/03/01 09:11:54 INFO Remoting: Starting remoting
16/03/01 09:11:54 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@172.31.39.37:42989]
16/03/01 09:11:54 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 42989.
16/03/01 09:11:54 INFO SparkEnv: Registering MapOutputTracker
16/03/01 09:11:54 INFO SparkEnv: Registering BlockManagerMaster
16/03/01 09:11:54 INFO DiskBlockManager: Created local directory at /mnt/tmp/blockmgr-afaf0e7f-086e-49f1-946d-798e605a3fdc
16/03/01 09:11:54 INFO MemoryStore: MemoryStore started with capacity 518.1 MB
16/03/01 09:11:55 INFO SparkEnv: Registering OutputCommitCoordinator
16/03/01 09:11:55 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/03/01 09:11:55 INFO SparkUI: Started SparkUI at http://172.31.39.37:4040 16/03/01 09:11:55 INFO RMProxy: Connecting to ResourceManager at / 172.31.39.37:8032 16/03/01 09:11:55 INFO Client: Requesting a new application from cluster with 2 NodeManagers 16/03/01 09:11:55 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (11520 MB per container) 16/03/01 09:11:55 INFO Client: Will allocate AM container, with 896 MB memory including 384 MB overhead 16/03/01 09:11:55 INFO Client: Setting up container launch context for our AM 16/03/01 09:11:55 INFO Client: Setting up the launch environment for our AM container 16/03/01 09:11:55 INFO Client: Preparing resources for our AM container 16/03/01 09:11:56 INFO Client: Uploading resource file:/usr/lib/spark/lib/spark-assembly-1.6.0-hadoop2.7.1-amzn-0.jar -> hdfs:// 172.31.39.37:8020/user/hadoop/.sparkStaging/application_1456818849676_0005/spark-assembly-1.6.0-hadoop2.7.1-amzn-0.jar 16/03/01 09:11:56 INFO MetricsSaver: MetricsConfigRecord disabledInCluster: false instanceEngineCycleSec: 60 clusterEngineCycleSec: 60 disableClusterEngine: false maxMemoryMb: 3072 maxInstanceCount: 500 lastModified: 1456818856695 16/03/01 09:11:56 INFO MetricsSaver: Created MetricsSaver j-2FT6QNFSPTHNX:i-5f6bcadb:SparkSubmit:04807 period:60 /mnt/var/em/raw/i-5f6bcadb_20160301_SparkSubmit_04807_raw.bin 16/03/01 09:11:56 WARN DFSClient: DataStreamer Exception org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /user/hadoop/.sparkStaging/application_1456818849676_0005/spark-assembly-1.6.0-hadoop2.7.1-amzn-0.jar could only be replicated to 0 nodes instead of minReplication (=1). There are 0 datanode(s) running and no node(s) are excluded in this operation. at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1550) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3110) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3034) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:723) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:492) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:632) at
spark 1.6.0 on ec2 doesn't work
Hi, I tried to follow the Spark 1.6.0 documentation to install Spark on EC2. It doesn't work properly - I got exceptions, and in the end only a standalone Spark cluster was installed. Here is the log information. Any suggestions? Thanks, Oleg.

oleg@robinhood:~/install/spark-1.6.0-bin-hadoop2.6/ec2$ ./spark-ec2 --key-pair=CC-ES-Demo --identity-file=/home/oleg/work/entity_extraction_framework/ec2_pem_key/CC-ES-Demo.pem --region=us-east-1 --zone=us-east-1a --spot-price=0.05 -s 5 --spark-version=1.6.0 launch entity-extraction-spark-cluster
Setting up security groups...
Searching for existing cluster entity-extraction-spark-cluster in region us-east-1...
Spark AMI: ami-5bb18832
Launching instances...
Requesting 5 slaves as spot instances with price $0.050
Waiting for spot instances to be granted...
0 of 5 slaves granted, waiting longer
0 of 5 slaves granted, waiting longer
0 of 5 slaves granted, waiting longer
0 of 5 slaves granted, waiting longer
0 of 5 slaves granted, waiting longer
0 of 5 slaves granted, waiting longer
0 of 5 slaves granted, waiting longer
0 of 5 slaves granted, waiting longer
0 of 5 slaves granted, waiting longer
All 5 slaves granted
Launched master in us-east-1a, regid = r-9384033f
Waiting for AWS to propagate instance metadata...
Waiting for cluster to enter 'ssh-ready' state..
Warning: SSH connection error. (This could be temporary.)
Host: ec2-52-90-186-83.compute-1.amazonaws.com
SSH return code: 255
SSH output: ssh: connect to host ec2-52-90-186-83.compute-1.amazonaws.com port 22: Connection refused
.
Warning: SSH connection error. (This could be temporary.)
Host: ec2-52-90-186-83.compute-1.amazonaws.com
SSH return code: 255
SSH output: ssh: connect to host ec2-52-90-186-83.compute-1.amazonaws.com port 22: Connection refused
.
Warning: SSH connection error. (This could be temporary.)
Host: ec2-52-90-186-83.compute-1.amazonaws.com
SSH return code: 255
SSH output: ssh: connect to host ec2-52-90-186-83.compute-1.amazonaws.com port 22: Connection refused
.
Cluster is now in 'ssh-ready' state. Waited 442 seconds.
Generating cluster's SSH key on master...
Warning: Permanently added 'ec2-52-90-186-83.compute-1.amazonaws.com,52.90.186.83' (ECDSA) to the list of known hosts.
Connection to ec2-52-90-186-83.compute-1.amazonaws.com closed.
Warning: Permanently added 'ec2-52-90-186-83.compute-1.amazonaws.com,52.90.186.83' (ECDSA) to the list of known hosts.
Transferring cluster's SSH key to slaves...
ec2-54-165-243-74.compute-1.amazonaws.com
Warning: Permanently added 'ec2-54-165-243-74.compute-1.amazonaws.com,54.165.243.74' (ECDSA) to the list of known hosts.
ec2-54-88-245-107.compute-1.amazonaws.com
Warning: Permanently added 'ec2-54-88-245-107.compute-1.amazonaws.com,54.88.245.107' (ECDSA) to the list of known hosts.
ec2-54-172-29-47.compute-1.amazonaws.com
Warning: Permanently added 'ec2-54-172-29-47.compute-1.amazonaws.com,54.172.29.47' (ECDSA) to the list of known hosts.
ec2-54-165-131-210.compute-1.amazonaws.com
Warning: Permanently added 'ec2-54-165-131-210.compute-1.amazonaws.com,54.165.131.210' (ECDSA) to the list of known hosts.
ec2-54-172-46-184.compute-1.amazonaws.com
Warning: Permanently added 'ec2-54-172-46-184.compute-1.amazonaws.com,54.172.46.184' (ECDSA) to the list of known hosts.
Cloning spark-ec2 scripts from https://github.com/amplab/spark-ec2/tree/branch-1.5 on master...
Warning: Permanently added 'ec2-52-90-186-83.compute-1.amazonaws.com,52.90.186.83' (ECDSA) to the list of known hosts.
Cloning into 'spark-ec2'...
remote: Counting objects: 2068, done.
remote: Total 2068 (delta 0), reused 0 (delta 0), pack-reused 2068 Receiving objects: 100% (2068/2068), 349.76 KiB, done. Resolving deltas: 100% (796/796), done. Connection to ec2-52-90-186-83.compute-1.amazonaws.com closed. Deploying files to master... Warning: Permanently added 'ec2-52-90-186-83.compute-1.amazonaws.com,52.90.186.83' (ECDSA) to the list of known hosts. sending incremental file list root/spark-ec2/ec2-variables.sh sent 1,835 bytes received 40 bytes 416.67 bytes/sec total size is 1,684 speedup is 0.90 Running setup on master... Warning: Permanently added 'ec2-52-90-186-83.compute-1.amazonaws.com,52.90.186.83' (ECDSA) to the list of known hosts. Connection to ec2-52-90-186-83.compute-1.amazonaws.com closed. Warning: Permanently added 'ec2-52-90-186-83.compute-1.amazonaws.com,52.90.186.83' (ECDSA) to the list of known hosts. Setting up Spark on ip-172-31-24-124.ec2.internal... Setting executable permissions on scripts... RSYNC'ing /root/spark-ec2 to other cluster nodes... ec2-54-165-243-74.compute-1.amazonaws.com Warning: Permanently added 'ec2-54-165-243-74.compute-1.amazonaws.com,172.31.19.61' (ECDSA) to the list of known hosts. ec2-54-88-245-107.compute-1.amazonaws.com id_rsa 100% 1679 1.6KB/s 00:00 Warning: Permanently added 'ec2-54-88-245-107.compute-1.amazonaws.com,172.31.30.81' (ECDSA) to the list of known hosts. ec2-54-172-29-47.compute-1.amazonaws.com id_rsa 100% 1679 1.6KB/s 00:00
Re: spark 1.6.0 on ec2 doesn't work
I thought the script was supposed to install Hadoop / HDFS as well, and it looks like that part failed. The installation is only standalone Spark, without Hadoop. Is this the correct behaviour?

There are also errors in the log:
ERROR: Unknown Tachyon version
Error: Could not find or load main class crayondata.com.log

Thanks, Oleg.
Re: spark 1.6.0 on ec2 doesn't work
nnection.access$2200(Client.java:205) at org.apache.hadoop.ipc.Client.getConnection(Client.java:1249) at org.apache.hadoop.ipc.Client.call(Client.java:1093) ... 84 more scala> On Tue, Jan 19, 2016 at 1:22 AM, Daniel Darabos < daniel.dara...@lynxanalytics.com> wrote: > > On Mon, Jan 18, 2016 at 5:24 PM, Oleg Ruchovets <oruchov...@gmail.com> > wrote: > >> I thought script tries to install hadoop / hdfs also. And it looks like >> it failed. Installation is only standalone spark without hadoop. Is it >> correct behaviour? >> > > Yes, it also sets up two HDFS clusters. Are they not working? Try to see > if Spark is working by running some simple jobs on it. (See > http://spark.apache.org/docs/latest/ec2-scripts.html.) > > There is no program called Hadoop. If you mean YARN, then indeed the > script does not set up YARN. It sets up standalone Spark. > > >> Also errors in the log: >>ERROR: Unknown Tachyon version >>Error: Could not find or load main class crayondata.com.log >> > > As long as Spark is working fine, you can ignore all output from the EC2 > script :). >
Re: spark 1.6.0 on ec2 doesn't work
I am running from $SPARK_HOME. It looks like a connection problem to port 9000 on the master machine. What is the process that Spark tries to connect to? Should I start any framework or processes before executing Spark? Thanks, Oleg.

16/01/19 03:17:56 INFO ipc.Client: Retrying connect to server: ec2-54-88-242-197.compute-1.amazonaws.com/172.31.28.196:9000. Already tried 1 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
16/01/19 03:17:57 INFO ipc.Client: Retrying connect to server: ec2-54-88-242-197.compute-1.amazonaws.com/172.31.28.196:9000. Already tried 2 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
16/01/19 03:17:58 INFO ipc.Client: Retrying connect to server: ec2-54-88-242-197.compute-1.amazonaws.com/172.31.28.196:9000. Already tried 3 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
16/01/19 03:17:59 INFO ipc.Client: Retrying connect to server: ec2-54-88-242-197.compute-1.amazonaws.com/172.31.28.196:9000. Already tried 4 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
16/01/19 03:18:00 INFO ipc.Client: Retrying connect to server: ec2-54-88-242-197.compute-1.amazonaws.com/172.31.28.196:9000. Already tried 5 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
16/01/19 03:18:01 INFO ipc.Client: Retrying connect to server: ec2-54-88-242-197.compute-1.amazonaws.com/172.31.28.196:9000. Already tried 6 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
16/01/19 03:18:02 INFO ipc.Client: Retrying connect to server: ec2-54-88-242-197.compute-1.amazonaws.com/172.31.28.196:9000. Already tried 7 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
16/01/19 03:18:03 INFO ipc.Client: Retrying connect to server: ec2-54-88-242-197.compute-1.amazonaws.com/172.31.28.196:9000. Already tried 8 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS)
16/01/19 03:18:04 INFO ipc.Client: Retrying connect to server: ec2-54-88-242-197.compute-1.amazonaws.com/172.31.28.196:9000. Already tried 9 time(s); retry

On Tue, Jan 19, 2016 at 1:13 PM, Peter Zhang <zhangju...@gmail.com> wrote: > Could you run spark-shell at $SPARK_HOME DIR? > > You can try to change you command run at $SPARK_HOME or, point to > README.md with full path. > > > Peter Zhang > -- > Google > Sent with Airmail > > On January 19, 2016 at 11:26:14, Oleg Ruchovets (oruchov...@gmail.com) > wrote: > > It looks spark is not working fine : > > I followed this link ( > http://spark.apache.org/docs/latest/ec2-scripts.html. ) and I see spot > instances installed on EC2. > > from spark shell I am counting lines and got connection exception.
> *scala> val lines = sc.textFile("README.md")* > *scala> lines.count()* > > > > *scala> val lines = sc.textFile("README.md")* > > 16/01/19 03:17:35 INFO storage.MemoryStore: Block broadcast_0 stored as > values in memory (estimated size 26.5 KB, free 26.5 KB) > 16/01/19 03:17:35 INFO storage.MemoryStore: Block broadcast_0_piece0 > stored as bytes in memory (estimated size 5.6 KB, free 32.1 KB) > 16/01/19 03:17:35 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 > in memory on 172.31.28.196:44028 (size: 5.6 KB, free: 511.5 MB) > 16/01/19 03:17:35 INFO spark.SparkContext: Created broadcast 0 from > textFile at :21 > lines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile > at :21 > > *scala> lines.count()* > > 16/01/19 03:17:55 INFO ipc.Client: Retrying connect to server: > ec2-54-88-242-197.compute-1.amazonaws.com/172.31.28.196:9000. Already > tried 0 time(s); retry policy is > RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS) > 16/01/19 03:17:56 INFO ipc.Client: Retrying connect to server: > ec2-54-88-242-197.compute-1.amazonaws.com/172.31.28.196:9000. Already > tried 1 time(s); retry policy is > RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS) > 16/01/19 03:17:57 INFO ipc.Client: Retrying connect to server: > ec2-54-88-242-197.compute-1.amazonaws.com/172.31.28.196:9000. Already > tried 2 time(s); retry policy is > RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS) > 16/01/19 03:17:58 INFO ipc.Client: Retrying connect to server: > ec2-54-88-242-197.compute-1.amazonaws.com/172.31.28.196:9000. Already > tried 3 time(s); retry policy is > RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1 SECONDS) > 16/01/19 03:17:59 INFO ipc.Client: Retrying connect to server: > ec2-54-88-242-197.compute-1.amazonaws.com/172.31.28.196:9000. Already > tried 4 time(s);
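For reference, a minimal sketch of the workaround Peter suggests: a relative path such as "README.md" is resolved against the default filesystem, which on a spark-ec2 cluster is the HDFS NameNode on port 9000, so if that HDFS is not running the count fails with the connection errors above. Pointing textFile at an explicit local URI avoids HDFS entirely (the path below is an assumption about where the Spark distribution lives on the master):

# Hedged sketch: read the master's local copy of README.md instead of relying on
# the default (HDFS) filesystem; adjust the path to wherever Spark is installed.
lines = sc.textFile("file:///root/spark/README.md")
print(lines.count())

The other route is to start the HDFS service that spark-ec2 installed on the cluster and then use a full hdfs:// URI.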
Re: PySpark Logs location
That doesn't work for me so far; I used the command but got the output below. What should I check to fix the issue? Any configuration parameters?

[root@sdo-hdp-bd-master1 ~]# yarn logs -applicationId application_1426424283508_0048
15/05/21 13:25:09 INFO impl.TimelineClientImpl: Timeline service address: http://hdp-bd-node1.development.c4i:8188/ws/v1/timeline/
15/05/21 13:25:09 INFO client.RMProxy: Connecting to ResourceManager at hdp-bd-node1.development.c4i/12.23.45.253:8050
/app-logs/root/logs/application_1426424283508_0048 does not exist. *Log aggregation has not completed or is not enabled.*

Thanks, Oleg.

On Wed, May 20, 2015 at 11:33 PM, Ruslan Dautkhanov dautkha...@gmail.com wrote: Oleg, You can see applicationId in your Spark History Server. Go to http://historyserver:18088/ Also check https://spark.apache.org/docs/1.1.0/running-on-yarn.html#debugging-your-application It should be no different with PySpark. -- Ruslan Dautkhanov On Wed, May 20, 2015 at 2:12 PM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi Ruslan. Could you add more details please. Where do I get applicationId? In case I have a lot of log files would it make sense to view it from single point. How actually I can configure / manage log location of PySpark? Thanks Oleg. On Wed, May 20, 2015 at 10:24 PM, Ruslan Dautkhanov dautkha...@gmail.com wrote: You could use yarn logs -applicationId application_1383601692319_0008 -- Ruslan Dautkhanov On Wed, May 20, 2015 at 5:37 AM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi , I am executing PySpark job on yarn ( hortonworks distribution). Could someone pointing me where is the log locations? Thanks Oleg.
PySpark Logs location
Hi, I am executing a PySpark job on YARN (Hortonworks distribution). Could someone point me to where the logs are located? Thanks, Oleg.
Re: PySpark Logs location
Hi Ruslan. Could you add more details, please? Where do I get the applicationId? In case I have a lot of log files, would it make sense to view them from a single point? And how can I actually configure / manage the log location for PySpark? Thanks, Oleg.

On Wed, May 20, 2015 at 10:24 PM, Ruslan Dautkhanov dautkha...@gmail.com wrote: You could use yarn logs -applicationId application_1383601692319_0008 -- Ruslan Dautkhanov On Wed, May 20, 2015 at 5:37 AM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi , I am executing PySpark job on yarn ( hortonworks distribution). Could someone pointing me where is the log locations? Thanks Oleg.
pass configuration parameters to PySpark job
Hi, I am looking for a way to pass configuration parameters to a Spark job. In general I have quite a simple PySpark job:

def process_model(k, vc):
    # do something

sc = SparkContext(appName="TAD")
lines = sc.textFile(input_job_files)
result = lines.map(doSplit).groupByKey().map(lambda (k, vc): process_model(k, vc))

Question: what if I need to pass additional metadata, parameters, etc. to the process_model function? I tried something like

param = 'param1'
result = lines.map(doSplit).groupByKey().map(lambda (param,k,vc): process_model(param1,k,vc))

but the job stops working, and it doesn't look like an elegant solution anyway. Is there a way to get access to the SparkContext from my custom functions? I found the methods setLocalProperty/getLocalProperty, but I didn't find an example of how to use them for my requirements (from my function). It would be great to have a short example of how to pass parameters. Thanks, Oleg.
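For what it's worth, a minimal sketch of the usual way to do this without touching SparkContext from inside the function: extra parameters can be passed through an ordinary Python closure, and larger read-only metadata through a broadcast variable. doSplit and input_job_files are as in the message above; here process_model is assumed to take the extra parameter as its first argument, and the metadata dictionary is a made-up example.

from pyspark import SparkContext

sc = SparkContext(appName="TAD")
lines = sc.textFile(input_job_files)

param = 'param1'

# A closure captures local variables, so the extra argument can simply be passed in:
result = (lines.map(doSplit)
               .groupByKey()
               .map(lambda kv: process_model(param, kv[0], kv[1])))

# For bigger read-only metadata, a broadcast variable avoids shipping it with every task:
meta = sc.broadcast({'threshold': 0.5})
result2 = (lines.map(doSplit)
                .groupByKey()
                .map(lambda kv: process_model(meta.value, kv[0], kv[1])))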
multiple hdfs folder files input to PySpark
Hi, we are using PySpark 1.3 and the input is text files located on HDFS. The file structure is:

day1
  file1.txt
  file2.txt
day2
  file1.txt
  file2.txt
...

Questions:
1) What is the way to provide multiple files located in multiple folders (on HDFS) as input to a PySpark job? The textFile method works fine for a single file or folder, but how can I do it with multiple folders? Is there a way to pass an array / list of files?
2) What is the meaning of the partition parameter in the textFile method?

sc = SparkContext(appName="TAD")
lines = sc.textFile("my input", 1)

Thanks, Oleg.
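For reference, a minimal sketch (directory names are assumptions): textFile accepts comma-separated paths and glob patterns, so several folders can be read into a single RDD, and the second argument is minPartitions, a lower bound on the number of partitions (and therefore parallel tasks) the input is split into.

from pyspark import SparkContext

sc = SparkContext(appName="TAD")

# Several folders in one RDD, via a comma-separated list of paths:
lines = sc.textFile("hdfs:///data/day1,hdfs:///data/day2")

# A Python list of paths can be joined the same way, and globs also work:
days = ["hdfs:///data/day%d" % i for i in range(1, 8)]
lines_all = sc.textFile(",".join(days))
# equivalently: sc.textFile("hdfs:///data/day*")

# The second argument is minPartitions (a minimum, not an exact count):
lines_split = sc.textFile("hdfs:///data/day1", 8)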
spark stream + cassandra (execution on event)
Hi. I want to use Spark Streaming to read data from Cassandra, but in my case I need to process the data based on an event (not retrieving the data constantly from Cassandra). Question: what is the way to trigger the processing with Spark Streaming from time to time? Thanks, Oleg.
spark streaming python + kafka
Hi, I've just seen that Spark Streaming supports Python from version 1.2. Question: does Spark Streaming (the Python version) support Kafka integration? Thanks, Oleg.
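For reference: the Python streaming API gained Kafka support a bit later (Spark 1.3, via pyspark.streaming.kafka). A minimal receiver-based sketch, where the ZooKeeper address, consumer group and topic name are assumptions:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="KafkaPythonExample")
ssc = StreamingContext(sc, 10)  # 10-second batches

# (streaming context, zookeeper quorum, consumer group, {topic: partitions})
stream = KafkaUtils.createStream(ssc, "zk-host:2181", "my-group", {"events": 1})

# Each record is a (key, value) pair; count the values in every batch.
stream.map(lambda kv: kv[1]).count().pprint()

ssc.start()
ssc.awaitTermination()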
Re: pyspark and hdfs file name
Hi Davies. Thank you for the quick answer. I have code like this:

sc = SparkContext(appName="TAD")
lines = sc.textFile(sys.argv[1], 1)
result = lines.map(doSplit).groupByKey().map(lambda (k, vc): traffic_process_model(k, vc))
result.saveAsTextFile(sys.argv[2])

Can you please give a short example of what I should do? Also, I found only saveAsTextFile. Does PySpark have a saveAsBinary option, or what is the way to change the text format of the output files? Thanks, Oleg.

On Fri, Nov 14, 2014 at 3:26 PM, Davies Liu dav...@databricks.com wrote: One option maybe call HDFS tools or client to rename them after saveAsXXXFile(). On Thu, Nov 13, 2014 at 9:39 PM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi , I am running pyspark job. I need serialize final result to hdfs in binary files and having ability to give a name for output files. I found this post: http://stackoverflow.com/questions/25293962/specifying-the-output-file-name-in-apache-spark but it explains how to do it using scala. Question: How to do it using pyspark Thanks Oleg.
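A minimal sketch of what Davies describes, with assumed paths and names: saveAsPickleFile writes the RDD as Python-pickled SequenceFiles (a binary format), and since Spark itself always writes part-NNNNN names, the part files can then be renamed with the HDFS command-line client after the job finishes. result is the RDD from the code above.

import subprocess

# Binary output (pickled SequenceFiles) instead of plain text:
result.saveAsPickleFile("hdfs:///output/traffic_model")

# Rename a part file afterwards with the HDFS CLI (target name here is made up):
subprocess.check_call(["hdfs", "dfs", "-mv",
                       "/output/traffic_model/part-00000",
                       "/output/traffic_model/model-0.bin"])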
pyspark and hdfs file name
Hi, I am running a PySpark job. I need to serialize the final result to *HDFS in binary files* and have the ability to give a *name to the output files*. I found this post: http://stackoverflow.com/questions/25293962/specifying-the-output-file-name-in-apache-spark but it explains how to do it using Scala. Question: how do I do it using PySpark? Thanks, Oleg.
Re: pyspark on yarn - lost executor
Great, the upgrade helped. I still need some input:
1) Are there any log files of the Spark job execution?
2) Where can I read about tuning / parameter configuration? For example: --num-executors 12 --driver-memory 4g --executor-memory 2g - what is the meaning of those parameters?
Thanks, Oleg.

On Thu, Sep 18, 2014 at 12:15 AM, Davies Liu dav...@databricks.com wrote: Maybe the Python worker use too much memory during groupByKey(), groupByKey() with larger numPartitions can help. Also, can you upgrade your cluster to 1.1? It can spilling the data into disks if the memory can not hold all the data during groupByKey(). Also, If there is hot key with dozens of millions of values, the PR [1] can help it, it actually helped someone with large datasets (3T). Davies [1] https://github.com/apache/spark/pull/1977 On Wed, Sep 17, 2014 at 7:31 AM, Oleg Ruchovets oruchov...@gmail.com wrote: Sure, I'll post to the mail list. groupByKey(self, numPartitions=None) source code Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD with into numPartitions partitions. So instead of using default I'll provide numPartitions , but what is the best practice to calculate the number of partitions? and how number of partitions related to my original problem? Thanks Oleg. http://spark.apache.org/docs/1.0.2/api/python/frames.html On Wed, Sep 17, 2014 at 9:25 PM, Eric Friedman eric.d.fried...@gmail.com wrote: Look at the API for text file and groupByKey. Please don't take threads off list. Other people have the same questions. Eric Friedman On Sep 17, 2014, at 6:19 AM, Oleg Ruchovets oruchov...@gmail.com wrote: Can hou please explain how to configure partitions? Thanks Oleg On Wednesday, September 17, 2014, Eric Friedman eric.d.fried...@gmail.com wrote: Yeah, you need to increase partitions. You only have one on your text file. On groupByKey you're getting the pyspark default, which is too low. Eric Friedman On Sep 17, 2014, at 5:29 AM, Oleg Ruchovets oruchov...@gmail.com wrote: This is very good question :-). Here is my code: sc = SparkContext(appName=CAD) lines = sc.textFile(sys.argv[1], 1) result = lines.map(doSplit).groupByKey().mapValues(lambda vc: my_custom_function(vc)) result.saveAsTextFile(sys.argv[2]) Should I configure partitioning manually ? Where should I configure it? Where can I read about partitioning best practices? Thanks Oleg. On Wed, Sep 17, 2014 at 8:22 PM, Eric Friedman eric.d.fried...@gmail.com wrote: How many partitions do you have in your input rdd? Are you specifying numPartitions in subsequent calls to groupByKey/reduceByKey? On Sep 17, 2014, at 4:38 AM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi , I am execution pyspark on yarn. I have successfully executed initial dataset but now I growed it 10 times more.
during execution I got all the time this error: 14/09/17 19:28:50 ERROR cluster.YarnClientClusterScheduler: Lost executor 68 on UCS-NODE1.sms1.local: remote Akka client disassociated tasks are failed a resubmitted again: 14/09/17 18:40:42 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 21, 23, 26, 29, 32, 33, 48, 75, 86, 91, 93, 94 14/09/17 18:44:18 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 31, 52, 60, 93 14/09/17 18:46:33 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 19, 20, 23, 27, 39, 51, 64 14/09/17 18:48:27 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 51, 68, 80 14/09/17 18:50:47 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 1, 20, 34, 42, 61, 67, 77, 81, 91 14/09/17 18:58:50 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 8, 21, 23, 29, 34, 40, 46, 67, 69, 86 14/09/17 19:00:44 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 6, 13, 15, 17, 18, 19, 23, 32, 38, 39, 44, 49, 53, 54, 55, 56, 57, 59, 68, 74, 81, 85, 89 14/09/17 19:06:24 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 20, 43, 59, 79, 92 14/09/17 19:16:13 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 0, 2, 3, 11, 24, 31, 43, 65, 73 14/09/17 19:27:40 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 3, 7, 41, 72, 75, 84 QUESTION: how to debug / tune
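A minimal sketch of the change Davies and Eric suggest, using the code from the original message (doSplit and my_custom_function are as defined there; the partition counts are illustrative, not tuned values):

import sys
from pyspark import SparkContext

sc = SparkContext(appName="CAD")

# More input partitions: the second argument of textFile() is minPartitions.
lines = sc.textFile(sys.argv[1], 200)

# More shuffle partitions: numPartitions on groupByKey() keeps each grouped
# chunk small enough for the Python worker's memory.
result = (lines.map(doSplit)
               .groupByKey(numPartitions=200)
               .mapValues(lambda vc: my_custom_function(vc)))
result.saveAsTextFile(sys.argv[2])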
pyspark on yarn - lost executor
Hi, I am executing PySpark on YARN. I successfully processed the initial dataset, but now I have grown it 10 times larger. During execution I keep getting this error:

14/09/17 19:28:50 ERROR cluster.YarnClientClusterScheduler: Lost executor 68 on UCS-NODE1.sms1.local: remote Akka client disassociated

Tasks are failed and resubmitted again:

14/09/17 18:40:42 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 21, 23, 26, 29, 32, 33, 48, 75, 86, 91, 93, 94
14/09/17 18:44:18 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 31, 52, 60, 93
14/09/17 18:46:33 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 19, 20, 23, 27, 39, 51, 64
14/09/17 18:48:27 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 51, 68, 80
14/09/17 18:50:47 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 1, 20, 34, 42, 61, 67, 77, 81, 91
14/09/17 18:58:50 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 8, 21, 23, 29, 34, 40, 46, 67, 69, 86
14/09/17 19:00:44 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 6, 13, 15, 17, 18, 19, 23, 32, 38, 39, 44, 49, 53, 54, 55, 56, 57, 59, 68, 74, 81, 85, 89
14/09/17 19:06:24 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 20, 43, 59, 79, 92
14/09/17 19:16:13 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 0, 2, 3, 11, 24, 31, 43, 65, 73
14/09/17 19:27:40 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 3, 7, 41, 72, 75, 84

*QUESTION:* how can I debug / tune this problem? What can cause such behavior? I have a 5-machine cluster with 32 GB RAM. The dataset is 3 GB. Command for execution:

/usr/lib/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/bin/spark-submit --master yarn --num-executors 12 --driver-memory 4g --executor-memory 2g --py-files tad.zip --executor-cores 4 /usr/lib/cad/PrepareDataSetYarn.py /input/tad/inpuut.csv /output/cad_model_500_2

Where can I find a description of the parameters --num-executors 12 --driver-memory 4g --executor-memory 2g? What parameters should be used for tuning? Thanks, Oleg.
Re: pyspark on yarn - lost executor
Sure, I'll post to the mail list. groupByKey(self, numPartitions=None)source code http://spark.apache.org/docs/1.0.2/api/python/pyspark.rdd-pysrc.html#RDD.groupByKey Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD with into numPartitions partitions. So instead of using default I'll provide numPartitions , but what is the best practice to calculate the number of partitions? and how number of partitions related to my original problem? Thanks Oleg. http://spark.apache.org/docs/1.0.2/api/python/frames.html On Wed, Sep 17, 2014 at 9:25 PM, Eric Friedman eric.d.fried...@gmail.com wrote: Look at the API for text file and groupByKey. Please don't take threads off list. Other people have the same questions. Eric Friedman On Sep 17, 2014, at 6:19 AM, Oleg Ruchovets oruchov...@gmail.com wrote: Can hou please explain how to configure partitions? Thanks Oleg On Wednesday, September 17, 2014, Eric Friedman eric.d.fried...@gmail.com wrote: Yeah, you need to increase partitions. You only have one on your text file. On groupByKey you're getting the pyspark default, which is too low. Eric Friedman On Sep 17, 2014, at 5:29 AM, Oleg Ruchovets oruchov...@gmail.com wrote: This is very good question :-). Here is my code: sc = SparkContext(appName=CAD) lines = sc.textFile(sys.argv[1], 1) result = lines.map(doSplit).groupByKey().mapValues(lambda vc: my_custom_function(vc)) result.saveAsTextFile(sys.argv[2]) Should I configure partitioning manually ? Where should I configure it? Where can I read about partitioning best practices? Thanks Oleg. On Wed, Sep 17, 2014 at 8:22 PM, Eric Friedman eric.d.fried...@gmail.com wrote: How many partitions do you have in your input rdd? Are you specifying numPartitions in subsequent calls to groupByKey/reduceByKey? On Sep 17, 2014, at 4:38 AM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi , I am execution pyspark on yarn. I have successfully executed initial dataset but now I growed it 10 times more. 
during execution I got all the time this error: 14/09/17 19:28:50 ERROR cluster.YarnClientClusterScheduler: Lost executor 68 on UCS-NODE1.sms1.local: remote Akka client disassociated tasks are failed a resubmitted again: 14/09/17 18:40:42 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 21, 23, 26, 29, 32, 33, 48, 75, 86, 91, 93, 94 14/09/17 18:44:18 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 31, 52, 60, 93 14/09/17 18:46:33 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 19, 20, 23, 27, 39, 51, 64 14/09/17 18:48:27 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 51, 68, 80 14/09/17 18:50:47 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 1, 20, 34, 42, 61, 67, 77, 81, 91 14/09/17 18:58:50 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 8, 21, 23, 29, 34, 40, 46, 67, 69, 86 14/09/17 19:00:44 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 6, 13, 15, 17, 18, 19, 23, 32, 38, 39, 44, 49, 53, 54, 55, 56, 57, 59, 68, 74, 81, 85, 89 14/09/17 19:06:24 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 20, 43, 59, 79, 92 14/09/17 19:16:13 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 0, 2, 3, 11, 24, 31, 43, 65, 73 14/09/17 19:27:40 INFO scheduler.DAGScheduler: Resubmitting Stage 1 (RDD at PythonRDD.scala:252) because some of its tasks had failed: 3, 7, 41, 72, 75, 84 *QUESTION:* how to debug / tune the problem. What can cause to such behavior? I have 5 machine cluster with 32 GB ram. Dataset - 3G. command for execution: /usr/lib/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/bin/spark-submit --master yarn --num-executors 12 --driver-memory 4g --executor-memory 2g --py-files tad.zip --executor-cores 4 /usr/lib/cad/PrepareDataSetYarn.py /input/tad/inpuut.csv /output/cad_model_500_2 Where can I find description of the parameters? --num-executors 12 --driver-memory 4g --executor-memory 2g What parameters should be used for tuning? Thanks Oleg.
Re: PySpark on Yarn - how group by data properly
/09/16 20:11:37 INFO storage.BlockManagerMasterActor: Trying to remove executor 2 from BlockManagerMaster. 14/09/16 20:11:37 INFO scheduler.TaskSetManager: Serialized task 1.0:2 as 3895 bytes in 0 ms 14/09/16 20:11:37 INFO storage.BlockManagerMaster: Removed 2 successfully in removeExecutor 14/09/16 20:11:37 INFO scheduler.Stage: Stage 1 is now unavailable on executor 2 (0/3, false) 14/09/16 20:11:53 INFO cluster.YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@UCS-NODE4.sms1.local:47948/user/Executor#-1547490738] with ID 13 14/09/16 20:11:53 INFO storage.BlockManagerInfo: Registering block manager UCS-NODE4.sms1.local:51174 with 1178.1 MB RAM 14/09/16 20:12:19 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(1, 2) 14/09/16 20:12:19 INFO scheduler.TaskSetManager: Finished TID 5 in 41426 ms on UCS-MASTER.sms1.local (progress: 1/3) 14/09/16 20:14:23 INFO scheduler.TaskSetManager: Finished TID 3 in 165752 ms on UCS-NODE4.sms1.local (progress: 2/3) 14/09/16 20:14:23 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(1, 1) 14/09/16 20:14:27 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(1, 0) 14/09/16 20:14:27 INFO scheduler.TaskSetManager: Finished TID 4 in 170168 ms on UCS-NODE3.sms1.local (progress: 3/3) 14/09/16 20:14:27 INFO cluster.YarnClientClusterScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool 14/09/16 20:14:27 INFO scheduler.DAGScheduler: Stage 1 (RDD at PythonRDD.scala:252) finished in 401.305 s 14/09/16 20:14:27 INFO scheduler.DAGScheduler: looking for newly runnable stages 14/09/16 20:14:27 INFO scheduler.DAGScheduler: running: Set() 14/09/16 20:14:27 INFO scheduler.DAGScheduler: waiting: Set(Stage 0) 14/09/16 20:14:27 INFO scheduler.DAGScheduler: failed: Set() 14/09/16 20:14:27 INFO scheduler.DAGScheduler: Missing parents for Stage 0: List() 14/09/16 20:14:27 INFO scheduler.DAGScheduler: Submitting Stage 0 (MappedRDD[8] at saveAsTextFile at NativeMethodAccessorImpl.java:-2), which is now runnable 14/09/16 20:14:28 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 0 (MappedRDD[8] at saveAsTextFile at NativeMethodAccessorImpl.java:-2) 14/09/16 20:14:28 INFO cluster.YarnClientClusterScheduler: Adding task set 0.0 with 2 tasks 14/09/16 20:14:28 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID 6 on executor 8: UCS-NODE2.sms1.local (PROCESS_LOCAL) 14/09/16 20:14:28 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as 17714 bytes in 0 ms 14/09/16 20:14:28 INFO scheduler.TaskSetManager: Starting task 0.0:1 as TID 7 on executor 6: UCS-NODE1.sms1.local (PROCESS_LOCAL) 14/09/16 20:14:28 INFO scheduler.TaskSetManager: Serialized task 0.0:1 as 17714 bytes in 1 ms 14/09/16 20:14:28 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to spark@UCS-NODE1.sms1.local:54238 14/09/16 20:14:28 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 184 bytes 14/09/16 20:14:28 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to spark@UCS-NODE2.sms1.local:43725 Thanks Oleg. 
On Wed, Sep 10, 2014 at 1:48 AM, Davies Liu dav...@databricks.com wrote: On Tue, Sep 9, 2014 at 9:56 AM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi , I came from map/reduce background and try to do quite trivial thing: I have a lot of files ( on hdfs ) - format is : 1 , 2 , 3 2 , 3 , 5 1 , 3, 5 2, 3 , 4 2 , 5, 1 I am actually need to group by key (first column) : key values 1 -- (2,3),(3,5) 2 -- (3,5),(3,4),(5,1) and I need to process (pass) values to the function f ( my custom function) outcome of function f() should be to hdfs with corresponding key: 1 -- f() outcome 2 -- f() outcome. My code is : def doSplit(x): y = x.split(',') if(len(y)==3): return y[0],(y[1],y[2]) lines = sc.textFile(filename,1) counts = lines.map(doSplit).groupByKey() output = counts.collect() for (key, value) in output: print 'build model for key -' , key print value f(str(key) , value)) Questions: 1) lines.map(doSplit).groupByKey() - I didn't find the option to use groupByKey( f() ) to process grouped values? how can I process grouped keys by custom function? function f has some not trivial logic. The result of groupByKey() is still RDD with (key, ResultIterable(values)), so you can continue to call map() or mapValues() on it: lines.map(doSplit).groupByKey().map(f) But your `f` need two parameters, the map() will assume that `f` take one parameter, so you need to build a wrapper for `f`: lines.map(doSplit).groupByKey().map(lambda (k, vs): f(k, vs)) If the `f` only accept values as list, then you need to convert `vs` into list: result = lines.map(doSplit).groupByKey().map(lambda (k, vs): f(k, list(vs))) finally, you could save the `result
Re: pyspark and cassandra
Hi, I am trying to evaluate different options for Spark + Cassandra and I have a couple of additional questions. My aim is to use Cassandra only, without Hadoop:
1) Is it possible to use only Cassandra as the input/output for PySpark?
2) In case I use Spark (Java/Scala), is it possible to use only Cassandra for input/output, without Hadoop?
3) I know there are a couple of strategies for the storage level. In case my data set is quite big and I don't have enough memory to process it, can I use the DISK_ONLY option without Hadoop (having only Cassandra)?
Thanks, Oleg

On Wed, Sep 3, 2014 at 3:08 AM, Kan Zhang kzh...@apache.org wrote: In Spark 1.1, it is possible to read from Cassandra using Hadoop jobs. See examples/src/main/python/cassandra_inputformat.py for an example. You may need to write your own key/value converters. On Tue, Sep 2, 2014 at 11:10 AM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi All , Is it possible to have cassandra as input data for PySpark. I found example for java - http://java.dzone.com/articles/sparkcassandra-stack-perform?page=0,0 and I am looking something similar for python. Thanks Oleg.
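Regarding question 3, a small sketch: storage levels are a Spark-local concern and do not involve HDFS; DISK_ONLY spills cached partitions to the executors' local disks, so it can be used even when the only external store is Cassandra. The RDD below is just a stand-in for whatever is read from Cassandra.

from pyspark import SparkContext, StorageLevel

sc = SparkContext(appName="CassandraNoHadoop")
rdd = sc.parallelize(range(1000000))  # placeholder for an RDD loaded from Cassandra

rdd.persist(StorageLevel.DISK_ONLY)   # cache to local disk, not memory, not HDFS
print(rdd.count())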
PySpark on Yarn - how group by data properly
Hi, I come from a map/reduce background and am trying to do a quite trivial thing: I have a lot of files (on HDFS) in this format:

1 , 2 , 3
2 , 3 , 5
1 , 3, 5
2, 3 , 4
2 , 5, 1

I actually need to group by key (the first column):

key  values
1 -- (2,3),(3,5)
2 -- (3,5),(3,4),(5,1)

and I need to pass the values to the function f (my custom function). The outcome of f() should go to HDFS with the corresponding key:

1 -- f() outcome
2 -- f() outcome

My code is:

def doSplit(x):
    y = x.split(',')
    if(len(y)==3):
        return y[0],(y[1],y[2])

lines = sc.textFile(filename,1)
counts = lines.map(doSplit).groupByKey()
output = counts.collect()

for (key, value) in output:
    print 'build model for key -', key
    print value
    f(str(key), value)

Questions:
1) lines.map(doSplit).groupByKey() - I didn't find an option like groupByKey(f()) to process the grouped values. How can I process the grouped keys with a custom function? Function f has some non-trivial logic.
2) Using output (I really don't like this approach) to pass to the function looks not scalable and is executed only on one machine. What is the way to process the grouped keys with PySpark in a distributed fashion, in multiple processes and on different machines of the cluster?
3) In the case of processing the output, how can the data be stored on HDFS?

Thanks, Oleg.
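For reference, a minimal sketch of the distributed version (essentially what Davies suggests earlier in this thread): apply f inside map() on the grouped RDD and write straight to HDFS, so nothing is collected to the driver. doSplit, f and filename are as in the message above; the output path is an assumption.

from pyspark import SparkContext

sc = SparkContext(appName="GroupByKeyExample")

lines = sc.textFile(filename)
result = (lines.map(doSplit)
               .groupByKey()
               .map(lambda kv: (kv[0], f(str(kv[0]), list(kv[1])))))

# Each partition is written by its own executor, so the work stays distributed
# and the results land on HDFS as part files under this directory (question 3):
result.saveAsTextFile("hdfs:///output/model_per_key")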
PySpark on Yarn a lot of python scripts project
Hi, we are evaluating PySpark and have successfully executed the PySpark-on-YARN examples. The next step is: we have a Python project (a bunch of Python scripts using Anaconda packages).

Question: what is the way to execute PySpark on YARN with a lot of Python files (~50)? Should they be packaged in an archive? What would the command to execute PySpark on YARN with a lot of files look like? Currently the command looks like:

./bin/spark-submit --master yarn --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1 examples/src/main/python/wordcount.py 1000

Thanks, Oleg.
Re: PySpark on Yarn a lot of python scripts project
OK, I didn't explain myself correctly: in the case of Java, with a lot of classes, a jar is used. All the PySpark examples I found are a single .py script (Pi, wordcount ...), but in a real environment the analytics has more than one .py file. My question is how to use PySpark on YARN when there are multiple Python files. I am not so sure that using comma-separated Python files is a good option in my case (we have quite a lot of files). In case of using the zip option: is it just a zip of all the Python files, like a jar in Java? In Java there is a Manifest file which points to the main method - is there something similar? Is the zip option best practice, or are there other techniques? Thanks, Oleg.

On Sat, Sep 6, 2014 at 1:01 AM, Dimension Data, LLC. subscripti...@didata.us wrote: Hi: Curious... is there any reason not to use one of the below pyspark options (in red)? Assuming each file is, say 10k in size, is 50 files too much? Does that touch on some practical limitation? Usage: ./bin/pyspark [options] Options: --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local. --deploy-mode DEPLOY_MODE Where to run the driver program: either client to run on the local machine, or cluster to run inside cluster. --class CLASS_NAME Your application's main class (for Java / Scala apps). --name NAME A name of your application. --jars JARS Comma-separated list of local jars to include on the driver and executor classpaths. --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps. --files FILES Comma-separated list of files to be placed in the working directory of each executor. [ ... snip ... ] On 09/05/2014 12:00 PM, Davies Liu wrote: Hi Oleg, In order to simplify the process of package and distribute you codes, you could deploy an shared storage (such as NFS), and put your project in it, mount it to all the slaves as /projects. In the spark job scripts, you can access your project by put the path into sys.path, such as: import sys sys.path.append(/projects) import myproject Davies On Fri, Sep 5, 2014 at 1:28 AM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi , We avaluating PySpark and successfully executed examples of PySpark on Yarn. Next step what we want to do: We have a python project ( bunch of python script using Anaconda packages). Question: What is the way to execute PySpark on Yarn having a lot of python files ( ~ 50)? Should it be packaged in archive? How the command to execute Pyspark on Yarn with a lot of files will looks like? Currently command looks like: ./bin/spark-submit --master yarn --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1 examples/src/main/python/wordcount.py 1000 Thanks Oleg.
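For reference, a minimal sketch of the zip route (project name and layout are assumptions): unlike a jar there is no manifest; the entry-point script is still passed to spark-submit directly, and the zip only carries the importable modules, preserving the package structure so normal imports work on the executors.

import os
import zipfile

def build_zip(src_dir, zip_path):
    # Zip every .py under src_dir, keeping the package layout (src_dir/...).
    parent = os.path.dirname(os.path.abspath(src_dir))
    with zipfile.ZipFile(zip_path, "w") as zf:
        for root, _, files in os.walk(src_dir):
            for name in files:
                if name.endswith(".py"):
                    full = os.path.abspath(os.path.join(root, name))
                    zf.write(full, os.path.relpath(full, parent))

build_zip("myproject", "myproject.zip")

# Ship it at submit time:
#   ./bin/spark-submit --master yarn --py-files myproject.zip main.py
# or add it from a running driver, then import as usual:
#   sc.addPyFile("myproject.zip")
#   import myproject.some_module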
2 python installations cause PySpark on Yarn problem
Hi, I am evaluating PySpark. I have HDP (Hortonworks) installed with Python 2.6.6 (I can't remove it since it is used by Hortonworks). I can successfully execute PySpark on YARN. We need to use Anaconda packages, so I installed Anaconda. Anaconda comes with Python 2.7.7 and it is added to the classpath. After installing Anaconda, the Pi example stops working - I used it for testing PySpark on YARN.

Question: how can PySpark be used with 2 Python versions on one machine? In the classpath I have 2.7.7 on every machine. How can I check which Python version is used at runtime when executing PySpark? The exceptions I get are the same as in the previous emails:

[root@HDOP-B spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563]# ./bin/spark-submit --master yarn --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1 examples/src/main/python/pi.py 1000
/usr/jdk64/jdk1.7.0_45/bin/java ::/root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/conf:/root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/lib/spark-assembly-1.0.1.2.1.3.0-563-hadoop2.4.0.2.1.3.0-563.jar:/etc/hadoop/conf -XX:MaxPermSize=128m -Djava.library.path= -Xms4g -Xmx4g
14/09/04 12:53:11 INFO spark.SecurityManager: Changing view acls to: root
14/09/04 12:53:11 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root)
14/09/04 12:53:12 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/09/04 12:53:12 INFO Remoting: Starting remoting
14/09/04 12:53:12 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sp...@hdop-b.agt:45747]
14/09/04 12:53:12 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sp...@hdop-b.agt:45747]
14/09/04 12:53:12 INFO spark.SparkEnv: Registering MapOutputTracker
14/09/04 12:53:12 INFO spark.SparkEnv: Registering BlockManagerMaster
14/09/04 12:53:12 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20140904125312-c7ea
14/09/04 12:53:12 INFO storage.MemoryStore: MemoryStore started with capacity 2.3 GB.
14/09/04 12:53:12 INFO network.ConnectionManager: Bound socket to port 37363 with id = ConnectionManagerId(HDOP-B.AGT,37363)
14/09/04 12:53:12 INFO storage.BlockManagerMaster: Trying to register BlockManager
14/09/04 12:53:12 INFO storage.BlockManagerInfo: Registering block manager HDOP-B.AGT:37363 with 2.3 GB RAM
14/09/04 12:53:12 INFO storage.BlockManagerMaster: Registered BlockManager
14/09/04 12:53:12 INFO spark.HttpServer: Starting HTTP Server
14/09/04 12:53:12 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/09/04 12:53:12 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:33547
14/09/04 12:53:12 INFO broadcast.HttpBroadcast: Broadcast server started at http://10.193.1.76:33547
14/09/04 12:53:12 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-054f4eda-b93b-47d3-87d5-c40e81fc1fe8
14/09/04 12:53:12 INFO spark.HttpServer: Starting HTTP Server
14/09/04 12:53:12 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/09/04 12:53:12 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:54594
14/09/04 12:53:13 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/09/04 12:53:13 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
14/09/04 12:53:13 INFO ui.SparkUI: Started SparkUI at http://HDOP-B.AGT:4040
14/09/04 12:53:13 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
--args is deprecated. Use --arg instead.
14/09/04 12:53:14 INFO client.RMProxy: Connecting to ResourceManager at HDOP-N1.AGT/10.193.1.72:8050 14/09/04 12:53:14 INFO yarn.Client: Got Cluster metric info from ApplicationsManager (ASM), number of NodeManagers: 6 14/09/04 12:53:14 INFO yarn.Client: Queue info ... queueName: default, queueCurrentCapacity: 0.0, queueMaxCapacity: 1.0, queueApplicationCount = 0, queueChildQueueCount = 0 14/09/04 12:53:14 INFO yarn.Client: Max mem capabililty of a single resource in this cluster 13824 14/09/04 12:53:14 INFO yarn.Client: Preparing Local resources 14/09/04 12:53:15 INFO yarn.Client: Uploading file:/root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/lib/spark-assembly-1.0.1.2.1.3.0-563-hadoop2.4.0.2.1.3.0-563.jar to hdfs://HDOP-B.AGT:8020/user/root/.sparkStaging/application_1409805761292_0005/spark-assembly-1.0.1.2.1.3.0-563-hadoop2.4.0.2.1.3.0-563.jar 14/09/04 12:53:17 INFO yarn.Client: Uploading file:/root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/examples/src/main/python/pi.py to hdfs://HDOP-B.AGT:8020/user/root/.sparkStaging/application_1409805761292_0005/pi.py 14/09/04 12:53:17 INFO yarn.Client: Setting up the launch environment 14/09/04 12:53:17 INFO yarn.Client: Setting up container launch context 14/09/04 12:53:17 INFO yarn.Client: Command for starting the Spark ApplicationMaster: List($JAVA_HOME/bin/java, -server, -Xmx4096m, -Djava.io.tmpdir=$PWD/tmp, -Dspark.tachyonStore.folderName=\spark-2b59c845-3de2-4c3d-a352-1379ecade281\,
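For reference, a small sketch of how this is usually handled: the interpreter the workers launch is controlled by the PYSPARK_PYTHON environment variable (set it, for example, in spark-env.sh on every node; the Anaconda path below is an assumption), and the version actually used on the executors can be checked from inside a job:

# e.g. on every node, before submitting:  export PYSPARK_PYTHON=/opt/anaconda/bin/python
import sys
from pyspark import SparkContext

sc = SparkContext(appName="WhichPython")
driver_version = sys.version
executor_versions = (sc.parallelize(range(4), 4)
                       .map(lambda _: sys.version)
                       .distinct()
                       .collect())
print("driver:    " + driver_version)
print("executors: " + str(executor_versions))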
pyspark on yarn hdp hortonworks
Hi all. I have been trying to run PySpark on YARN for a couple of days already, following http://hortonworks.com/kb/spark-1-0-1-technical-preview-hdp-2-1-3/. I posted the exceptions in previous posts. It looks like I didn't do the configuration correctly. I googled quite a lot and I can't find the steps that should be done to configure PySpark to run on YARN. Can you please share the steps (critical points) that need to be configured to use PySpark on YARN (Hortonworks distribution): environment variables, classpath, copying jars to all machines, other configuration. Thanks, Oleg.
pyspark and cassandra
Hi all, is it possible to have Cassandra as input data for PySpark? I found an example for Java - http://java.dzone.com/articles/sparkcassandra-stack-perform?page=0,0 - and I am looking for something similar for Python. Thanks, Oleg.
Re: pyspark yarn got exception
z:org.apache.spark.api.python.PythonRDD.readRDDFromFile. : java.lang.OutOfMemoryError: Java heap space at org.apache.spark.api.python.PythonRDD$.readRDDFromFile(PythonRDD.scala:279) at org.apache.spark.api.python.PythonRDD.readRDDFromFile(PythonRDD.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:744) What should I do to fix the issue Thanks Oleg. On Tue, Sep 2, 2014 at 10:32 PM, Andrew Or and...@databricks.com wrote: Hi Oleg, If you are running Spark on a yarn cluster, you should set --master to yarn. By default this runs in client mode, which redirects all output of your application to your console. This is failing because it is trying to connect to a standalone master that you probably did not start. I am somewhat puzzled as to how you ran into an OOM from this configuration, however. Does this problem still occur if you set the correct master? -Andrew 2014-09-02 2:42 GMT-07:00 Oleg Ruchovets oruchov...@gmail.com: Hi , I've installed pyspark on hpd hortonworks cluster. Executing pi example: command: spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563]# ./bin/spark-submit --master spark://10.193.1.71:7077 examples/src/main/python/pi.py 1000 exception: 14/09/02 17:34:02 INFO SecurityManager: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/09/02 17:34:02 INFO SecurityManager: Changing view acls to: root 14/09/02 17:34:02 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root) 14/09/02 17:34:02 INFO Slf4jLogger: Slf4jLogger started 14/09/02 17:34:02 INFO Remoting: Starting remoting 14/09/02 17:34:03 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sp...@hdop-m.agt:41059] 14/09/02 17:34:03 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sp...@hdop-m.agt:41059] 14/09/02 17:34:03 INFO SparkEnv: Registering MapOutputTracker 14/09/02 17:34:03 INFO SparkEnv: Registering BlockManagerMaster 14/09/02 17:34:03 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20140902173403-cda8 14/09/02 17:34:03 INFO MemoryStore: MemoryStore started with capacity 294.9 MB. 
14/09/02 17:34:03 INFO ConnectionManager: Bound socket to port 34931 with id = ConnectionManagerId(HDOP-M.AGT,34931) 14/09/02 17:34:03 INFO BlockManagerMaster: Trying to register BlockManager 14/09/02 17:34:03 INFO BlockManagerInfo: Registering block manager HDOP-M.AGT:34931 with 294.9 MB RAM 14/09/02 17:34:03 INFO BlockManagerMaster: Registered BlockManager 14/09/02 17:34:03 INFO HttpServer: Starting HTTP Server 14/09/02 17:34:03 INFO HttpBroadcast: Broadcast server started at http://10.193.1.71:54341 14/09/02 17:34:03 INFO HttpFileServer: HTTP File server directory is /tmp/spark-77c7a7dc-181e-4069-a014-8103a6a6330a 14/09/02 17:34:03 INFO HttpServer: Starting HTTP Server 14/09/02 17:34:04 INFO SparkUI: Started SparkUI at http://HDOP-M.AGT:4040 14/09/02 17:34:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/09/02 17:34:04 INFO Utils: Copying /root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/examples/src/main/python/pi.py to /tmp/spark-f2e0cc0f-59cb-4f6c-9d48-f16205a40c7e/pi.py 14/09/02 17:34:04 INFO SparkContext: Added file file:/root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/examples/src/main/python/pi.py at http://10.193.1.71:52938/files/pi.py with timestamp 1409650444941 14/09/02 17:34:05 INFO AppClient$ClientActor: Connecting to master spark://10.193.1.71:7077... 14/09/02 17:34:05 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@10.193.1.71:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@10.193.1.71:7077] 14/09/02 17:34:05 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@10.193.1.71:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@10.193.1.71:7077] 14/09/02 17:34:05 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@10.193.1.71:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@10.193.1.71:7077] 14/09/02 17:34:05 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@10.193.1.71:7077
Re: pyspark yarn got exception
Hi, I changed my command to:

./bin/spark-submit --master spark://HDOP-B.AGT:7077 --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1 examples/src/main/python/pi.py 1000

and it fixed the problem. I still have a couple of questions: PROCESS_LOCAL is not YARN execution, right? How should I configure running on YARN? Should I execute the start-all script on all machines or only on one? Where are the UI / logs of the Spark execution?

[Spark UI task table pasted here: tasks 0-11 and 152, all SUCCESS, locality PROCESS_LOCAL, executor HDOP-B.AGT, launched 2014/09/03 12:35:09-12:35:14, durations 0.2-0.9 s]

On Wed, Sep 3, 2014 at 12:19 PM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi Andrew. what should I do to set master on yarn, can you please pointing me on command or documentation how to do it? I am doing the following: executed start-all.sh [root@HDOP-B sbin]# ./start-all.sh starting org.apache.spark.deploy.master.Master, logging to /root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/sbin/../logs/spark-root-org.apache.spark.deploy.master.Master-1-HDOP-B.AGT.out localhost: Warning: Permanently added 'localhost' (RSA) to the list of known hosts. localhost: starting org.apache.spark.deploy.worker.Worker, logging to /root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/sbin/../logs/spark-root-org.apache.spark.deploy.worker.Worker-1-HDOP-B.AGT.out after execute the command: ./bin/spark-submit --master spark://HDOP-B.AGT:7077 examples/src/main/python/pi.py 1000 the result is the following: /usr/jdk64/jdk1.7.0_45/bin/java ::/root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/conf:/root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/lib/spark-assembly-1.0.1.2.1.3.0-563-hadoop2.4.0.2.1.3.0-563.jar -XX:MaxPermSize=128m -Djava.library.path= -Xms512m -Xmx512m 14/09/03 12:10:06 INFO SecurityManager: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/09/03 12:10:06 INFO SecurityManager: Changing view acls to: root 14/09/03 12:10:06 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root) 14/09/03 12:10:07 INFO Slf4jLogger: Slf4jLogger started 14/09/03 12:10:07 INFO Remoting: Starting remoting 14/09/03 12:10:07 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sp...@hdop-b.agt:38944] 14/09/03 12:10:07 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sp...@hdop-b.agt:38944] 14/09/03 12:10:07 INFO SparkEnv: Registering MapOutputTracker 14/09/03 12:10:07 INFO SparkEnv: Registering BlockManagerMaster 14/09/03 12:10:08 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20140903121008-cf09 14/09/03 12:10:08 INFO MemoryStore: MemoryStore started with capacity 294.9 MB.
14/09/03 12:10:08 INFO ConnectionManager: Bound socket to port 45041 with id = ConnectionManagerId(HDOP-B.AGT,45041) 14/09/03 12:10:08 INFO BlockManagerMaster: Trying to register BlockManager 14/09/03 12:10:08 INFO BlockManagerInfo: Registering block manager HDOP-B.AGT:45041 with 294.9 MB RAM 14/09/03 12:10:08 INFO BlockManagerMaster: Registered BlockManager 14/09/03 12:10:08 INFO HttpServer: Starting HTTP Server 14/09/03 12:10:08 INFO HttpBroadcast: Broadcast server started at http://10.193.1.76:59336 14/09/03 12:10:08 INFO HttpFileServer: HTTP File server directory is /tmp/spark-7bf5c3c3-1c02-41e8-9fb0-983e175dd45c 14/09/03 12:10:08 INFO HttpServer: Starting HTTP Server 14/09/03 12:10:08 INFO SparkUI: Started SparkUI at http://HDOP-B.AGT:4040 14/09/03 12:10:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 14/09/03 12:10:09 INFO Utils: Copying /root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/examples/src/main/python/pi.py to /tmp/spark-4e252376-70cb-4171-bf2c-d804524e816c/pi.py 14/09/03 12:10:09 INFO SparkContext: Added file file:/root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/examples/src/main/python/pi.py at http://10.193.1.76:45893/files/pi.py with timestamp 1409717409277 14/09/03 12:10:09 INFO AppClient$ClientActor: Connecting to master spark://HDOP-B.AGT:7077... 14/09/03 12:10:09 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20140903121009- 14/09/03 12:10:09 INFO AppClient$ClientActor: Executor added: app-20140903121009-/0
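On the UI/logs question above: when the job runs on YARN (rather than against a standalone master), the driver still serves its web UI on port 4040 while the application is alive, and afterwards the aggregated container logs are normally pulled through the YARN CLI, roughly as below. The application id is a placeholder, and this assumes YARN log aggregation is enabled on the cluster:

yarn application -list
yarn logs -applicationId application_1409720000000_0001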
Re: pyspark yarn got exception
) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 14/09/03 13:34:16 WARN scheduler.TaskSetManager: Task 11 was killed. 14/09/03 13:34:17 WARN scheduler.TaskSetManager: Loss was due to org.apache.spark.TaskKilledException org.apache.spark.TaskKilledException at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:174) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744)

What should I do to resolve the issue? Thanks Oleg.

On Wed, Sep 3, 2014 at 12:51 PM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi, I changed my command to: ./bin/spark-submit --master spark://HDOP-B.AGT:7077 --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1 examples/src/main/python/pi.py 1000 and it fixed the problem. I still have a couple of questions: PROCESS_LOCAL is not YARN execution, right? How should I configure running on YARN? Should I execute the start-all script on all machines or only on one? Where are the UI / logs of the Spark execution? [Spark UI task table pasted here: tasks 0-11 and 152, all SUCCESS, locality PROCESS_LOCAL, executor HDOP-B.AGT, launched 2014/09/03 12:35:09-12:35:14, durations 0.2-0.9 s] On Wed, Sep 3, 2014 at 12:19 PM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi Andrew. what should I do to set master on yarn, can you please pointing me on command or documentation how to do it? I am doing the following: executed start-all.sh [root@HDOP-B sbin]# ./start-all.sh starting org.apache.spark.deploy.master.Master, logging to /root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/sbin/../logs/spark-root-org.apache.spark.deploy.master.Master-1-HDOP-B.AGT.out localhost: Warning: Permanently added 'localhost' (RSA) to the list of known hosts.
localhost: starting org.apache.spark.deploy.worker.Worker, logging to /root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/sbin/../logs/spark-root-org.apache.spark.deploy.worker.Worker-1-HDOP-B.AGT.out after execute the command: ./bin/spark-submit --master spark://HDOP-B.AGT:7077 examples/src/main/python/pi.py 1000 the result is the following: /usr/jdk64/jdk1.7.0_45/bin/java ::/root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/conf:/root/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/lib/spark-assembly-1.0.1.2.1.3.0-563-hadoop2.4.0.2.1.3.0-563.jar -XX:MaxPermSize=128m -Djava.library.path= -Xms512m -Xmx512m 14/09/03 12:10:06 INFO SecurityManager: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 14/09/03 12:10:06 INFO SecurityManager: Changing view acls to: root 14/09/03 12:10:06 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root) 14/09/03 12:10:07 INFO Slf4jLogger: Slf4jLogger started 14/09/03 12:10:07 INFO Remoting: Starting remoting 14/09/03 12:10:07 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sp...@hdop-b.agt:38944] 14/09/03 12:10:07 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sp...@hdop-b.agt:38944] 14/09/03 12:10:07 INFO SparkEnv: Registering MapOutputTracker 14/09/03 12:10:07 INFO SparkEnv: Registering BlockManagerMaster 14/09/03 12:10:08 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20140903121008-cf09 14/09/03 12:10:08 INFO MemoryStore: MemoryStore started with capacity 294.9 MB. 14/09/03 12:10:08 INFO ConnectionManager: Bound socket
u'' notation with pyspark output data
Hi, I am working with pyspark and doing a simple aggregation:

def doSplit(x):
    y = x.split(',')
    if len(y) == 3:
        return y[0], (y[1], y[2])

counts = lines.map(doSplit).groupByKey()
output = counts.collect()

Iterating over output I get the data in this format: u'1385501280', u'14.0', but I actually need to work with 14 instead of u'14.0' and 1385501280 instead of u'1385501280'. Question: how do I get the actual data without the u'' notation? Thanks Oleg.
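A minimal sketch of one way around this, assuming lines is an RDD of comma-separated records as in the snippet above: the u'' prefix is just how Python 2 displays unicode strings, so casting the fields to int/float while parsing yields plain numbers; the added filter also drops records that do not split into exactly three fields, which the original doSplit would otherwise return as None:

def doSplit(x):
    # cast the key and both values to numeric types while parsing
    y = x.split(',')
    if len(y) == 3:
        return int(y[0]), (float(y[1]), float(y[2]))

pairs = lines.map(doSplit).filter(lambda r: r is not None)
counts = pairs.groupByKey()
for key, values in counts.collect():
    print(key, list(values))   # e.g. 1385501280 with a list of float pairs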
spark on disk executions
Hi, We have ~1TB of data to process, but our cluster doesn't have sufficient memory for such a data set (we have a 5-10 machine cluster). Is it possible to process 1TB of data using ON DISK options with Spark? If yes, where can I read about the configuration for ON DISK execution? Thanks Oleg.
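A minimal sketch of the usual approach, with a placeholder HDFS path: Spark works through a data set partition by partition rather than holding all of it in memory, and persisting with StorageLevel.MEMORY_AND_DISK (or DISK_ONLY) tells it to spill partitions that do not fit to local disk (spark.local.dir controls where the scratch data is written):

from pyspark import SparkContext, StorageLevel

sc = SparkContext(appName="on-disk-sketch")
data = sc.textFile("hdfs:///data/big_input")        # placeholder path to the ~1 TB input
records = data.map(lambda line: line.split(","))
records.persist(StorageLevel.MEMORY_AND_DISK)       # keep what fits in memory, spill the rest to disk
print(records.count())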
Re: anaconda and spark integration
Hello. Is there an integration of Spark (PySpark) with Anaconda? I googled a lot and didn't find relevant information. Could you please point me to a tutorial or a simple example? Thanks in advance, Oleg.
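In most setups no special integration is needed; pointing PySpark at the Anaconda interpreter via PYSPARK_PYTHON is usually enough, as in the sketch below. The install path is a placeholder, and the same path has to exist on every worker node so the executors pick up the same interpreter:

export PYSPARK_PYTHON=/opt/anaconda/bin/python
./bin/pyspark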