I am expand my data set and executing pyspark on yarn:
   I payed attention that only 2 processes processed the data:

14210 yarn      20   0 2463m 2.0g 9708 R 100.0  4.3   8:22.63 python2.7
32467 yarn      20   0 2519m 2.1g 9720 R 99.3  4.4   7:16.97   python2.7


*Question:*
   *how to configure pyspark to have more processes for  process the data?*


Here is my command :

      [hdfs@UCS-MASTER cad]$
/usr/lib/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/bin/spark-submit
--master yarn  --num-executors 12  --driver-memory 4g --executor-memory 2g
--py-files tad.zip --executor-cores 4   /usr/lib/cad/PrepareDataSetYarn.py
/input/tad/data.csv /output/cad_model_500_1

I tried to play with num-executors and executor-cores but it is still 2
python processes doing the job. I have 5 machine cluster with 32 GB ram.

console output:


14/09/16 20:07:34 INFO spark.SecurityManager: Changing view acls to: hdfs
14/09/16 20:07:34 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view permissions:
Set(hdfs)
14/09/16 20:07:34 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/09/16 20:07:35 INFO Remoting: Starting remoting
14/09/16 20:07:35 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://spark@UCS-MASTER.sms1.local:39379]
14/09/16 20:07:35 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://spark@UCS-MASTER.sms1.local:39379]
14/09/16 20:07:35 INFO spark.SparkEnv: Registering MapOutputTracker
14/09/16 20:07:35 INFO spark.SparkEnv: Registering BlockManagerMaster
14/09/16 20:07:35 INFO storage.DiskBlockManager: Created local directory at
/tmp/spark-local-20140916200735-53f6
14/09/16 20:07:35 INFO storage.MemoryStore: MemoryStore started with
capacity 2.3 GB.
14/09/16 20:07:35 INFO network.ConnectionManager: Bound socket to port
37255 with id = ConnectionManagerId(UCS-MASTER.sms1.local,37255)
14/09/16 20:07:35 INFO storage.BlockManagerMaster: Trying to register
BlockManager
14/09/16 20:07:35 INFO storage.BlockManagerInfo: Registering block manager
UCS-MASTER.sms1.local:37255 with 2.3 GB RAM
14/09/16 20:07:35 INFO storage.BlockManagerMaster: Registered BlockManager
14/09/16 20:07:35 INFO spark.HttpServer: Starting HTTP Server
14/09/16 20:07:35 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/09/16 20:07:35 INFO server.AbstractConnector: Started
SocketConnector@0.0.0.0:55286
14/09/16 20:07:35 INFO broadcast.HttpBroadcast: Broadcast server started at
http://10.193.218.2:55286
14/09/16 20:07:35 INFO spark.HttpFileServer: HTTP File server directory is
/tmp/spark-ca8193be-9148-4e7e-a0cc-4b6e7cb72172
14/09/16 20:07:35 INFO spark.HttpServer: Starting HTTP Server
14/09/16 20:07:35 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/09/16 20:07:35 INFO server.AbstractConnector: Started
SocketConnector@0.0.0.0:38065
14/09/16 20:07:35 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/09/16 20:07:35 INFO server.AbstractConnector: Started
SelectChannelConnector@0.0.0.0:4040
14/09/16 20:07:35 INFO ui.SparkUI: Started SparkUI at
http://UCS-MASTER.sms1.local:4040
14/09/16 20:07:36 WARN util.NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
--args is deprecated. Use --arg instead.
14/09/16 20:07:36 INFO impl.TimelineClientImpl: Timeline service address:
http://UCS-NODE1.sms1.local:8188/ws/v1/timeline/
14/09/16 20:07:37 INFO client.RMProxy: Connecting to ResourceManager at
UCS-NODE1.sms1.local/10.193.218.3:8050
14/09/16 20:07:37 INFO yarn.Client: Got Cluster metric info from
ApplicationsManager (ASM), number of NodeManagers: 5
14/09/16 20:07:37 INFO yarn.Client: Queue info ... queueName: default,
queueCurrentCapacity: 0.0, queueMaxCapacity: 1.0,
      queueApplicationCount = 0, queueChildQueueCount = 0
