I am expanding my data set and executing PySpark on YARN. I noticed that only 2 processes are processing the data:
14210 yarn  20  0 2463m 2.0g 9708 R 100.0 4.3 8:22.63 python2.7
32467 yarn  20  0 2519m 2.1g 9720 R  99.3 4.4 7:16.97 python2.7

*Question: how do I configure PySpark to use more processes to process the data?*

Here is my command:

[hdfs@UCS-MASTER cad]$ /usr/lib/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/bin/spark-submit --master yarn --num-executors 12 --driver-memory 4g --executor-memory 2g --py-files tad.zip --executor-cores 4 /usr/lib/cad/PrepareDataSetYarn.py /input/tad/data.csv /output/cad_model_500_1

I tried to play with --num-executors and --executor-cores, but it is still 2 python processes doing the job. I have a 5-machine cluster with 32 GB of RAM.

console output:

14/09/16 20:07:34 INFO spark.SecurityManager: Changing view acls to: hdfs
14/09/16 20:07:34 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hdfs)
14/09/16 20:07:34 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/09/16 20:07:35 INFO Remoting: Starting remoting
14/09/16 20:07:35 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@UCS-MASTER.sms1.local:39379]
14/09/16 20:07:35 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@UCS-MASTER.sms1.local:39379]
14/09/16 20:07:35 INFO spark.SparkEnv: Registering MapOutputTracker
14/09/16 20:07:35 INFO spark.SparkEnv: Registering BlockManagerMaster
14/09/16 20:07:35 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20140916200735-53f6
14/09/16 20:07:35 INFO storage.MemoryStore: MemoryStore started with capacity 2.3 GB.
14/09/16 20:07:35 INFO network.ConnectionManager: Bound socket to port 37255 with id = ConnectionManagerId(UCS-MASTER.sms1.local,37255)
14/09/16 20:07:35 INFO storage.BlockManagerMaster: Trying to register BlockManager
14/09/16 20:07:35 INFO storage.BlockManagerInfo: Registering block manager UCS-MASTER.sms1.local:37255 with 2.3 GB RAM
14/09/16 20:07:35 INFO storage.BlockManagerMaster: Registered BlockManager
14/09/16 20:07:35 INFO spark.HttpServer: Starting HTTP Server
14/09/16 20:07:35 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/09/16 20:07:35 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:55286
14/09/16 20:07:35 INFO broadcast.HttpBroadcast: Broadcast server started at http://10.193.218.2:55286
14/09/16 20:07:35 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-ca8193be-9148-4e7e-a0cc-4b6e7cb72172
14/09/16 20:07:35 INFO spark.HttpServer: Starting HTTP Server
14/09/16 20:07:35 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/09/16 20:07:35 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:38065
14/09/16 20:07:35 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/09/16 20:07:35 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
14/09/16 20:07:35 INFO ui.SparkUI: Started SparkUI at http://UCS-MASTER.sms1.local:4040
14/09/16 20:07:36 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
--args is deprecated. Use --arg instead.
14/09/16 20:07:36 INFO impl.TimelineClientImpl: Timeline service address: http://UCS-NODE1.sms1.local:8188/ws/v1/timeline/
14/09/16 20:07:37 INFO client.RMProxy: Connecting to ResourceManager at UCS-NODE1.sms1.local/10.193.218.3:8050
14/09/16 20:07:37 INFO yarn.Client: Got Cluster metric info from ApplicationsManager (ASM), number of NodeManagers: 5
14/09/16 20:07:37 INFO yarn.Client: Queue info ... queueName: default, queueCurrentCapacity: 0.0, queueMaxCapacity: 1.0, queueApplicationCount = 0, queueChildQueueCount = 0
14/09/16 20:07:37 INFO yarn.Client: Max mem capabililty of a single resource in this cluster 53248
14/09/16 20:07:37 INFO yarn.Client: Preparing Local resources
14/09/16 20:07:37 WARN hdfs.BlockReaderLocal: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
14/09/16 20:07:37 INFO yarn.Client: Uploading file:/usr/lib/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/lib/spark-assembly-1.0.1.2.1.3.0-563-hadoop2.4.0.2.1.3.0-563.jar to hdfs://UCS-MASTER.sms1.local:8020/user/hdfs/.sparkStaging/application_1409564765875_0046/spark-assembly-1.0.1.2.1.3.0-563-hadoop2.4.0.2.1.3.0-563.jar
14/09/16 20:07:39 INFO yarn.Client: Uploading file:/usr/lib/cad/PrepareDataSetYarn.py to hdfs://UCS-MASTER.sms1.local:8020/user/hdfs/.sparkStaging/application_1409564765875_0046/PrepareDataSetYarn.py
14/09/16 20:07:39 INFO yarn.Client: Uploading file:/usr/lib/cad/tad.zip to hdfs://UCS-MASTER.sms1.local:8020/user/hdfs/.sparkStaging/application_1409564765875_0046/tad.zip
14/09/16 20:07:39 INFO yarn.Client: Setting up the launch environment
14/09/16 20:07:39 INFO yarn.Client: Setting up container launch context
14/09/16 20:07:39 INFO yarn.Client: Command for starting the Spark ApplicationMaster: List($JAVA_HOME/bin/java, -server, -Xmx4096m, -Djava.io.tmpdir=$PWD/tmp, -Dspark.tachyonStore.folderName=\"spark-7543138b-c763-4a6b-abff-81b8dd1bd639\", -Dspark.executor.memory=\"2g\", -Dspark.executor.instances=\"12\", -Dspark.yarn.dist.files=\"file:/usr/lib/cad/PrepareDataSetYarn.py,file:/usr/lib/cad/tad.zip\", -Dspark.yarn.secondary.jars=\"\", -Dspark.submit.pyFiles=\"/usr/lib/cad/tad.zip\", -Dspark.driver.host=\"UCS-MASTER.sms1.local\", -Dspark.app.name=\"CAD\", -Dspark.fileserver.uri=\"http://10.193.218.2:38065\", -Dspark.master=\"yarn-client\", -Dspark.driver.port=\"39379\", -Dspark.executor.cores=\"4\", -Dspark.httpBroadcast.uri=\"http://10.193.218.2:55286\", -Dlog4j.configuration=log4j-spark-container.properties, org.apache.spark.deploy.yarn.ExecutorLauncher, --class, notused, --jar , null, --args 'UCS-MASTER.sms1.local:39379' , --executor-memory, 2048, --executor-cores, 4, --num-executors , 12, 1>, <LOG_DIR>/stdout, 2>, <LOG_DIR>/stderr)
14/09/16 20:07:39 INFO yarn.Client: Submitting application to ASM
14/09/16 20:07:39 INFO impl.YarnClientImpl: Submitted application application_1409564765875_0046
14/09/16 20:07:39 INFO cluster.YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: -1 appStartTime: 1410869259760 yarnAppState: ACCEPTED
14/09/16 20:07:40 INFO cluster.YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: -1 appStartTime: 1410869259760 yarnAppState: ACCEPTED
14/09/16 20:07:41 INFO cluster.YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: -1 appStartTime: 1410869259760 yarnAppState: ACCEPTED
14/09/16 20:07:42 INFO cluster.YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: -1 appStartTime: 1410869259760 yarnAppState: ACCEPTED
14/09/16 20:07:43 INFO cluster.YarnClientSchedulerBackend: Application report from ASM: appMasterRpcPort: 0 appStartTime: 1410869259760 yarnAppState: RUNNING
14/09/16 20:07:45 INFO cluster.YarnClientClusterScheduler: YarnClientClusterScheduler.postStartHook done
14/09/16 20:07:45 INFO storage.MemoryStore: ensureFreeSpace(85685) called with curMem=0, maxMem=2470025625
14/09/16 20:07:45 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 83.7 KB, free 2.3 GB)
14/09/16 20:07:46 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
14/09/16 20:07:46 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
14/09/16 20:07:46 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
14/09/16 20:07:46 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
14/09/16 20:07:46 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
14/09/16 20:07:46 INFO cluster.YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@UCS-NODE1.sms1.local:38745/user/Executor#-700940149] with ID 2
14/09/16 20:07:46 INFO spark.SparkContext: Starting job: saveAsTextFile at NativeMethodAccessorImpl.java:-2
14/09/16 20:07:46 INFO storage.BlockManagerInfo: Registering block manager UCS-NODE1.sms1.local:39103 with 1178.1 MB RAM
14/09/16 20:07:46 INFO mapred.FileInputFormat: Total input paths to process : 1
14/09/16 20:07:46 INFO scheduler.DAGScheduler: Registering RDD 3 (RDD at PythonRDD.scala:252)
14/09/16 20:07:46 INFO scheduler.DAGScheduler: Got job 0 (saveAsTextFile at NativeMethodAccessorImpl.java:-2) with 2 output partitions (allowLocal=false)
14/09/16 20:07:46 INFO scheduler.DAGScheduler: Final stage: Stage 0(saveAsTextFile at NativeMethodAccessorImpl.java:-2)
14/09/16 20:07:46 INFO scheduler.DAGScheduler: Parents of final stage: List(Stage 1)
14/09/16 20:07:46 INFO scheduler.DAGScheduler: Missing parents: List(Stage 1)
14/09/16 20:07:46 INFO scheduler.DAGScheduler: Submitting Stage 1 (PairwiseRDD[3] at RDD at PythonRDD.scala:252), which has no missing parents
14/09/16 20:07:46 INFO scheduler.DAGScheduler: Submitting 3 missing tasks from Stage 1 (PairwiseRDD[3] at RDD at PythonRDD.scala:252)
14/09/16 20:07:46 INFO cluster.YarnClientClusterScheduler: Adding task set 1.0 with 3 tasks
14/09/16 20:07:46 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID 0 on executor 2: UCS-NODE1.sms1.local (PROCESS_LOCAL)
14/09/16 20:07:46 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as 3895 bytes in 2 ms
14/09/16 20:07:46 INFO scheduler.TaskSetManager: Starting task 1.0:1 as TID 1 on executor 2: UCS-NODE1.sms1.local (PROCESS_LOCAL)
14/09/16 20:07:46 INFO scheduler.TaskSetManager: Serialized task 1.0:1 as 3895 bytes in 0 ms
14/09/16 20:07:46 INFO scheduler.TaskSetManager: Starting task 1.0:2 as TID 2 on executor 2: UCS-NODE1.sms1.local (PROCESS_LOCAL)
14/09/16 20:07:46 INFO scheduler.TaskSetManager: Serialized task 1.0:2 as 3895 bytes in 0 ms
14/09/16 20:07:46 INFO util.RackResolver: Resolved UCS-NODE1.sms1.local to /default-rack
14/09/16 20:07:47 INFO cluster.YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@UCS-NODE2.sms1.local:40219/user/Executor#36134446] with ID 3
14/09/16 20:07:47 INFO util.RackResolver: Resolved UCS-NODE2.sms1.local to /default-rack
14/09/16 20:07:47 INFO cluster.YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@UCS-NODE2.sms1.local:37085/user/Executor#-199925854] with ID 8
14/09/16 20:07:47 INFO cluster.YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@UCS-NODE1.sms1.local:50805/user/Executor#-1155334234] with ID 6
14/09/16 20:07:47 INFO storage.BlockManagerInfo: Registering block manager UCS-NODE2.sms1.local:56453 with 1178.1 MB RAM
14/09/16 20:07:47 INFO cluster.YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@UCS-NODE3.sms1.local:41585/user/Executor#2145905321] with ID 5
14/09/16 20:07:47 INFO util.RackResolver: Resolved UCS-NODE3.sms1.local to /default-rack
14/09/16 20:07:47 INFO storage.BlockManagerInfo: Registering block manager UCS-NODE2.sms1.local:37911 with 1178.1 MB RAM
14/09/16 20:07:47 INFO storage.BlockManagerInfo: Registering block manager UCS-NODE1.sms1.local:50858 with 1178.1 MB RAM
14/09/16 20:07:47 INFO cluster.YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@UCS-NODE3.sms1.local:59208/user/Executor#-1289679344] with ID 10
14/09/16 20:07:47 INFO storage.BlockManagerInfo: Registering block manager UCS-NODE3.sms1.local:40785 with 1178.1 MB RAM
14/09/16 20:07:47 INFO cluster.YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@UCS-NODE4.sms1.local:59378/user/Executor#-1053652401] with ID 4
14/09/16 20:07:47 INFO util.RackResolver: Resolved UCS-NODE4.sms1.local to /default-rack
14/09/16 20:07:47 INFO cluster.YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@UCS-NODE4.sms1.local:57503/user/Executor#-1979237887] with ID 9
14/09/16 20:07:47 INFO storage.BlockManagerInfo: Registering block manager UCS-NODE3.sms1.local:37476 with 1178.1 MB RAM
14/09/16 20:07:47 INFO storage.BlockManagerInfo: Registering block manager UCS-NODE4.sms1.local:52439 with 1178.1 MB RAM
14/09/16 20:07:47 INFO storage.BlockManagerInfo: Registering block manager UCS-NODE4.sms1.local:36292 with 1178.1 MB RAM
14/09/16 20:07:48 INFO cluster.YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@UCS-NODE1.sms1.local:41792/user/Executor#-69338597] with ID 12
14/09/16 20:07:48 INFO storage.BlockManagerInfo: Registering block manager UCS-NODE1.sms1.local:59539 with 1178.1 MB RAM
14/09/16 20:07:49 INFO cluster.YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@UCS-MASTER.sms1.local:60623/user/Executor#361407816] with ID 11
14/09/16 20:07:49 INFO util.RackResolver: Resolved UCS-MASTER.sms1.local to /default-rack
14/09/16 20:07:49 INFO cluster.YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@UCS-MASTER.sms1.local:44955/user/Executor#389344699] with ID 1
14/09/16 20:07:49 INFO cluster.YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@UCS-MASTER.sms1.local:54092/user/Executor#-1381396123] with ID 7
14/09/16 20:07:49 INFO storage.BlockManagerInfo: Registering block manager UCS-MASTER.sms1.local:52854 with 1178.1 MB RAM
14/09/16 20:07:49 INFO storage.BlockManagerInfo: Registering block manager UCS-MASTER.sms1.local:50766 with 1178.1 MB RAM
14/09/16 20:07:49 INFO storage.BlockManagerInfo: Registering block manager UCS-MASTER.sms1.local:55993 with 1178.1 MB RAM
14/09/16 20:08:58 INFO scheduler.TaskSetManager: Finished TID 2 in 71856 ms on UCS-NODE1.sms1.local (progress: 1/3)
14/09/16 20:08:58 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(1, 2)
14/09/16 20:11:37 INFO cluster.YarnClientSchedulerBackend: Executor 2 disconnected, so removing it
14/09/16 20:11:37 ERROR cluster.YarnClientClusterScheduler: Lost executor 2 on UCS-NODE1.sms1.local: remote Akka client disassociated
14/09/16 20:11:37 INFO scheduler.TaskSetManager: Re-queueing tasks for 2 from TaskSet 1.0
14/09/16 20:11:37 INFO scheduler.DAGScheduler: Resubmitted ShuffleMapTask(1, 2), so marking it as still running
14/09/16 20:11:37 WARN scheduler.TaskSetManager: Lost TID 1 (task 1.0:1)
14/09/16 20:11:37 WARN scheduler.TaskSetManager: Lost TID 0 (task 1.0:0)
14/09/16 20:11:37 INFO scheduler.TaskSetManager: Starting task 1.0:1 as TID 3 on executor 9: UCS-NODE4.sms1.local (NODE_LOCAL)
14/09/16 20:11:37 INFO scheduler.TaskSetManager: Serialized task 1.0:1 as 3895 bytes in 0 ms
14/09/16 20:11:37 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID 4 on executor 5: UCS-NODE3.sms1.local (NODE_LOCAL)
14/09/16 20:11:37 INFO scheduler.DAGScheduler: Executor lost: 2 (epoch 0)
14/09/16 20:11:37 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as 3895 bytes in 0 ms
14/09/16 20:11:37 INFO scheduler.TaskSetManager: Starting task 1.0:2 as TID 5 on executor 1: UCS-MASTER.sms1.local (NODE_LOCAL)
14/09/16 20:11:37 INFO storage.BlockManagerMasterActor: Trying to remove executor 2 from BlockManagerMaster.
14/09/16 20:11:37 INFO scheduler.TaskSetManager: Serialized task 1.0:2 as 3895 bytes in 0 ms
14/09/16 20:11:37 INFO storage.BlockManagerMaster: Removed 2 successfully in removeExecutor
14/09/16 20:11:37 INFO scheduler.Stage: Stage 1 is now unavailable on executor 2 (0/3, false)
14/09/16 20:11:53 INFO cluster.YarnClientSchedulerBackend: Registered executor: Actor[akka.tcp://sparkExecutor@UCS-NODE4.sms1.local:47948/user/Executor#-1547490738] with ID 13
14/09/16 20:11:53 INFO storage.BlockManagerInfo: Registering block manager UCS-NODE4.sms1.local:51174 with 1178.1 MB RAM
14/09/16 20:12:19 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(1, 2)
14/09/16 20:12:19 INFO scheduler.TaskSetManager: Finished TID 5 in 41426 ms on UCS-MASTER.sms1.local (progress: 1/3)
14/09/16 20:14:23 INFO scheduler.TaskSetManager: Finished TID 3 in 165752 ms on UCS-NODE4.sms1.local (progress: 2/3)
14/09/16 20:14:23 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(1, 1)
14/09/16 20:14:27 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(1, 0)
14/09/16 20:14:27 INFO scheduler.TaskSetManager: Finished TID 4 in 170168 ms on UCS-NODE3.sms1.local (progress: 3/3)
14/09/16 20:14:27 INFO cluster.YarnClientClusterScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool
14/09/16 20:14:27 INFO scheduler.DAGScheduler: Stage 1 (RDD at PythonRDD.scala:252) finished in 401.305 s
14/09/16 20:14:27 INFO scheduler.DAGScheduler: looking for newly runnable stages
14/09/16 20:14:27 INFO scheduler.DAGScheduler: running: Set()
14/09/16 20:14:27 INFO scheduler.DAGScheduler: waiting: Set(Stage 0)
14/09/16 20:14:27 INFO scheduler.DAGScheduler: failed: Set()
14/09/16 20:14:27 INFO scheduler.DAGScheduler: Missing parents for Stage 0: List()
14/09/16 20:14:27 INFO scheduler.DAGScheduler: Submitting Stage 0 (MappedRDD[8] at saveAsTextFile at NativeMethodAccessorImpl.java:-2), which is now runnable
14/09/16 20:14:28 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 0 (MappedRDD[8] at saveAsTextFile at NativeMethodAccessorImpl.java:-2)
14/09/16 20:14:28 INFO cluster.YarnClientClusterScheduler: Adding task set 0.0 with 2 tasks
14/09/16 20:14:28 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID 6 on executor 8: UCS-NODE2.sms1.local (PROCESS_LOCAL)
14/09/16 20:14:28 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as 17714 bytes in 0 ms
14/09/16 20:14:28 INFO scheduler.TaskSetManager: Starting task 0.0:1 as TID 7 on executor 6: UCS-NODE1.sms1.local (PROCESS_LOCAL)
14/09/16 20:14:28 INFO scheduler.TaskSetManager: Serialized task 0.0:1 as 17714 bytes in 1 ms
14/09/16 20:14:28 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to spark@UCS-NODE1.sms1.local:54238
14/09/16 20:14:28 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 184 bytes
14/09/16 20:14:28 INFO spark.MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 0 to spark@UCS-NODE2.sms1.local:43725

Thanks
Oleg.

On Wed, Sep 10, 2014 at 1:48 AM, Davies Liu <dav...@databricks.com> wrote:

> On Tue, Sep 9, 2014 at 9:56 AM, Oleg Ruchovets <oruchov...@gmail.com> wrote:
> > Hi,
> >
> > I came from a map/reduce background and am trying to do a quite trivial
> > thing:
> >
> > I have a lot of files (on HDFS) - the format is:
> >
> > 1 , 2 , 3
> > 2 , 3 , 5
> > 1 , 3, 5
> > 2, 3 , 4
> > 2 , 5, 1
> >
> > I actually need to group by key (the first column):
> > key      values
> > 1 --> (2,3),(3,5)
> > 2 --> (3,5),(3,4),(5,1)
> >
> > and I need to pass the grouped values to a function f (my custom
> > function); the outcome of f() should go to HDFS with the corresponding key:
> > 1 --> f() outcome
> > 2 --> f() outcome
> >
> > My code is:
> >
> > def doSplit(x):
> >     y = x.split(',')
> >     if len(y) == 3:
> >         return y[0], (y[1], y[2])
> >
> > lines = sc.textFile(filename, 1)
> > counts = lines.map(doSplit).groupByKey()
> > output = counts.collect()
> >
> > for (key, value) in output:
> >     print 'build model for key ->', key
> >     print value
> >     f(str(key), value)
> >
> > Questions:
> > 1) lines.map(doSplit).groupByKey() - I didn't find an option to use
> > groupByKey(f()) to process the grouped values. How can I process grouped
> > keys with a custom function? Function f has some non-trivial logic.
>
> The result of groupByKey() is still an RDD of (key, ResultIterable(values)),
> so you can continue to call map() or mapValues() on it:
>
> lines.map(doSplit).groupByKey().map(f)
>
> But your `f` needs two parameters, and map() will assume that `f`
> takes one parameter, so you need to build a wrapper for `f`:
>
> lines.map(doSplit).groupByKey().map(lambda (k, vs): f(k, vs))
>
> If `f` only accepts the values as a list, then you need to convert `vs` into
> a list:
>
> result = lines.map(doSplit).groupByKey().map(lambda (k, vs): f(k, list(vs)))
>
> Finally, you could save the `result` into HDFS:
>
> result.saveAsPickleFile(path, batch=1024)
>
> > 2) Using output (I really don't like this approach) to pass the values to
> > the function looks not scalable, and it is executed only on one machine.
> > What is the way to process grouped keys with PySpark in a distributed
> > fashion, with multiprocessing and on different machines of the cluster?
> >
> > 3) In the case of processing output, how can the data be stored on HDFS?
>
> Currently, it's not easy to access files in HDFS directly; you could do it by
>
> sc.parallelize(local_data).map(str).saveAsTextFile()
>
> > Thanks
> > Oleg.
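For readers without a cluster at hand, the group-then-apply pattern Davies describes can be sketched in plain Python (no Spark). The sample rows and the expected grouping are the ones from the thread above; `do_split` mirrors doSplit(), `group_by_key` computes locally what groupByKey() computes across executors, and `f` is only a hypothetical stand-in for the custom model-building function:

```python
from collections import defaultdict

# The sample rows from the thread, as they would come out of sc.textFile().
lines = ["1 , 2 , 3", "2 , 3 , 5", "1 , 3, 5", "2, 3 , 4", "2 , 5, 1"]

def do_split(line):
    # Mirrors doSplit(): "1 , 2 , 3" -> ("1", ("2", "3")),
    # stripping the uneven whitespace in the sample rows.
    y = [col.strip() for col in line.split(',')]
    if len(y) == 3:
        return y[0], (y[1], y[2])

def group_by_key(pairs):
    # Local equivalent of groupByKey(): collect every value under its key.
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def f(key, values):
    # Hypothetical stand-in for the custom function; the real f() would
    # build a model per key.
    return key, values

grouped = group_by_key(do_split(line) for line in lines)
# grouped["1"] -> [("2", "3"), ("3", "5")]
# grouped["2"] -> [("3", "5"), ("3", "4"), ("5", "1")]
results = [f(k, vs) for k, vs in grouped.items()]
```

In PySpark the same pipeline stays distributed, roughly `lines.map(doSplit).groupByKey().map(lambda kv: f(kv[0], list(kv[1])))` followed by a save, so the `collect()` loop (and the single-machine bottleneck it creates) is never needed.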