Re: PySpark on Yarn - how group by data properly

2014-09-16 Thread Oleg Ruchovets
I expanded my data set and am executing PySpark on YARN.
   I noticed that only 2 processes are processing the data:

14210 yarn  20   0 2463m 2.0g 9708 R 100.0  4.3   8:22.63 python2.7
32467 yarn  20   0 2519m 2.1g 9720 R 99.3  4.4   7:16.97   python2.7


*Question:*
   *How can I configure PySpark to use more processes to process the data?*


Here is my command :

  [hdfs@UCS-MASTER cad]$
/usr/lib/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/bin/spark-submit
--master yarn  --num-executors 12  --driver-memory 4g --executor-memory 2g
--py-files tad.zip --executor-cores 4   /usr/lib/cad/PrepareDataSetYarn.py
/input/tad/data.csv /output/cad_model_500_1

I tried playing with --num-executors and --executor-cores, but there are still only 2
Python processes doing the work. I have a 5-machine cluster with 32 GB of RAM.
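
My understanding is that the number of concurrent Python workers per stage is bounded
by the number of partitions (tasks) of the RDD, so if the input ends up in only a couple
of partitions, only that many workers can be busy regardless of --num-executors.
A minimal sketch of what I could try (the partition count of 48 is an illustrative guess;
the path is the one from my spark-submit command):

    # Ask for more input partitions so more tasks (hence more Python workers) can run at once.
    lines = sc.textFile("/input/tad/data.csv", 48)
    # alternatively, reshuffle an existing RDD into more partitions:
    # lines = sc.textFile("/input/tad/data.csv").repartition(48)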

console output:


14/09/16 20:07:34 INFO spark.SecurityManager: Changing view acls to: hdfs
14/09/16 20:07:34 INFO spark.SecurityManager: SecurityManager:
authentication disabled; ui acls disabled; users with view permissions:
Set(hdfs)
14/09/16 20:07:34 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/09/16 20:07:35 INFO Remoting: Starting remoting
14/09/16 20:07:35 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://spark@UCS-MASTER.sms1.local:39379]
14/09/16 20:07:35 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://spark@UCS-MASTER.sms1.local:39379]
14/09/16 20:07:35 INFO spark.SparkEnv: Registering MapOutputTracker
14/09/16 20:07:35 INFO spark.SparkEnv: Registering BlockManagerMaster
14/09/16 20:07:35 INFO storage.DiskBlockManager: Created local directory at
/tmp/spark-local-20140916200735-53f6
14/09/16 20:07:35 INFO storage.MemoryStore: MemoryStore started with
capacity 2.3 GB.
14/09/16 20:07:35 INFO network.ConnectionManager: Bound socket to port
37255 with id = ConnectionManagerId(UCS-MASTER.sms1.local,37255)
14/09/16 20:07:35 INFO storage.BlockManagerMaster: Trying to register
BlockManager
14/09/16 20:07:35 INFO storage.BlockManagerInfo: Registering block manager
UCS-MASTER.sms1.local:37255 with 2.3 GB RAM
14/09/16 20:07:35 INFO storage.BlockManagerMaster: Registered BlockManager
14/09/16 20:07:35 INFO spark.HttpServer: Starting HTTP Server
14/09/16 20:07:35 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/09/16 20:07:35 INFO server.AbstractConnector: Started
SocketConnector@0.0.0.0:55286
14/09/16 20:07:35 INFO broadcast.HttpBroadcast: Broadcast server started at
http://10.193.218.2:55286
14/09/16 20:07:35 INFO spark.HttpFileServer: HTTP File server directory is
/tmp/spark-ca8193be-9148-4e7e-a0cc-4b6e7cb72172
14/09/16 20:07:35 INFO spark.HttpServer: Starting HTTP Server
14/09/16 20:07:35 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/09/16 20:07:35 INFO server.AbstractConnector: Started
SocketConnector@0.0.0.0:38065
14/09/16 20:07:35 INFO server.Server: jetty-8.y.z-SNAPSHOT
14/09/16 20:07:35 INFO server.AbstractConnector: Started
SelectChannelConnector@0.0.0.0:4040
14/09/16 20:07:35 INFO ui.SparkUI: Started SparkUI at
http://UCS-MASTER.sms1.local:4040
14/09/16 20:07:36 WARN util.NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
--args is deprecated. Use --arg instead.
14/09/16 20:07:36 INFO impl.TimelineClientImpl: Timeline service address:
http://UCS-NODE1.sms1.local:8188/ws/v1/timeline/
14/09/16 20:07:37 INFO client.RMProxy: Connecting to ResourceManager at
UCS-NODE1.sms1.local/10.193.218.3:8050
14/09/16 20:07:37 INFO yarn.Client: Got Cluster metric info from
ApplicationsManager (ASM), number of NodeManagers: 5
14/09/16 20:07:37 INFO yarn.Client: Queue info ... queueName: default,
queueCurrentCapacity: 0.0, queueMaxCapacity: 1.0,
  queueApplicationCount = 0, queueChildQueueCount = 0
14/09/16 20:07:37 INFO yarn.Client: Max mem capabililty of a single
resource in this cluster 53248
14/09/16 20:07:37 INFO yarn.Client: Preparing Local resources
14/09/16 20:07:37 WARN hdfs.BlockReaderLocal: The short-circuit local reads
feature cannot be used because libhadoop cannot be loaded.
14/09/16 20:07:37 INFO yarn.Client: Uploading
file:/usr/lib/spark-1.0.1.2.1.3.0-563-bin-2.4.0.2.1.3.0-563/lib/spark-assembly-1.0.1.2.1.3.0-563-hadoop2.4.0.2.1.3.0-563.jar
to
hdfs://UCS-MASTER.sms1.local:8020/user/hdfs/.sparkStaging/application_1409564765875_0046/spark-assembly-1.0.1.2.1.3.0-563-hadoop2.4.0.2.1.3.0-563.jar
14/09/16 20:07:39 INFO yarn.Client: Uploading
file:/usr/lib/cad/PrepareDataSetYarn.py to
hdfs://UCS-MASTER.sms1.local:8020/user/hdfs/.sparkStaging/application_1409564765875_0046/PrepareDataSetYarn.py
14/09/16 20:07:39 INFO yarn.Client: Uploading file:/usr/lib/cad/tad.zip to
hdfs://UCS-MASTER.sms1.local:8020/user/hdfs/.sparkStaging/application_1409564765875_0046/tad.zip
14/09/16 20:07:39 INFO yarn.Client: Setting up the launch environment
14/09/16 20:07:39 INFO yarn.Client: Setting up container launch context
14/09/16 20:07:39 INFO yarn.Client: Command for starting the Spark
ApplicationMaster: List($JAVA_HOME/bin/j

Re: PySpark on Yarn - how group by data properly

2014-09-09 Thread Davies Liu
On Tue, Sep 9, 2014 at 9:56 AM, Oleg Ruchovets  wrote:
> Hi ,
>
>    I come from a map/reduce background and am trying to do a quite trivial thing:
>
> I have a lot of files (on HDFS) in this format:
>
>    1, 2, 3
>    2, 3, 5
>    1, 3, 5
>    2, 3, 4
>    2, 5, 1
>
>   I actually need to group by the key (first column):
>   key   values
>   1 --> (2,3),(3,5)
>   2 --> (3,5),(3,4),(5,1)
>
>   and I need to pass the values to a function f (my custom
> function).
>   The outcome of f() should be written to HDFS with the corresponding key:
> 1 --> f() outcome
> 2 --> f() outcome
>
> My code is:
>
>   def doSplit(x):
>       y = x.split(',')
>       if len(y) == 3:
>           return y[0], (y[1], y[2])
>
>   lines = sc.textFile(filename, 1)
>   counts = lines.map(doSplit).groupByKey()
>   output = counts.collect()
>
>   for (key, value) in output:
>       print 'build model for key ->', key
>       print value
>       f(str(key), value)
>
>
> Questions:
>    1) lines.map(doSplit).groupByKey() - I didn't find an option like
> groupByKey(f()) to process the grouped values. How can I process the grouped
> keys with a custom function? Function f has some non-trivial logic.

The result of groupByKey() is still an RDD of (key, ResultIterable(values)) pairs,
so you can continue to call map() or mapValues() on it:

lines.map(doSplit).groupByKey().map(f)

But your `f` needs two parameters, while map() assumes that `f`
takes one parameter, so you need to build a wrapper for `f`:

lines.map(doSplit).groupByKey().map(lambda (k, vs): f(k, vs))

If `f` only accepts the values as a list, then you need to convert `vs` into a list:

result = lines.map(doSplit).groupByKey().map(lambda (k, vs): f(k, list(vs)))

Finally, you could save `result` to HDFS:

result.saveAsPickleFile(path, batch=1024)
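
Putting these pieces together, a minimal end-to-end sketch could look like the code
below (the paths are taken from your spark-submit command, and the body of `f` is
just a placeholder for your real logic):

    from pyspark import SparkContext

    sc = SparkContext(appName="GroupAndApplyF")

    def doSplit(x):
        y = x.split(',')
        if len(y) == 3:
            return y[0], (y[1], y[2])

    def f(key, values):
        # placeholder for the real model-building logic; `values` is a plain list here
        return key, len(values)

    lines = sc.textFile("/input/tad/data.csv")
    # drop malformed lines, for which doSplit() returned None
    pairs = lines.map(doSplit).filter(lambda kv: kv is not None)
    result = pairs.groupByKey().map(lambda (k, vs): f(k, list(vs)))
    result.saveAsPickleFile("/output/cad_model_500_1")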

> 2) Using the collect()'ed output (I really don't like this approach) to pass to the
> function looks non-scalable and runs only on one machine. What is
> the way to process the grouped keys with PySpark in a distributed fashion,
> in parallel and on different machines of the cluster?
>
> 3) When processing the collected output, how can the data be stored on HDFS?

Currently, it's not easy to write files to HDFS directly from the driver; you could do it by:

sc.parallelize(local_data).map(str).saveAsTextFile()
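
For example (the output path here is illustrative):

    # push the driver-local results back through Spark so they are written to HDFS
    sc.parallelize(local_data).map(str).saveAsTextFile("hdfs:///output/f_results")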

> Thanks
> Oleg.




PySpark on Yarn - how group by data properly

2014-09-09 Thread Oleg Ruchovets
Hi ,

   I come from a map/reduce background and am trying to do a quite trivial thing:

I have a lot of files (on HDFS) in this format:

   1, 2, 3
   2, 3, 5
   1, 3, 5
   2, 3, 4
   2, 5, 1

  I actually need to group by the key (first column):
  key   values
  1 --> (2,3),(3,5)
  2 --> (3,5),(3,4),(5,1)

  and I need to pass the values to a function f (my custom
function).
  The outcome of f() should be written to HDFS with the corresponding key:
1 --> f() outcome
2 --> f() outcome

My code is:

  def doSplit(x):
      y = x.split(',')
      if len(y) == 3:
          return y[0], (y[1], y[2])

  lines = sc.textFile(filename, 1)
  counts = lines.map(doSplit).groupByKey()
  output = counts.collect()

  for (key, value) in output:
      print 'build model for key ->', key
      print value
      f(str(key), value)


Questions:
   1) lines.map(doSplit).groupByKey() - I didn't find an option like
groupByKey(f()) to process the grouped values. How can I process the grouped
keys with a custom function? Function f has some non-trivial logic.

2) Using the collect()'ed output (I really don't like this approach) to pass to the
function looks non-scalable and runs only on one machine. What is
the way to process the grouped keys with PySpark in a distributed fashion,
in parallel and on different machines of the cluster?

3) When processing the collected output, how can the data be stored on HDFS?

Thanks
Oleg.