14/09/16 20:07:37 INFO yarn.Client: Max mem capabililty of a single
resource in this cluster 53248
14/09/16 20:07:37 INFO yarn.Client: Preparing Local resources
14/09/16 20:07:37 WARN hdfs.BlockReaderLocal: The short-circuit local reads
feature cannot be used because libhadoop cannot be loaded.
14/09/16 20:07:37 INFO yarn.Client: Uploading
file:/usr/lib/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/lib/spark-assembly-1.0.1.2.1.3.0-563-hadoop2.4.0.2.1.3.0-563.jar
to
hdfs://UCS-MASTER.sms1.local:8020/user/hdfs/.sparkStaging/application_1409564765875_0046/spark-assembly-1.0.1.2.1.3.0-563-hadoop2.4.0.2.1.3.0-563.jar
14/09/16 20:07:39 INFO yarn.Client: Uploading
file:/usr/lib/cad/PrepareDataSetYarn.py to
hdfs://UCS-MASTER.sms1.local:8020/user/hdfs/.sparkStaging/application_1409564765875_0046/PrepareDataSetYarn.py
14/09/16 20:07:39 INFO yarn.Client: Uploading file:/usr/lib/cad/tad.zip to
hdfs://UCS-MASTER.sms1.local:8020/user/hdfs/.sparkStaging/application_1409564765875_0046/tad.zip
14/09/16 20:07:39 INFO yarn.Client: Setting up the launch environment
14/09/16 20:07:39 INFO yarn.Client: Setting up container launch context
14/09/16 20:07:39 INFO yarn.Client: Command for starting the Spark
ApplicationMaster: List($JAVA_HOME/bin/java, -server, -Xmx4096m,
-Djava.io.tmpdir=$PWD/tmp,
-Dspark.tachyonStore.folderName=\"spark-7543138b-c763-4a6b-abff-81b8dd1bd639\",
-Dspark.executor.memory=\"2g\", -Dspark.executor.instances=\"12\",
-Dspark.yarn.dist.files=\"file:/usr/lib/cad/PrepareDataSetYarn.py,file:/usr/lib/cad/tad.zip\",
-Dspark.yarn.secondary.jars=\"\",
-Dspark.submit.pyFiles=\"/usr/lib/cad/tad.zip\",
-Dspark.driver.host=\"UCS-MASTER.sms1.local\", -Dspark.app.name=\"CAD\",
-Dspark.fileserver.uri=\"http://10.193.218.2:38065\";,
-Dspark.master=\"yarn-client\", -Dspark.driver.port=\"39379\",
-Dspark.executor.cores=\"4\", -Dspark.httpBroadcast.uri=\"
http://10.193.218.2:55286\";,
 -Dlog4j.configuration=log4j-spark-container.properties,
org.apache.spark.deploy.yarn.ExecutorLauncher, --class, notused, --jar ,
null,  --args  'UCS-MASTER.sms1.local:39379' , --executor-memory, 2048,
--executor-cores, 4, --num-executors , 12, 1>, <LOG_DIR>/stdout, 2>,
<LOG_DIR>/stderr)
14/09/16 20:07:39 INFO yarn.Client: Submitting application to ASM
14/09/16 20:07:39 INFO impl.YarnClientImpl: Submitted application
application_1409564765875_0046
14/09/16 20:07:39 INFO cluster.YarnClientSchedulerBackend: Application
report from ASM:
 appMasterRpcPort: -1
 appStartTime: 1410869259760
 yarnAppState: ACCEPTED

14/09/16 20:07:40 INFO cluster.YarnClientSchedulerBackend: Application
report from ASM:
 appMasterRpcPort: -1
 appStartTime: 1410869259760
 yarnAppState: ACCEPTED

14/09/16 20:07:41 INFO cluster.YarnClientSchedulerBackend: Application
report from ASM:
 appMasterRpcPort: -1
 appStartTime: 1410869259760
 yarnAppState: ACCEPTED

14/09/16 20:07:42 INFO cluster.YarnClientSchedulerBackend: Application
report from ASM:
 appMasterRpcPort: -1
 appStartTime: 1410869259760
 yarnAppState: ACCEPTED

14/09/16 20:07:43 INFO cluster.YarnClientSchedulerBackend: Application
report from ASM:
 appMasterRpcPort: 0
 appStartTime: 1410869259760
 yarnAppState: RUNNING

