Re: PySpark on Yarn - how group by data properly
I expanded my data set and am executing PySpark on YARN. I noticed that only 2 processes are processing the data:

14210 yarn 20 0 2463m 2.0g 9708 R 100.0 4.3 8:22.63 python2.7
32467 yarn 20 0 2519m 2.1g 9720 R  99.3 4.4 7:16.97 python2.7

*Question:* *how do I configure PySpark to use more processes to process the data?*

Here is my command:

[hdfs@UCS-MASTER cad]$ /usr/lib/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/bin/spark-submit --master yarn --num-executors 12 --driver-memory 4g --executor-memory 2g --py-files tad.zip --executor-cores 4 /usr/lib/cad/PrepareDataSetYarn.py /input/tad/data.csv /output/cad_model_500_1

I tried to play with --num-executors and --executor-cores, but it is still 2 python processes doing the job. I have a 5-machine cluster with 32 GB RAM.

console output:

14/09/16 20:07:34 INFO spark.SecurityManager: Changing view acls to: hdfs
14/09/16 20:07:34 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hdfs)
14/09/16 20:07:34 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/09/16 20:07:35 INFO Remoting: Starting remoting
14/09/16 20:07:35 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@UCS-MASTER.sms1.local:39379]
14/09/16 20:07:35 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@UCS-MASTER.sms1.local:39379]
14/09/16 20:07:35 INFO spark.SparkEnv: Registering MapOutputTracker
14/09/16 20:07:35 INFO spark.SparkEnv: Registering BlockManagerMaster
14/09/16 20:07:35 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20140916200735-53f6
14/09/16 20:07:35 INFO storage.MemoryStore: MemoryStore started with capacity 2.3 GB.
14/09/16 20:07:35 INFO network.ConnectionManager: Bound socket to port 37255 with id = ConnectionManagerId(UCS-MASTER.sms1.local,37255)
14/09/16 20:07:35 INFO storage.BlockManagerMaster: Trying to register BlockManager
14/09/16 20:07:35 INFO storage.BlockManagerInfo: Registering block manager UCS-MASTER.sms1.local:37255 with 2.3 GB RAM
14/09/16 20:07:35 INFO storage.BlockManagerMaster: Registered BlockManager
14/09/16 20:07:35 INFO spark.HttpServer: Starting HTTP Server
14/09/16 20:07:35 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/09/16 20:07:35 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:55286
14/09/16 20:07:35 INFO broadcast.HttpBroadcast: Broadcast server started at http://10.193.218.2:55286
14/09/16 20:07:35 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-ca8193be-9148-4e7e-a0cc-4b6e7cb72172
14/09/16 20:07:35 INFO spark.HttpServer: Starting HTTP Server
14/09/16 20:07:35 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/09/16 20:07:35 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:38065
14/09/16 20:07:35 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/09/16 20:07:35 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
14/09/16 20:07:35 INFO ui.SparkUI: Started SparkUI at http://UCS-MASTER.sms1.local:4040
14/09/16 20:07:36 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
--args is deprecated. Use --arg instead.
14/09/16 20:07:36 INFO impl.TimelineClientImpl: Timeline service address: http://UCS-NODE1.sms1.local:8188/ws/v1/timeline/
14/09/16 20:07:37 INFO client.RMProxy: Connecting to ResourceManager at UCS-NODE1.sms1.local/10.193.218.3:8050
14/09/16 20:07:37 INFO yarn.Client: Got Cluster metric info from ApplicationsManager (ASM), number of NodeManagers: 5
14/09/16 20:07:37 INFO yarn.Client: Queue info ... queueName: default, queueCurrentCapacity: 0.0, queueMaxCapacity: 1.0, queueApplicationCount = 0, queueChildQueueCount = 0
14/09/16 20:07:37 INFO yarn.Client: Max mem capabililty of a single resource in this cluster 53248
14/09/16 20:07:37 INFO yarn.Client: Preparing Local resources
14/09/16 20:07:37 WARN hdfs.BlockReaderLocal: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
14/09/16 20:07:37 INFO yarn.Client: Uploading file:/usr/lib/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/lib/spark-assembly-1.0.1.2.1.3.0-563-hadoop2.4.0.2.1.3.0-563.jar to hdfs://UCS-MASTER.sms1.local:8020/user/hdfs/.sparkStaging/application_1409564765875_0046/spark-assembly-1.0.1.2.1.3.0-563-hadoop2.4.0.2.1.3.0-563.jar
14/09/16 20:07:39 INFO yarn.Client: Uploading file:/usr/lib/cad/PrepareDataSetYarn.py to hdfs://UCS-MASTER.sms1.local:8020/user/hdfs/.sparkStaging/application_1409564765875_0046/PrepareDataSetYarn.py
14/09/16 20:07:39 INFO yarn.Client: Uploading file:/usr/lib/cad/tad.zip to hdfs://UCS-MASTER.sms1.local:8020/user/hdfs/.sparkStaging/application_1409564765875_0046/tad.zip
14/09/16 20:07:39 INFO yarn.Client: Setting up the launch environment
14/09/16 20:07:39 INFO yarn.Client: Setting up container launch context
14/09/16 20:07:39 INFO yarn.Client: Command for starting the Spark ApplicationMaster: List($JAVA_HOME/bin/j
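Note that the number of concurrent python2.7 worker processes is bounded by the number of tasks in each stage, i.e. by the number of partitions of the RDD, not only by --num-executors and --executor-cores. The code in the original question at the bottom of this thread reads the input with sc.textFile(filename, 1), i.e. with a single partition hint. Below is a minimal sketch of asking for more partitions; it assumes the script builds its input with sc.textFile, and the value 64 is only an illustrative guess (the actual split count for a text file also depends on its HDFS block layout):

from pyspark import SparkContext

sc = SparkContext(appName="PrepareDataSetYarn")

def doSplit(x):
    # same parsing as in the original question: "k , v1 , v2" -> (k, (v1, v2))
    y = [c.strip() for c in x.split(',')]
    if len(y) == 3:
        return (y[0], (y[1], y[2]))

# ask for more input splits up front; minPartitions is only a hint
lines = sc.textFile("/input/tad/data.csv", minPartitions=64)

# groupByKey() can also be told how many reduce partitions to use, which
# controls how many tasks (and hence python workers) run in that stage
grouped = lines.map(doSplit).filter(lambda kv: kv is not None).groupByKey(64)

With more partitions, each executor receives several tasks per stage and top should show more than two python2.7 workers busy at a time.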
Re: PySpark on Yarn - how group by data properly
On Tue, Sep 9, 2014 at 9:56 AM, Oleg Ruchovets wrote:
> Hi,
>
> I come from a map/reduce background and am trying to do a quite trivial thing:
>
> I have a lot of files (on HDFS); the format is:
>
> 1 , 2 , 3
> 2 , 3 , 5
> 1 , 3 , 5
> 2 , 3 , 4
> 2 , 5 , 1
>
> I actually need to group by key (the first column):
>
> key    values
> 1  --> (2,3),(3,5)
> 2  --> (3,5),(3,4),(5,1)
>
> and I need to pass the grouped values to my custom function f. The outcome of
> f() should be written to HDFS with the corresponding key:
>
> 1 --> f() outcome
> 2 --> f() outcome
>
> My code is:
>
> def doSplit(x):
>     y = x.split(',')
>     if(len(y)==3):
>         return y[0],(y[1],y[2])
>
> lines = sc.textFile(filename,1)
> counts = lines.map(doSplit).groupByKey()
> output = counts.collect()
>
> for (key, value) in output:
>     print 'build model for key ->', key
>     print value
>     f(str(key), value)
>
> Questions:
> 1) lines.map(doSplit).groupByKey() - I didn't find an option to use
> groupByKey( f() ) to process the grouped values. How can I process the grouped
> keys with a custom function? Function f has some non-trivial logic.

The result of groupByKey() is still an RDD of (key, ResultIterable(values)), so you can continue to call map() or mapValues() on it:

lines.map(doSplit).groupByKey().map(f)

But your `f` needs two parameters, and map() will assume that `f` takes one parameter, so you need to build a wrapper for `f`:

lines.map(doSplit).groupByKey().map(lambda (k, vs): f(k, vs))

If `f` only accepts the values as a list, then you need to convert `vs` into a list:

result = lines.map(doSplit).groupByKey().map(lambda (k, vs): f(k, list(vs)))

Finally, you could save the `result` into HDFS:

result.saveAsPickleFile(path, batch=1024)

> 2) Using output (I really don't like this approach) to pass to the function
> looks not scalable and executes only on one machine. What is the way, using
> PySpark, to process the grouped keys in a distributed fashion, with
> multiprocessing and on different machines of the cluster?
>
> 3) In the case of processing inside f, how can data be stored on HDFS?

Currently, it's not easy to access files in HDFS directly; you could do it by:

sc.parallelize(local_data).map(str).saveAsTextFile()

> Thanks
> Oleg.
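Putting the snippets above together, here is a minimal end-to-end sketch. The input/output paths and the body of f are illustrative placeholders, not taken from this thread:

from pyspark import SparkContext

sc = SparkContext(appName="GroupAndApplyF")

def doSplit(x):
    # "1 , 2 , 3" -> ("1", ("2", "3")); malformed lines yield None
    y = [c.strip() for c in x.split(',')]
    if len(y) == 3:
        return (y[0], (y[1], y[2]))

def f(key, values):
    # placeholder for the custom model-building logic; here it just
    # reports how many value pairs were grouped under the key
    return (key, len(values))

lines = sc.textFile("/input/tad/data.csv")
grouped = (lines.map(doSplit)
                .filter(lambda kv: kv is not None)   # drop malformed lines
                .groupByKey())

# run f on every (key, values) pair as tasks on the executors
result = grouped.map(lambda kv: f(kv[0], list(kv[1])))

# write the results back to HDFS instead of collect()-ing them to the driver
result.map(str).saveAsTextFile("/output/cad_model_500_1")

Because everything stays in RDD operations, f runs in parallel on the executors rather than serially on the driver, which also addresses question 2.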
PySpark on Yarn - how group by data properly
Hi,

I come from a map/reduce background and am trying to do a quite trivial thing.

I have a lot of files (on HDFS); the format is:

1 , 2 , 3
2 , 3 , 5
1 , 3 , 5
2 , 3 , 4
2 , 5 , 1

I actually need to group by key (the first column):

key    values
1  --> (2,3),(3,5)
2  --> (3,5),(3,4),(5,1)

and I need to pass the grouped values to my custom function f. The outcome of f() should be written to HDFS with the corresponding key:

1 --> f() outcome
2 --> f() outcome

My code is:

def doSplit(x):
    y = x.split(',')
    if(len(y)==3):
        return y[0],(y[1],y[2])

lines = sc.textFile(filename,1)
counts = lines.map(doSplit).groupByKey()
output = counts.collect()

for (key, value) in output:
    print 'build model for key ->', key
    print value
    f(str(key), value)

Questions:

1) lines.map(doSplit).groupByKey() - I didn't find an option to use groupByKey( f() ) to process the grouped values. How can I process the grouped keys with a custom function? Function f has some non-trivial logic.

2) Using output (I really don't like this approach) to pass to the function looks not scalable and executes only on one machine. What is the way, using PySpark, to process the grouped keys in a distributed fashion, with multiprocessing and on different machines of the cluster?

3) In the case of processing, how can the output data be stored on HDFS?

Thanks
Oleg.
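For reference, if f only needs the grouped values and not the key, mapValues() keeps the key attached automatically. A minimal sketch, with f again a stand-in for the real model-building function and the paths only illustrative:

from pyspark import SparkContext

sc = SparkContext(appName="GroupByFirstColumn")

def doSplit(x):
    y = [c.strip() for c in x.split(',')]
    if len(y) == 3:
        return (y[0], (y[1], y[2]))

def f(values):
    # stand-in for the custom model-building logic
    return len(values)

lines = sc.textFile("/input/tad/data.csv")
grouped = lines.map(doSplit).filter(lambda kv: kv is not None).groupByKey()

# mapValues() applies f to the grouped values and keeps each key attached,
# so no collect() is needed and the work stays on the executors
per_key = grouped.mapValues(lambda vs: f(list(vs)))
per_key.map(str).saveAsTextFile("/output/model_per_key")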