14/09/16 20:07:45 INFO cluster.YarnClientClusterScheduler:
YarnClientClusterScheduler.postStartHook done
14/09/16 20:07:45 INFO storage.MemoryStore: ensureFreeSpace(85685) called
with curMem=0, maxMem=2470025625
14/09/16 20:07:45 INFO storage.MemoryStore: Block broadcast_0 stored as
values to memory (estimated size 83.7 KB, free 2.3 GB)
14/09/16 20:07:46 INFO Configuration.deprecation: mapred.tip.id is
deprecated. Instead, use mapreduce.task.id
14/09/16 20:07:46 INFO Configuration.deprecation: mapred.task.id is
deprecated. Instead, use mapreduce.task.attempt.id
14/09/16 20:07:46 INFO Configuration.deprecation: mapred.task.is.map is
deprecated. Instead, use mapreduce.task.ismap
14/09/16 20:07:46 INFO Configuration.deprecation: mapred.task.partition is
deprecated. Instead, use mapreduce.task.partition
14/09/16 20:07:46 INFO Configuration.deprecation: mapred.job.id is
deprecated. Instead, use mapreduce.job.id
14/09/16 20:07:46 INFO cluster.YarnClientSchedulerBackend: Registered
executor: 
Actor[akka.tcp://sparkExecutor@UCS-NODE1.sms1.local:38745/user/Executor#-700940149]
with ID 2
14/09/16 20:07:46 INFO spark.SparkContext: Starting job: saveAsTextFile at
NativeMethodAccessorImpl.java:-2
14/09/16 20:07:46 INFO storage.BlockManagerInfo: Registering block manager
UCS-NODE1.sms1.local:39103 with 1178.1 MB RAM
14/09/16 20:07:46 INFO mapred.FileInputFormat: Total input paths to process
: 1
14/09/16 20:07:46 INFO scheduler.DAGScheduler: Registering RDD 3 (RDD at
PythonRDD.scala:252)
14/09/16 20:07:46 INFO scheduler.DAGScheduler: Got job 0 (saveAsTextFile at
NativeMethodAccessorImpl.java:-2) with 2 output partitions
(allowLocal=false)
14/09/16 20:07:46 INFO scheduler.DAGScheduler: Final stage: Stage
0(saveAsTextFile at NativeMethodAccessorImpl.java:-2)
14/09/16 20:07:46 INFO scheduler.DAGScheduler: Parents of final stage:
List(Stage 1)
14/09/16 20:07:46 INFO scheduler.DAGScheduler: Missing parents: List(Stage
1)
14/09/16 20:07:46 INFO scheduler.DAGScheduler: Submitting Stage 1
(PairwiseRDD[3] at RDD at PythonRDD.scala:252), which has no missing parents
14/09/16 20:07:46 INFO scheduler.DAGScheduler: Submitting 3 missing tasks
from Stage 1 (PairwiseRDD[3] at RDD at PythonRDD.scala:252)
14/09/16 20:07:46 INFO cluster.YarnClientClusterScheduler: Adding task set
1.0 with 3 tasks
14/09/16 20:07:46 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID
0 on executor 2: UCS-NODE1.sms1.local (PROCESS_LOCAL)
14/09/16 20:07:46 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as
3895 bytes in 2 ms
14/09/16 20:07:46 INFO scheduler.TaskSetManager: Starting task 1.0:1 as TID
1 on executor 2: UCS-NODE1.sms1.local (PROCESS_LOCAL)
14/09/16 20:07:46 INFO scheduler.TaskSetManager: Serialized task 1.0:1 as
3895 bytes in 0 ms
14/09/16 20:07:46 INFO scheduler.TaskSetManager: Starting task 1.0:2 as TID
2 on executor 2: UCS-NODE1.sms1.local (PROCESS_LOCAL)
14/09/16 20:07:46 INFO scheduler.TaskSetManager: Serialized task 1.0:2 as
3895 bytes in 0 ms
14/09/16 20:07:46 INFO util.RackResolver: Resolved UCS-NODE1.sms1.local to
/default-rack
14/09/16 20:07:47 INFO cluster.YarnClientSchedulerBackend: Registered
executor: 
Actor[akka.tcp://sparkExecutor@UCS-NODE2.sms1.local:40219/user/Executor#36134446]
with ID 3
14/09/16 20:07:47 INFO util.RackResolver: Resolved UCS-NODE2.sms1.local to
/default-rack
14/09/16 20:07:47 INFO cluster.YarnClientSchedulerBackend: Registered
executor: 
Actor[akka.tcp://sparkExecutor@UCS-NODE2.sms1.local:37085/user/Executor#-199925854]
with ID 8
14/09/16 20:07:47 INFO cluster.YarnClientSchedulerBackend: Registered
executor: 
Actor[akka.tcp://sparkExecutor@UCS-NODE1.sms1.local:50805/user/Executor#-1155334234]
with ID 6
14/09/16 20:07:47 INFO storage.BlockManagerInfo: Registering block manager
UCS-NODE2.sms1.local:56453 with 1178.1 MB RAM
14/09/16 20:07:47 INFO cluster.YarnClientSchedulerBackend: Registered
executor: 
Actor[akka.tcp://sparkExecutor@UCS-NODE3.sms1.local:41585/user/Executor#2145905321]
with ID 5
14/09/16 20:07:47 INFO util.RackResolver: Resolved UCS-NODE3.sms1.local to
/default-rack
14/09/16 20:07:47 INFO storage.BlockManagerInfo: Registering block manager
UCS-NODE2.sms1.local:37911 with 1178.1 MB RAM
14/09/16 20:07:47 INFO storage.BlockManagerInfo: Registering block manager
UCS-NODE1.sms1.local:50858 with 1178.1 MB RAM
14/09/16 20:07:47 INFO cluster.YarnClientSchedulerBackend: Registered
executor: 
Actor[akka.tcp://sparkExecutor@UCS-NODE3.sms1.local:59208/user/Executor#-1289679344]
with ID 10
14/09/16 20:07:47 INFO storage.BlockManagerInfo: Registering block manager
UCS-NODE3.sms1.local:40785 with 1178.1 MB RAM
14/09/16 20:07:47 INFO cluster.YarnClientSchedulerBackend: Registered
executor: 
Actor[akka.tcp://sparkExecutor@UCS-NODE4.sms1.local:59378/user/Executor#-1053652401]
with ID 4
14/09/16 20:07:47 INFO util.RackResolver: Resolved UCS-NODE4.sms1.local to
/default-rack
14/09/16 20:07:47 INFO cluster.YarnClientSchedulerBackend: Registered
executor: 
Actor[akka.tcp://sparkExecutor@UCS-NODE4.sms1.local:57503/user/Executor#-1979237887]
with ID 9
14/09/16 20:07:47 INFO storage.BlockManagerInfo: Registering block manager
UCS-NODE3.sms1.local:37476 with 1178.1 MB RAM
14/09/16 20:07:47 INFO storage.BlockManagerInfo: Registering block manager
UCS-NODE4.sms1.local:52439 with 1178.1 MB RAM
14/09/16 20:07:47 INFO storage.BlockManagerInfo: Registering block manager
UCS-NODE4.sms1.local:36292 with 1178.1 MB RAM
14/09/16 20:07:48 INFO cluster.YarnClientSchedulerBackend: Registered
executor: 
Actor[akka.tcp://sparkExecutor@UCS-NODE1.sms1.local:41792/user/Executor#-69338597]
with ID 12
14/09/16 20:07:48 INFO storage.BlockManagerInfo: Registering block manager
UCS-NODE1.sms1.local:59539 with 1178.1 MB RAM
14/09/16 20:07:49 INFO cluster.YarnClientSchedulerBackend: Registered
executor: 
Actor[akka.tcp://sparkExecutor@UCS-MASTER.sms1.local:60623/user/Executor#361407816]
with ID 11
14/09/16 20:07:49 INFO util.RackResolver: Resolved UCS-MASTER.sms1.local to
/default-rack
14/09/16 20:07:49 INFO cluster.YarnClientSchedulerBackend: Registered
executor: 
Actor[akka.tcp://sparkExecutor@UCS-MASTER.sms1.local:44955/user/Executor#389344699]
with ID 1
14/09/16 20:07:49 INFO cluster.YarnClientSchedulerBackend: Registered
executor: 
Actor[akka.tcp://sparkExecutor@UCS-MASTER.sms1.local:54092/user/Executor#-1381396123]
with ID 7
14/09/16 20:07:49 INFO storage.BlockManagerInfo: Registering block manager
UCS-MASTER.sms1.local:52854 with 1178.1 MB RAM
14/09/16 20:07:49 INFO storage.BlockManagerInfo: Registering block manager
UCS-MASTER.sms1.local:50766 with 1178.1 MB RAM
14/09/16 20:07:49 INFO storage.BlockManagerInfo: Registering block manager
UCS-MASTER.sms1.local:55993 with 1178.1 MB RAM
14/09/16 20:08:58 INFO scheduler.TaskSetManager: Finished TID 2 in 71856 ms
on UCS-NODE1.sms1.local (progress: 1/3)
14/09/16 20:08:58 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(1,
2)
14/09/16 20:11:37 INFO cluster.YarnClientSchedulerBackend: Executor 2
disconnected, so removing it
14/09/16 20:11:37 ERROR cluster.YarnClientClusterScheduler: Lost executor 2
on UCS-NODE1.sms1.local: remote Akka client disassociated
14/09/16 20:11:37 INFO scheduler.TaskSetManager: Re-queueing tasks for 2
from TaskSet 1.0
14/09/16 20:11:37 INFO scheduler.DAGScheduler: Resubmitted
ShuffleMapTask(1, 2), so marking it as still running
14/09/16 20:11:37 WARN scheduler.TaskSetManager: Lost TID 1 (task 1.0:1)
14/09/16 20:11:37 WARN scheduler.TaskSetManager: Lost TID 0 (task 1.0:0)
14/09/16 20:11:37 INFO scheduler.TaskSetManager: Starting task 1.0:1 as TID
3 on executor 9: UCS-NODE4.sms1.local (NODE_LOCAL)
14/09/16 20:11:37 INFO scheduler.TaskSetManager: Serialized task 1.0:1 as
3895 bytes in 0 ms
14/09/16 20:11:37 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID
4 on executor 5: UCS-NODE3.sms1.local (NODE_LOCAL)
14/09/16 20:11:37 INFO scheduler.DAGScheduler: Executor lost: 2 (epoch 0)
14/09/16 20:11:37 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as
3895 bytes in 0 ms
14/09/16 20:11:37 INFO scheduler.TaskSetManager: Starting task 1.0:2 as TID
5 on executor 1: UCS-MASTER.sms1.local (NODE_LOCAL)
14/09/16 20:11:37 INFO storage.BlockManagerMasterActor: Trying to remove
executor 2 from BlockManagerMaster.
14/09/16 20:11:37 INFO scheduler.TaskSetManager: Serialized task 1.0:2 as
3895 bytes in 0 ms
14/09/16 20:11:37 INFO storage.BlockManagerMaster: Removed 2 successfully
in removeExecutor
14/09/16 20:11:37 INFO scheduler.Stage: Stage 1 is now unavailable on
executor 2 (0/3, false)
14/09/16 20:11:53 INFO cluster.YarnClientSchedulerBackend: Registered
executor: 
Actor[akka.tcp://sparkExecutor@UCS-NODE4.sms1.local:47948/user/Executor#-1547490738]
with ID 13
14/09/16 20:11:53 INFO storage.BlockManagerInfo: Registering block manager
UCS-NODE4.sms1.local:51174 with 1178.1 MB RAM
14/09/16 20:12:19 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(1,
2)
14/09/16 20:12:19 INFO scheduler.TaskSetManager: Finished TID 5 in 41426 ms
on UCS-MASTER.sms1.local (progress: 1/3)
14/09/16 20:14:23 INFO scheduler.TaskSetManager: Finished TID 3 in 165752
ms on UCS-NODE4.sms1.local (progress: 2/3)
14/09/16 20:14:23 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(1,
1)
14/09/16 20:14:27 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(1,
0)
14/09/16 20:14:27 INFO scheduler.TaskSetManager: Finished TID 4 in 170168
ms on UCS-NODE3.sms1.local (progress: 3/3)
14/09/16 20:14:27 INFO cluster.YarnClientClusterScheduler: Removed TaskSet
1.0, whose tasks have all completed, from pool
14/09/16 20:14:27 INFO scheduler.DAGScheduler: Stage 1 (RDD at
PythonRDD.scala:252) finished in 401.305 s
14/09/16 20:14:27 INFO scheduler.DAGScheduler: looking for newly runnable
stages
14/09/16 20:14:27 INFO scheduler.DAGScheduler: running: Set()
14/09/16 20:14:27 INFO scheduler.DAGScheduler: waiting: Set(Stage 0)
14/09/16 20:14:27 INFO scheduler.DAGScheduler: failed: Set()
14/09/16 20:14:27 INFO scheduler.DAGScheduler: Missing parents for Stage 0:
List()
14/09/16 20:14:27 INFO scheduler.DAGScheduler: Submitting Stage 0
(MappedRDD[8] at saveAsTextFile at NativeMethodAccessorImpl.java:-2), which
is now runnable
14/09/16 20:14:28 INFO scheduler.DAGScheduler: Submitting 2 missing tasks
from Stage 0 (MappedRDD[8] at saveAsTextFile at
NativeMethodAccessorImpl.java:-2)
14/09/16 20:14:28 INFO cluster.YarnClientClusterScheduler: Adding task set
0.0 with 2 tasks
14/09/16 20:14:28 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID
6 on executor 8: UCS-NODE2.sms1.local (PROCESS_LOCAL)
14/09/16 20:14:28 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as
17714 bytes in 0 ms
14/09/16 20:14:28 INFO scheduler.TaskSetManager: Starting task 0.0:1 as TID
7 on executor 6: UCS-NODE1.sms1.local (PROCESS_LOCAL)
14/09/16 20:14:28 INFO scheduler.TaskSetManager: Serialized task 0.0:1 as
17714 bytes in 1 ms
14/09/16 20:14:28 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 0 to spark@UCS-NODE1.sms1.local:54238
14/09/16 20:14:28 INFO spark.MapOutputTrackerMaster: Size of output
statuses for shuffle 0 is 184 bytes
14/09/16 20:14:28 INFO spark.MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 0 to spark@UCS-NODE2.sms1.local:43725

Thanks
Oleg.

On Wed, Sep 10, 2014 at 1:48 AM, Davies Liu <dav...@databricks.com> wrote:

> On Tue, Sep 9, 2014 at 9:56 AM, Oleg Ruchovets <oruchov...@gmail.com>
> wrote:
> > Hi ,
> >
> >    I came from map/reduce background and try to do quite trivial thing:
> >
> > I have a lot of files ( on hdfs ) - format is :
> >
> >    1 , 2 , 3
> >    2 , 3 , 5
> >    1 , 3,  5
> >     2, 3 , 4
> >     2 , 5, 1
> >
> >   I am actually need to group by key (first column) :
> >   key   values
> >   1 --> (2,3),(3,5)
> >   2 --> (3,5),(3,4),(5,1)
> >
> >   and I need to process (pass)  values to the function f ( my custom
> > function)
> >   outcome of  function f()  should be  to hdfs with corresponding key:
> >     1 --> f() outcome
> >     2 --> f() outcome.
> >
> > My code is :
> >
> >       def doSplit(x):
> >         y = x.split(',')
> >         if(len(y)==3):
> >            return  y[0],(y[1],y[2])
> >
> >
> >     lines = sc.textFile(filename,1)
> >     counts = lines.map(doSplit).groupByKey()
> >     output = counts.collect()
> >
> >     for (key, value) in output:
> >         print 'build model for key ->' , key
> >         print value
> >         f(str(key) , value))
> >
> >
> > Questions:
> >    1) lines.map(doSplit).groupByKey() - I didn't  find the option to use
> > groupByKey( f() ) to process grouped values? how can I process grouped
> keys
> > by custom function? function f has some not trivial logic.
>
> The result of groupByKey() is still RDD with (key, ResultIterable(values)),
> so you can continue to call map() or mapValues() on it:
>
> lines.map(doSplit).groupByKey().map(f)
>
> But your `f` need two parameters, the map() will assume that `f`
> take one parameter, so you need to build a wrapper for `f`:
>
> lines.map(doSplit).groupByKey().map(lambda (k, vs): f(k, vs))
>
> If the `f` only accept values as list, then you need to convert `vs` into
> list:
>
> result = lines.map(doSplit).groupByKey().map(lambda (k, vs): f(k,
> list(vs)))
>
> finally, you could save the `result` into HDFS:
>
> result.saveAsPickleFile(path, batch=1024)
>
> >     2) Using output ( I really don't like this approach )  to pass to
> > function looks like not scalable and executed only on one machine?  What
> is
> > the way using PySpark process grouped keys in distributed fashion.
> > Multiprocessing and on different machine of the cluster.
> >
> > 3)In case of  processing output how data can be stored on hdfs?
>
> Currently, it's not easy to access files in HDFS, you could do it by
>
> sc.parallelize(local_data).map(str).saveAsTextFile()
>
> > Thanks
> > Oleg.
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
>

Reply via email to