Re: Spark performance testing

2016-07-08 Thread charles li
Hi Andrew, I found plenty of material by googling "*spark
performance test*":


   - https://github.com/databricks/spark-perf
   - https://spark-summit.org/2014/wp-content/uploads/2014/06/Testing-Spark-Best-Practices-Anupama-Shetty-Neil-Marshall.pdf
   - http://people.cs.vt.edu/~butta/docs/tpctc2015-sparkbench.pdf
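
To automate the kind of parameter sweep Andrew describes below, a small
driver script can replace the Excel sheet; a minimal sketch (assuming
spark-submit is on the PATH and a hypothetical job script that takes the
caching level as an argument):

```python
import itertools
import subprocess
import time

memories = ["2g", "4g"]                          # hypothetical grid of settings to sweep
levels = ["MEMORY_ONLY", "MEMORY_AND_DISK"]

for mem, level in itertools.product(memories, levels):
    start = time.time()
    subprocess.check_call([
        "spark-submit",
        "--conf", "spark.executor.memory=%s" % mem,
        "my_job.py", level,                      # hypothetical job script and argument
    ])
    print("executor.memory=%s storage=%s -> %.1f s" % (mem, level, time.time() - start))
```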



On Sat, Jul 9, 2016 at 11:40 AM, Andrew Ehrlich  wrote:

> Hi group,
>
> What solutions are people using to do performance testing and tuning of
> spark applications? I have been doing a pretty manual technique where I lay
> out an Excel sheet of various memory settings and caching parameters and
> then execute each one by hand. It’s pretty tedious though, so I’m wondering
> what others do, and if you do performance testing at all.  Also, is anyone
> generating test data, or just operating on a static set? Is regression
> testing for performance a thing?
>
> Andrew
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
*___*
Quant | Engineer | Boy
*___*
*blog*:http://litaotao.github.io
*github*: www.github.com/litaotao


Is there a way to dynamically load files [ parquet or csv ] in the map function?

2016-07-08 Thread charles li
hi guys, is there a way to dynamically load files within the map function?

i.e.

Can I code as below? (the original snippet was attached as an image and is
not shown here)

thanks a lot.
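
For illustration only, a hypothetical sketch of the kind of per-record
loading being asked about (nothing below is from the original image; the
path pattern and the "key" field are made up):

```python
def load_side_file(record):
    # hypothetical: pick a small CSV on shared storage based on the record itself
    import csv
    path = "/shared/data/%s.csv" % record["key"]   # made-up path pattern
    with open(path) as f:
        rows = list(csv.reader(f))
    return (record["key"], len(rows))

result = rdd.map(load_side_file).collect()         # rdd: records with a "key" field (assumed)
```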
​


-- 
*___*
​  ​
Quant | Engineer | Boy
*___*
*blog*:http://litaotao.github.io
*github*: www.github.com/litaotao


Preview release of Spark 2.0

2016-05-29 Thread charles li
Here is the link: http://spark.apache.org/news/spark-2.0.0-preview.html

congrats, haha, looking forward to 2.0.1, awesome project.


-- 
*--*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


question about Reynold's talk: " The Future of Real Time"

2016-04-22 Thread charles li
hi there, the talk *The Future of Real Time in Spark* (
https://www.youtube.com/watch?v=oXkxXDG0gNk ) mentions "BI
app integration" at 24:28 in the video.

what does he mean by *BI app integration* in that talk? does it mean that
they will develop a BI tool like Zeppelin, Hue or Yhat that integrates
with Spark 2.1?

thanks,


-- 
*--*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


Re: confusion about the Spark SQL json format

2016-03-31 Thread charles li
hi UMESH, I think you've misunderstood the JSON definition.

there should be only one top-level value in a json file.


for the file people.json below:



{"name":"Yin", "address":{"city":"Columbus","state":"Ohio"}}
{"name":"Michael", "address":{"city":null, "state":"California"}}

---

it has two valid formats:

1.



[ {"name":"Yin", "address":{"city":"Columbus","state":"Ohio"}},
{"name":"Michael", "address":{"city":null, "state":"California"}}
]

---

2.



{"name": ["Yin", "Michael"],
"address":[ {"city":"Columbus","state":"Ohio"},
{"city":null, "state":"California"} ]
}
---
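
For reference, Spark's JSON datasource reads the line-delimited layout of
people.json shown above directly (each line is one self-contained object); a
minimal PySpark sketch, using the notebook's sqlContext as in the thread:

```python
# people.json, one JSON object per line:
# {"name":"Yin", "address":{"city":"Columbus","state":"Ohio"}}
# {"name":"Michael", "address":{"city":null, "state":"California"}}

df = sqlContext.read.json("people.json")
df.printSchema()
df.select("name", "address.city").show()
```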



On Thu, Mar 31, 2016 at 4:53 PM, UMESH CHAUDHARY 
wrote:

> Hi,
> Look at below image which is from json.org :
>
> [image: Inline image 1]
>
> The above image describes the object formulation of below JSON:
>
> Object 1=> {"name":"Yin", "address":{"city":"Columbus","state":"Ohio"}}
> Object 2=> {"name":"Michael", "address":{"city":null, "state":"California"}}
>
>
> Note that "address" is also an object.
>
>
>
> On Thu, Mar 31, 2016 at 1:53 PM, charles li 
> wrote:
>
>> as this post says, in spark we can load a json file in the way shown
>> below:
>>
>> *post* :
>> https://databricks.com/blog/2015/02/02/an-introduction-to-json-support-in-spark-sql.html
>>
>>
>>
>> ---
>> sqlContext.jsonFile(file_path)
>> or
>> sqlContext.read.json(file_path)
>>
>> ---
>>
>>
>> and the *json file format* looks like the following, say *people.json*:
>>
>>
>> {"name":"Yin",
>> "address":{"city":"Columbus","state":"Ohio"}}
>> {"name":"Michael", "address":{"city":null, "state":"California"}}
>>
>> ---
>>
>>
>> and here comes my *problems*:
>>
>> Is that the *standard json format*? according to http://www.json.org/ ,
>> I don't think so. it's just a *collection of records* [ a dict each ], not
>> a single valid json document. per the official json spec, the standard
>> json form of people.json should be:
>>
>>
>> {"name":
>> ["Yin", "Michael"],
>> "address":[ {"city":"Columbus","state":"Ohio"},
>> {"city":null, "state":"California"} ]
>> }
>>
>> ---
>>
>> So why does spark define the json format as a collection of records? it
>> is inconvenient: if we had a large standard json file, we would first have
>> to reformat it to make it readable by spark, which is inefficient,
>> time-consuming, incompatible and space-consuming.
>>
>>
>> great thanks,
>>
>>
>>
>>
>>
>>
>> --
>> *--*
>> a spark lover, a quant, a developer and a good man.
>>
>> http://github.com/litaotao
>>
>
>


-- 
*--*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


confusion about the Spark SQL json format

2016-03-31 Thread charles li
as this post says, in spark we can load a json file in the way shown
below:

*post* :
https://databricks.com/blog/2015/02/02/an-introduction-to-json-support-in-spark-sql.html


---
sqlContext.jsonFile(file_path)
or
sqlContext.read.json(file_path)
---


and the *json file format* looks like the following, say *people.json*:

{"name":"Yin",
"address":{"city":"Columbus","state":"Ohio"}}
{"name":"Michael", "address":{"city":null, "state":"California"}}
---


and here comes my *problems*:

Is that the *standard json format*? according to http://www.json.org/ , I
don't think so. it's just a *collection of records* [ a dict each ], not a
single valid json document. per the official json spec, the standard json
form of people.json should be:

{"name":
["Yin", "Michael"],
"address":[ {"city":"Columbus","state":"Ohio"},
{"city":null, "state":"California"} ]
}
---

So why does spark define the json format as a collection of records? it is
inconvenient: if we had a large standard json file, we would first have to
reformat it to make it readable by spark, which is inefficient,
time-consuming, incompatible and space-consuming.


great thanks,






-- 
*--*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


Re: since spark can not parallelize/serialize functions, how to distribute algorithms on the same data?

2016-03-28 Thread charles li
Hi Sujit, thanks a lot, this indeed helps me.
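
A small variant of the dispatch idea in Sujit's reply below: keep a
name-to-function dict and parallelize the names; the functions travel to the
executors inside the closure (via cloudpickle), which sidesteps the pickling
error you get when the functions themselves are the RDD's data (a minimal
sketch):

```python
def f_len(xs):   return len(xs)
def f_sum(xs):   return sum(xs)
def f_sumsq(xs): return sum(x * x for x in xs)

algos = {"len": f_len, "sum": f_sum, "sumsq": f_sumsq}   # name -> algorithm

xs_b = sc.broadcast(list(range(5)))                      # the shared dataset
names = sc.parallelize(list(algos.keys()))
results = names.map(lambda n: (n, algos[n](xs_b.value))).collect()
print(results)   # e.g. [('len', 5), ('sum', 10), ('sumsq', 30)]
```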

On Mon, Mar 28, 2016 at 10:44 PM, Sujit Pal  wrote:

> Hi Charles,
>
> I tried this with dummied out functions which just sum transformations of
> a list of integers, maybe they could be replaced by algorithms in your
> case. The idea is to call them through a "god" function that takes an
> additional type parameter and delegates out to the appropriate function.
> Here's my code, maybe it helps...
>
> def f0(xs):
>>   return len(xs)
>> def f1(xs):
>>   return sum(xs)
>> def f2(xs):
>>   return sum([x**2 for x in xs])
>> def f_god(n, xs):
>>   if n == 1:
>> return f1(xs)
>>   elif n == 2:
>> return f2(xs)
>>   else:
>> return f0(xs)
>>
>> xs = [x for x in range(0, 5)]
>> xs_b = sc.broadcast(xs)
>> ns = sc.parallelize([x for x in range(0, 3)])
>> results = ns.map(lambda n: f_god(n, xs_b.value))
>> print results.take(10)
>
>
> gives me:
>
> [5, 10, 30]
> -sujit
>
>
> On Mon, Mar 28, 2016 at 12:59 AM, Holden Karau 
> wrote:
>
>> You probably want to look at the map transformation, and the many more
>> defined on RDDs. The function you pass in to map is serialized and the
>> computation is distributed.
>>
>>
>> On Monday, March 28, 2016, charles li  wrote:
>>
>>>
>>> use case: I have a dataset and want to run different algorithms on it,
>>> and fetch the results.
>>>
>>> to do this, I think I should distribute my algorithms and run them on
>>> the dataset at the same time, am I right?
>>>
>>> but it seems that spark cannot parallelize/serialize
>>> algorithms/functions, so how can I make this work?
>>>
>>>
>>> *here is the test code*:
>>>
>>>
>>> 
>>> def test():
>>>     pass
>>> function_list = [test] * 10
>>>
>>> sc.parallelize([test] * 10).take(1)
>>>
>>> 
>>>
>>>
>>> *error message: *
>>> Py4JJavaError: An error occurred while calling
>>> z:org.apache.spark.api.python.PythonRDD.runJob.
>>>
>>> : org.apache.spark.SparkException: Job aborted due to stage failure:
>>> Task 2 in stage 9.0 failed 4 times, most recent failure: Lost task 2.3 in
>>> stage 9.0 (TID 105, sh-demo-hadoop-07):
>>> org.apache.spark.api.python.PythonException: Traceback (most recent call
>>> last):
>>>
>>>   File
>>> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/worker.py",
>>> line 111, in main
>>>
>>> process()
>>>
>>>   File
>>> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/worker.py",
>>> line 106, in process
>>>
>>> serializer.dump_stream(func(split_index, iterator), outfile)
>>>
>>>   File
>>> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py",
>>> line 263, in dump_stream
>>>
>>> vs = list(itertools.islice(iterator, batch))
>>>
>>>   File
>>> "/datayes/spark_process/spark-1.6.0-bin-cdh4/python/pyspark/rdd.py", line
>>> 1293, in takeUpToNumLeft
>>>
>>>   File
>>> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py",
>>> line 139, in load_stream
>>>
>>> yield self._read_with_length(stream)
>>>
>>>   File
>>> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py",
>>> line 164, in _read_with_length
>>>
>>> return self.loads(obj)
>>>
>>>   File
>>> "/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py",
>>> line 422, in loads
>>>
>>> return pickle.loads(obj)
>>>
>>> AttributeError: 'module' object has no attribute 'test'
>>>
>>>
>>> at
>>> org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
>>>
>>> at
>>> org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
>>>
>>> at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
>>>
>>> at org.apache.spark.api.python.PythonRDD.comp

since spark can not parallelize/serialize functions, how to distribute algorithms on the same data?

2016-03-28 Thread charles li
use case: I have a dataset and want to run different algorithms on it, and
fetch the results.

to do this, I think I should distribute my algorithms and run them on the
dataset at the same time, am I right?

but it seems that spark cannot parallelize/serialize algorithms/functions,
so how can I make this work?


*here is the test code*:


def test():
    pass

function_list = [test] * 10

sc.parallelize([test] * 10).take(1)



*error message: *
Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.runJob.

: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2
in stage 9.0 failed 4 times, most recent failure: Lost task 2.3 in stage
9.0 (TID 105, sh-demo-hadoop-07):
org.apache.spark.api.python.PythonException: Traceback (most recent call
last):

  File
"/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/worker.py",
line 111, in main

process()

  File
"/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/worker.py",
line 106, in process

serializer.dump_stream(func(split_index, iterator), outfile)

  File
"/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py",
line 263, in dump_stream

vs = list(itertools.islice(iterator, batch))

  File "/datayes/spark_process/spark-1.6.0-bin-cdh4/python/pyspark/rdd.py",
line 1293, in takeUpToNumLeft

  File
"/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py",
line 139, in load_stream

yield self._read_with_length(stream)

  File
"/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py",
line 164, in _read_with_length

return self.loads(obj)

  File
"/usr/local/spark-1.6.0-bin-cdh4/python/lib/pyspark.zip/pyspark/serializers.py",
line 422, in loads

return pickle.loads(obj)

AttributeError: 'module' object has no attribute 'test'


at
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)

at
org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)

at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)

at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)

at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)

at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)

at org.apache.spark.scheduler.Task.run(Task.scala:89)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)


what's interesting is that *when I run sc.parallelize([test] *
10).collect(), it works fine*; it returns:

[<function test at 0x...>,
 <function test at 0x...>,
 ...
 <function test at 0x...>]   (10 function objects; addresses elided)




-- 
--
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


Re: what happens if you cache an RDD multiple times?

2016-03-24 Thread charles li
hi Yash, that really helps me, great thanks
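
For anyone checking quickly, a minimal PySpark sketch of the behaviour Yash
describes below (a second cache() is a no-op; getStorageLevel() shows the
level that stuck):

```python
rdd = sc.parallelize(range(100))

rdd.cache()                     # marks the RDD for persisting and registers it once
rdd.cache()                     # calling cache() again is a no-op, nothing is re-registered
rdd.count()                     # an action actually materialises the cached blocks

print(rdd.getStorageLevel())    # still the level assigned by the first cache()
```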

On Thu, Mar 24, 2016 at 7:07 PM, yash datta  wrote:

> Yes, That is correct.
>
> When you call cache on an RDD, internally it calls
> persist(StorageLevel.MEMORY_ONLY) which further calls
>
> persist(StorageLevel.MEMORY_ONLY, allowOverride=false) , if the RDD is not
> marked for localCheckpointing
>
> Below is what is finally triggered :
>
> /**
>  * Mark this RDD for persisting using the specified level.
>  *
>  * @param newLevel the target storage level
>  * @param allowOverride whether to override any existing level with the new 
> one
>  */
> private def persist(newLevel: StorageLevel, allowOverride: Boolean): 
> this.type = {
>   // TODO: Handle changes of StorageLevel
>   if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && 
> !allowOverride) {
> throw new UnsupportedOperationException(
>   "Cannot change storage level of an RDD after it was already assigned a 
> level")
>   }
>   // If this is the first time this RDD is marked for persisting, register it
>   // with the SparkContext for cleanups and accounting. Do this only once.
>   if (storageLevel == StorageLevel.NONE) {
> sc.cleaner.foreach(_.registerRDDForCleanup(this))
> sc.persistRDD(this)
>   }
>   storageLevel = newLevel
>   this
> }
>
> As is clear from the code, persistRDD is called only when storageLevel for
> the RDD was never set (So it will be called only once for multiple calls
> for the same RDD).
> Also, persistRDD only sets an entry in persistentRdds map, which is keyed
> by RDD id :
>
> /**
>  * Register an RDD to be persisted in memory and/or disk storage
>  */
> private[spark] def persistRDD(rdd: RDD[_]) {
>   persistentRdds(rdd.id) = rdd
> }
>
> Hope this helps.
>
> Best
> Yash
>
> On Thu, Mar 24, 2016 at 1:58 PM, charles li 
> wrote:
>
>>
>> happened to see this problem on stackoverflow:
>> http://stackoverflow.com/questions/36195105/what-happens-if-i-cache-the-same-rdd-twice-in-spark/36195812#36195812
>>
>>
>> I think it's very interesting, and I think the answer posted by Aaron
>> sounds promising, but I'm not sure, and I don't find the details on the
>> cache principle in Spark, so just post here and to ask everyone that the
>> internal principle on implementing cache.
>>
>> great thanks.
>>
>>
>> -aaron's answer to that question [Is that right?]-
>>
>> nothing happens, it will just cache the RDD for once. The reason, I
>> think, is that every RDD has an id internally, spark will use the id to
>> mark whether a RDD have been cached or not. so cache one RDD for multiple
>> times will do nothing.
>> ---
>>
>>
>>
>> --
>> *--*
>> a spark lover, a quant, a developer and a good man.
>>
>> http://github.com/litaotao
>>
>
>
>
> --
> When events unfold with calm and ease
> When the winds that blow are merely breeze
> Learn from nature, from birds and bees
> Live your life in love, and let joy not cease.
>



-- 
*--*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


what happens if you cache an RDD multiple times?

2016-03-24 Thread charles li
happened to see this problem on stackoverflow:
http://stackoverflow.com/questions/36195105/what-happens-if-i-cache-the-same-rdd-twice-in-spark/36195812#36195812


I think it's very interesting, and the answer posted by Aaron sounds
promising, but I'm not sure, and I couldn't find details on the caching
mechanism in Spark, so I'm posting here to ask everyone about the internal
principle behind implementing cache.

great thanks.


-aaron's answer to that question [Is that right?]-

nothing happens, it will just cache the RDD once. The reason, I think, is
that every RDD has an internal id; spark uses the id to mark whether an RDD
has been cached or not, so caching one RDD multiple times does nothing.
---



-- 
*--*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


Re: best practices: running multi user jupyter notebook server

2016-03-20 Thread charles li
Hi Andy, I think you can do that with some open-source packages/libs built
for IPython and Spark.

here is one : https://github.com/litaotao/IPython-Dashboard

On Thu, Mar 17, 2016 at 1:36 AM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> We are considering deploying a notebook server for use by two kinds of
> users
>
>
>1. interactive dashboard.
>   1. I.e. Forms allow users to select data sets and visualizations
>   2. Review real time graphs of data captured by our spark streams
>2. General notebooks for Data Scientists
>
>
> My concern is that interactive spark jobs can consume a lot of cluster
> resources and many users may be sloppy/lazy, i.e. just kill their browsers
> instead of shutting down their notebooks cleanly.
>
> What are best practices?
>
>
> Kind regards
>
> Andy
>



-- 
*--*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


Re: best way to do deep learning on spark ?

2016-03-19 Thread charles li
Hi, Alexander,

that's awesome. when will those features be released? I'd like to weigh the
cost of waiting for that release against using caffe or tensorflow now.

great thanks again
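
For anyone landing on this thread later, a minimal sketch of the multilayer
perceptron classifier Alexander mentions below (assuming Spark >= 1.6 for
the Python API, and hypothetical DataFrames train_df / test_df with
"features" and "label" columns):

```python
from pyspark.ml.classification import MultilayerPerceptronClassifier

# layers: 4 inputs, two hidden layers of 5 and 4 units, 3 output classes (toy sizes)
mlp = MultilayerPerceptronClassifier(maxIter=100, layers=[4, 5, 4, 3],
                                     blockSize=128, seed=1234)
model = mlp.fit(train_df)
predictions = model.transform(test_df)
predictions.select("prediction", "label").show(5)
```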

On Thu, Mar 17, 2016 at 10:32 AM, Ulanov, Alexander <
alexander.ula...@hpe.com> wrote:

> Hi Charles,
>
>
>
> There is an implementation of multilayer perceptron in Spark (since 1.5):
>
>
> https://spark.apache.org/docs/latest/ml-classification-regression.html#multilayer-perceptron-classifier
>
>
>
> Other features such as autoencoder, convolutional layers, etc. are
> currently under development. Please refer to
> https://issues.apache.org/jira/browse/SPARK-5575
>
>
>
> Best regards, Alexander
>
>
>
> *From:* charles li [mailto:charles.up...@gmail.com]
> *Sent:* Wednesday, March 16, 2016 7:01 PM
> *To:* user 
> *Subject:* best way to do deep learning on spark ?
>
>
>
>
>
> Hi, guys, I'm new to MLlib on spark, after reading the document, it seems
> that MLlib does not support deep learning, I want to know is there any way
> to implement deep learning on spark ?
>
>
>
> *Do I must use 3-party package like caffe or tensorflow ?*
>
>
>
> or
>
>
>
> *Does deep learning module list in the MLlib development plan?*
>
>
>
>
> great thanks
>
>
>
> --
>
> *--*
>
> a spark lover, a quant, a developer and a good man.
>
>
>
> http://github.com/litaotao
>



-- 
*--*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


best way to do deep learning on spark ?

2016-03-19 Thread charles li
Hi guys, I'm new to MLlib on spark. after reading the documentation, it
seems that MLlib does not support deep learning. I want to know if there is
any way to implement deep learning on spark.

*Must I use a third-party package like caffe or tensorflow?*

or

*Is a deep learning module on the MLlib development roadmap?*


great thanks

-- 
*--*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


the "DAG Visualiztion" in 1.6 not works fine here

2016-03-15 Thread charles li
sometimes it just shows several *black dots*, and sometimes it cannot show
the entire graph.

did anyone meet this before, and how did you fix it?

(screenshots attached in the original mail; not shown here)


-- 
*--*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


is there any way to make WEB UI auto-refresh?

2016-03-15 Thread charles li
every time I want the latest info I have to refresh the page, which is a
little tedious.

so is there any way to make the web UI auto-refresh?


great thanks



-- 
*--*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


Re: rdd cache name

2016-03-02 Thread charles li
thanks a lot, Xinh, that's very helpful for me.
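
For the archives, a minimal end-to-end sketch of the naming part Xinh
describes below (the name only shows on the storage page once the cached RDD
has been materialised by an action):

```python
rdd = sc.parallelize(range(10000)).map(lambda x: x * 2)
rdd.setName("doubled numbers")   # this name appears in the RDD Name column
rdd.cache()
rdd.count()                      # materialise it so it actually shows on the storage page
```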

On Thu, Mar 3, 2016 at 12:54 AM, Xinh Huynh  wrote:

> Hi Charles,
>
> You can set the RDD name before using it. Just do before caching:
> (Scala) myRdd.setName("Charles RDD")
> (Python) myRdd.setName('Charles RDD')
> Reference: PySpark doc:
> http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD
>
> Fraction cached is the percentage of partitions of an RDD that are cached.
> From the code:
> (rdd.numCachedPartitions * 100.0 / rdd.numPartitions)
> Code is here:
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala
> Fraction cached will be less than 100% if there isn't enough room for all
> cached RDDs to fit in the cache. If it's a problem, you may want to
> increase your in-memory cache size or cache off-heap or to disk.
>
> Xinh
>
> On Wed, Mar 2, 2016 at 1:48 AM, charles li 
> wrote:
>
>> hi, there, I feel a little confused about the *cache* in spark.
>>
>> first, is there any way to *customize the cached RDD name*, it's not
>> convenient for me when looking at the storage page, there are the kind of
>> RDD in the RDD Name column, I hope to make it as my customized name, kinds
>> of 'rdd 1', 'rrd of map', 'rdd of groupby' and so on.
>>
>> second, can some one tell me what exactly the '*Fraction Cached*' mean
>> under the hood?
>>
>> great thanks
>>
>>
>>
>> ​
>>
>> --
>> *--*
>> a spark lover, a quant, a developer and a good man.
>>
>> http://github.com/litaotao
>>
>
>


-- 
*--*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


rdd cache name

2016-03-02 Thread charles li
hi there, I'm a little confused about the *cache* in spark.

first, is there any way to *customize the cached RDD name*? it's not
convenient when looking at the storage page: the RDD Name column only shows
the kind of RDD, and I'd like it to show my own names, e.g. 'rdd 1', 'rdd of
map', 'rdd of groupby' and so on.

second, can someone tell me what exactly '*Fraction Cached*' means under
the hood?

great thanks

-- 
*--*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


Re: Recommendation for a good book on Spark, beginner to moderate knowledge

2016-02-29 Thread charles li
since spark is under active development, any book you pick up to learn it
will be somewhat outdated.

I would suggest learning it from several sources, as below:


   - the official spark documentation; trust me, you will go through it
   several times if you want to learn spark well: http://spark.apache.org/
   - spark summit, lots of high-quality videos and slides:
   https://spark-summit.org/
   - databricks' blog: https://databricks.com/blog
   - attend spark meetups: http://www.meetup.com/
   - try spark third-party packages when needed and convenient:
   http://spark-packages.org/
   - and I have just started blogging my spark learning notes on my blog:
   http://litaotao.github.io


in a word, I think the best way to learn it is: official *documentation +
databricks blog + others' blogs ===>>> your own blog [ a tutorial you write,
or just notes from your learning ]*

On Mon, Feb 29, 2016 at 4:50 PM, Ashok Kumar 
wrote:

> Thank you all for valuable advice. Much appreciated
>
> Best
>
>
> On Sunday, 28 February 2016, 21:48, Ashok Kumar 
> wrote:
>
>
>   Hi Gurus,
>
> Appreciate if you recommend me a good book on Spark or documentation for
> beginner to moderate knowledge
>
> I very much like to skill myself on transformation and action methods.
>
> FYI, I have already looked at examples on net. However, some of them not
> clear at least to me.
>
> Warmest regards
>
>
>


-- 
*--*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


how to interview spark developers

2016-02-23 Thread charles li
hi there, we are going to recruit several spark developers. can someone
share some ideas on interviewing candidates, e.g. spark-related problems to ask?


great thanks.

-- 
*--*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


spark.executor.memory: is it used just for caching RDDs, or for both cached RDDs and the runtime of the cores on the worker?

2016-02-04 Thread charles li
if I set spark.executor.memory = 2G for each worker [ 10 in total ],

does that mean I can cache 20G of RDDs in memory? if so, what about the
memory for the code running in each process on each worker?

thanks.


--
and is there any material about memory management or resource management in
spark? I want to put spark into production but know little about resource
management in spark. great thanks again
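
For reference, a rough sketch of the knobs that decide how much of
spark.executor.memory is usable for cached RDDs (Spark 1.6 unified memory
manager; the exact fractions are version-dependent, so treat the values as
illustrative):

```python
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("memory-config-sketch")
        .set("spark.executor.memory", "2g")            # total heap per executor
        .set("spark.memory.fraction", "0.75")          # share of heap for execution + storage
        .set("spark.memory.storageFraction", "0.5"))   # part of that share protected for cached blocks

sc = SparkContext(conf=conf)
```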


-- 
*--*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


rdd cache priority

2016-02-04 Thread charles li
say I have 2 RDDs, RDD1 and RDD2.

both are 20g in memory.

and I cache both of them in memory using RDD1.cache() and RDD2.cache()


then in the later steps of my app, I never use RDD1 but use RDD2 many
times.


then here is my question:

if there is only 40G of memory in my cluster, and I have another 20G RDD,
RDD3, what happens if I cache RDD3 using RDD3.cache()?


as the documentation says, cache uses the default storage level
MEMORY_ONLY, which means RDD3 is not guaranteed to stay cached and may be
re-computed every time it is used.

I'm a little confused: will spark evict RDD1 for me and put RDD3 in memory?

or is there any concept like a "priority cache" in spark?


great thanks
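
One practical way around it is to release RDD1 explicitly rather than
relying on eviction (a minimal sketch; Spark otherwise drops cached
partitions in roughly least-recently-used order when storage memory fills
up):

```python
rdd1.cache()
rdd2.cache()
# ... rdd2 is reused many times, rdd1 is not ...

rdd1.unpersist()   # free RDD1's cached blocks explicitly
rdd3.cache()       # now RDD3 has room without evicting RDD2's partitions
```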



-- 
*--*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


questions about progress bar status [stuck]?

2016-02-01 Thread charles li
code:

---
total = int(1e8)
local_collection = range(1, total)
rdd = sc.parallelize(local_collection)
res = rdd.collect()
---

web ui status
---
(screenshot attached in the original mail; not shown here)
problems:
---

1. from the status bar it looks like about half of the tasks should be
done, but it says that none of the 16 tasks have completed.

2. the job just gets stuck and I have to kill it manually, but I don't know
why it is stuck. any idea about this problem?

3. when I set total to `1e6`, it works fine.


-- 
*--*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


how to introduce spark to a colleague who has no background in spark or related fields

2016-01-31 Thread charles li
*Apache Spark™* is a fast and general engine for large-scale data
processing.

it's a good one-line description of spark, but it's really too short for
people who have little background in this field.

ok, frankly, I'll give a tech talk about spark later this week, and I'm now
writing the slides, but I'm stuck on the first slide.


I'm going to cover a few questions about spark in the first part of my
talk, since most of my colleagues have no background in spark or hadoop, so
I want to talk about:

1. the background of birth of spark
2. pros and cons of spark, or the situations that spark is going to handle,
or why we use spark
3. the basic principles of spark,
4. the basic conceptions of spark

has anyone faced this kind of problem, introducing spark to someone with no
background in your field? I'd love to hear how you handled it at the time,
or to get some ideas about the 4 sections mentioned above.


great thanks.


-- 
*--*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


confusion about starting an ipython notebook with spark, between 1.3.x and 1.6.x

2016-01-31 Thread charles li
I used spark 1.3.x before and explored my data in an ipython [3.2]
notebook, which was very stable. but now I come across an error:

 " Java gateway process exited before sending the driver its port number "

my code is as bellow:

```
import pyspark
from pyspark import SparkConf

sc_conf = SparkConf() ### error occurs here
```

then I asked google for help; here is an answer on stackoverflow:
http://stackoverflow.com/questions/30763951/spark-context-sc-not-defined/30851037#30851037
, which says:

```

One solution is adding pyspark-shell to the shell environment variable
PYSPARK_SUBMIT_ARGS:

export PYSPARK_SUBMIT_ARGS="--master local[2] pyspark-shell"

There is a change in python/pyspark/java_gateway.py,
which requires PYSPARK_SUBMIT_ARGS to include pyspark-shell if a
PYSPARK_SUBMIT_ARGS variable is set by a user.

```

then I changed my PYSPARK_SUBMIT_ARGS from `--master spark://
10.21.208.21:7077 --deploy-mode client` to `--master spark://
10.21.208.21:7077 --deploy-mode client pyspark-shell`. it does work, but it
raises another question: each time I create sc in a different notebook, the
spark app name is `pyspark-shell` even though I explicitly set the app name
using SparkConf. that has really confused me these days.

so my questions are:



   - how to start an ipython notebook with spark integrated, on spark 1.6.0;
   - why it only works when I add `pyspark-shell` to PYSPARK_SUBMIT_ARGS when
   starting the ipython notebook with spark 1.6.0;
   - why it does not work when I explicitly set the app name using
   SparkConf;


great thanks.
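
For what it's worth, one sketch that keeps a custom app name is to build the
context explicitly in the notebook instead of relying on the auto-created
one (whether this wins over what PYSPARK_SUBMIT_ARGS passes to the gateway
can depend on how the gateway was launched; the app name below is
hypothetical):

```python
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setMaster("spark://10.21.208.21:7077")
        .setAppName("my-notebook-analysis"))   # hypothetical name

sc = SparkContext(conf=conf)   # assumes no SparkContext exists yet in this kernel
```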

-- 
*--*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


best practice : how to manage your Spark cluster ?

2016-01-20 Thread charles li
I posted a thread before:  pre-install third-party Python packages on the
spark cluster

currently I use *Fabric* to manage my cluster, but it's not enough for me,
and I believe there is a much better way to *manage and monitor* the
cluster.

I believe there are open-source management tools that provide a web UI
allowing me to [ what I need exactly ]:


   - monitor the cluster machine's state in real-time, say memory, network,
   disk
   - list all the services, packages on each machine
   - install / uninstall / upgrade / downgrade package through a web UI
   - start / stop / restart services on that machine



great thanks

-- 
*--*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


Re: rdd.foreach return value

2016-01-18 Thread charles li
got it, great thanks, Vishal, Ted and David
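
For anyone searching the archives later, a minimal PySpark sketch of the
distinction Vishal describes below (foreach is for side effects and returns
nothing; use map + collect to get values back, or an accumulator to
aggregate side-effect style):

```python
rdd = sc.parallelize([1, 2, 3, 4])

def show(x):
    print(x)                 # printed in the executor's stdout, not on the driver

rdd.foreach(show)            # returns None; any value 'returned' inside show is discarded

doubled = rdd.map(lambda x: x * 2).collect()   # [2, 4, 6, 8] back on the driver

acc = sc.accumulator(0)
rdd.foreach(lambda x: acc.add(x))
print(acc.value)             # 10
```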

On Tue, Jan 19, 2016 at 1:10 PM, Vishal Maru  wrote:

> 1. foreach doesn't expect any value from function being passed (in your
> func_foreach). so nothing happens. The return values are just lost. it's
> like calling a function without saving return value to another var.
> foreach also doesn't return anything so you don't get modified RDD (like
> map*).
> 2. RDD's are immutable. All transform functions (map*,groupBy*,reduceBy
> etc.) return new RDD.
> 3. Yes. It's just iterates through elements and calls the function being
> passed. That's it. It doesn't collect the values and don't return any new
> modified RDD.
>
>
> On Mon, Jan 18, 2016 at 11:10 PM, charles li 
> wrote:
>
>>
>> hi, great thanks to david and ted, I know that the content of RDD can be
>> returned to driver using 'collect' method.
>>
>> but my question is:
>>
>>
>> 1. cause we can write any code we like in the function put into
>> 'foreach', so what happened when we actually write a 'return' sentence in
>> the foreach function?
>> 2. as the photo shows bellow, the content of RDD doesn't change after
>> foreach function, why?
>> 3. I feel a little confused about the 'foreach' method, it should be an
>> 'action', right? cause it return nothing. or is there any best practice of
>> the 'foreach' funtion? or can some one put your code snippet when using
>> 'foreach' method in your application, that would be awesome.
>>
>>
>> great thanks again
>>
>>
>>
>> ​
>>
>> On Tue, Jan 19, 2016 at 11:44 AM, Ted Yu  wrote:
>>
>>> Here is signature for foreach:
>>>  def foreach(f: T => Unit): Unit = withScope {
>>>
>>> I don't think you can return element in the way shown in the snippet.
>>>
>>> On Mon, Jan 18, 2016 at 7:34 PM, charles li 
>>> wrote:
>>>
>>>> code snippet
>>>>
>>>>
>>>> ​
>>>> the 'print' actually print info on the worker node, but I feel confused
>>>> where the 'return' value
>>>> goes to. for I get nothing on the driver node.
>>>> --
>>>> *--*
>>>> a spark lover, a quant, a developer and a good man.
>>>>
>>>> http://github.com/litaotao
>>>>
>>>
>>>
>>
>>
>> --
>> *--*
>> a spark lover, a quant, a developer and a good man.
>>
>> http://github.com/litaotao
>>
>
>


-- 
*--*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


Re: rdd.foreach return value

2016-01-18 Thread charles li
hi, great thanks to david and ted. I know the contents of an RDD can be
returned to the driver using the 'collect' method.

but my question is:


1. since we can write any code we like in the function passed to 'foreach',
what happens when we actually write a 'return' statement in that function?
2. as the screenshot below shows, the contents of the RDD don't change after
the foreach call; why?
3. I'm a little confused about the 'foreach' method; it should be an
'action', right? because it returns nothing. is there any best practice for
'foreach', or could someone share a code snippet showing how they use
'foreach' in their application? that would be awesome.


great thanks again

(screenshot attached in the original mail; not shown here)

On Tue, Jan 19, 2016 at 11:44 AM, Ted Yu  wrote:

> Here is signature for foreach:
>  def foreach(f: T => Unit): Unit = withScope {
>
> I don't think you can return element in the way shown in the snippet.
>
> On Mon, Jan 18, 2016 at 7:34 PM, charles li 
> wrote:
>
>> code snippet
>>
>>
>> ​
>> the 'print' actually print info on the worker node, but I feel confused
>> where the 'return' value
>> goes to. for I get nothing on the driver node.
>> --
>> *--*
>> a spark lover, a quant, a developer and a good man.
>>
>> http://github.com/litaotao
>>
>
>


-- 
*--*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


Re: rdd.foreach return value

2016-01-18 Thread charles li
thanks, david and ted. I know the contents of an RDD can be returned to the
driver using `collect`.

(screenshot attached in the original mail; not shown here)

On Tue, Jan 19, 2016 at 11:44 AM, Ted Yu  wrote:

> Here is signature for foreach:
>  def foreach(f: T => Unit): Unit = withScope {
>
> I don't think you can return element in the way shown in the snippet.
>
> On Mon, Jan 18, 2016 at 7:34 PM, charles li 
> wrote:
>
>> code snippet
>>
>>
>> ​
>> the 'print' actually print info on the worker node, but I feel confused
>> where the 'return' value
>> goes to. for I get nothing on the driver node.
>> --
>> *--*
>> a spark lover, a quant, a developer and a good man.
>>
>> http://github.com/litaotao
>>
>
>


-- 
*--*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


rdd.foreach return value

2016-01-18 Thread charles li
code snippet (attached as an image; not shown here)

the 'print' actually prints on the worker node, but I'm confused about where
the 'return' value goes, since I get nothing on the driver node.
-- 
*--*
a spark lover, a quant, a developer and a good man.

http://github.com/litaotao


Re: [KafkaRDD]: rdd.cache() does not seem to work

2016-01-11 Thread charles li
cache() is just persist() with the default storage level, and it is lazy [
nothing is actually cached ] until the first time the RDD is computed by an
action.
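
Illustrated with a generic RDD in PySpark for brevity: force materialisation
with an action right after persisting, so the later saves reuse the cached
blocks instead of re-reading the source (a minimal sketch; the paths are
made up):

```python
rdd = sc.textFile("hdfs:///some/input")        # stands in for the KafkaRDD in the question
rdd.cache()
rdd.count()                                    # action forces the data to be read once and cached

rdd.saveAsTextFile("hdfs:///out/path_one")     # these reuse the cached partitions
rdd.saveAsTextFile("hdfs:///out/path_two")
```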

​

On Tue, Jan 12, 2016 at 5:13 AM, ponkin  wrote:

> Hi,
>
> Here is my use case :
> I have kafka topic. The job is fairly simple - it reads topic and save
> data to several hdfs paths.
> I create rdd with the following code
>  val r =
>  
> KafkaUtils.createRDD[Array[Byte],Array[Byte],DefaultDecoder,DefaultDecoder](context,kafkaParams,range)
>
> Then I am trying to cache that rdd with
>  r.cache()
> and then save this rdd to several hdfs locations.
> But it seems that KafkaRDD is fetching data from kafka broker every time I
> call saveAsNewAPIHadoopFile.
>
> How can I cache data from Kafka in memory?
>
> P.S. When I do repartition add it seems to work properly( read kafka only
> once) but spark store shuffled data localy.
> Is it possible to keep data in memory?
>
> --
> View this message in context: [KafkaRDD]: rdd.cache() does not seem to
> work
> 
> Sent from the Apache Spark User List mailing list archive
>  at Nabble.com.
>


Re: Snappy error when driver is running in JBoss

2015-01-06 Thread Charles Li
Hi,
Thanks for the reply! I did an echo $CLASSPATH but got nothing. Since we are
running inside JBoss, I guess the classpath is not set?
People did mention that JBoss loads snappy-java multiple times, but I cannot
find a way to solve that problem.

Cheers


On Jan 6, 2015, at 5:35 PM, Ted Yu  wrote:

> Might be due to conflict between multiple snappy jars.
> 
> Can you check the classpath to see if there are more than one snappy jar ?
> 
> Cheers
> 
> On Tue, Jan 6, 2015 at 2:26 PM, Charles  wrote:
> I get this exception(java.lang.UnsatisfiedLinkError) when the driver is
> running inside JBoss.
> 
> We are running with DataStax 4.6 version, which is using spark 1.1.0. The
> driver runs inside a wildfly container. The snappy-java version is 1.0.5.
> 
> 
> 2015-01-06 20:25:03,771 ERROR [akka.actor.ActorSystemImpl]
> (sparkDriver-akka.actor.default-dispatcher-22) Uncaught fatal error from
> thread [sparkDriver-akka.actor.default-dispatcher-3] shutting down
> ActorSystem [sparkDriver]: java.lang.UnsatisfiedLinkError:
> org.xerial.snappy.SnappyNative.maxCompressedLength(I)I
> at org.xerial.snappy.SnappyNative.maxCompressedLength(Native Method)
> [levski-0.5.0-SNAPSHOT-standalone.jar:]
> at org.xerial.snappy.Snappy.maxCompressedLength(Snappy.java:320)
> [levski-0.5.0-SNAPSHOT-standalone.jar:]
> at org.xerial.snappy.SnappyOutputStream.<init>(SnappyOutputStream.java:79)
> [levski-0.5.0-SNAPSHOT-standalone.jar:]
> at
> org.apache.spark.io.SnappyCompressionCodec.compressedOutputStream(CompressionCodec.scala:125)
> [levski-0.5.0-SNAPSHOT-standalone.jar:]
> at
> org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:207)
> [levski-0.5.0-SNAPSHOT-standalone.jar:]
> at
> org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:83)
> [levski-0.5.0-SNAPSHOT-standalone.jar:]
> at
> org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:68)
> [levski-0.5.0-SNAPSHOT-standalone.jar:]
> at
> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:36)
> [levski-0.5.0-SNAPSHOT-standalone.jar:]
> at
> org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
> [levski-0.5.0-SNAPSHOT-standalone.jar:]
> at
> org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
> [levski-0.5.0-SNAPSHOT-standalone.jar:]
> at org.apache.spark.SparkContext.broadcast(SparkContext.scala:809)
> [levski-0.5.0-SNAPSHOT-standalone.jar:]
> at cenx.prometheus.spark.SparkContext.broadcast(Unknown Source)
> [levski-0.5.0-SNAPSHOT-standalone.jar:]
> at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:829)
> [levski-0.5.0-SNAPSHOT-standalone.jar:]
> at
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:769)
> [levski-0.5.0-SNAPSHOT-standalone.jar:]
> at
> org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:753)
> [levski-0.5.0-SNAPSHOT-standalone.jar:]
> at
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1360)
> [levski-0.5.0-SNAPSHOT-standalone.jar:]
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> [levski-0.5.0-SNAPSHOT-standalone.jar:]
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> [levski-0.5.0-SNAPSHOT-standalone.jar:]
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> [levski-0.5.0-SNAPSHOT-standalone.jar:]
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> [levski-0.5.0-SNAPSHOT-standalone.jar:]
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> [levski-0.5.0-SNAPSHOT-standalone.jar:]
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> [levski-0.5.0-SNAPSHOT-standalone.jar:]
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> [levski-0.5.0-SNAPSHOT-standalone.jar:]
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> [levski-0.5.0-SNAPSHOT-standalone.jar:]
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> [levski-0.5.0-SNAPSHOT-standalone.jar:]
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Snappy-error-when-driver-is-running-in-JBoss-tp21004.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 
> 



Re: Questions about disk IOs

2014-07-25 Thread Charles Li
Hi Xiangrui,

I have 16 * 40 CPU cores in total, but I am only using 200 partitions on
the 200 executors. I use coalesce without shuffle to reduce the default
number of partitions of the RDD.

The shuffle size from the WebUI is nearly 100m.

On Jul 25, 2014, at 23:51, Xiangrui Meng  wrote:

> How many partitions did you use and how many CPU cores in total? The
> former shouldn't be much larger than the latter. Could you also check
> the shuffle size from the WebUI? -Xiangrui
> 
> On Fri, Jul 25, 2014 at 4:10 AM, Charles Li  wrote:
>> Hi Xiangrui,
>> 
>> Thanks for your treeAggregate patch. It is very helpful.
>> After applying your patch in my local repos, the new spark can handle more 
>> partition than before.
>> But after some iteration(mapPartition + reduceByKey), the reducer seems 
>> become more slower and finally hang.
>> 
>> The logs shows there always 1 message pending in the outbox, and we are 
>> waiting for it. Are you aware this kind issue?
>> How can I know which message is pending?  Where is it supposed to go?
>> 
>> Log:
>> 
>> 14/07/25 17:49:54 INFO storage.BlockManager: Found block rdd_2_158 locally
>> 14/07/25 17:50:03 INFO spark.SparkRegistry: Using kryo with register
>> 14/07/25 17:50:03 INFO spark.SparkRegistry: Using kryo with register
>> 14/07/25 17:50:03 INFO spark.SparkRegistry: Using kryo with register
>> 14/07/25 17:50:03 INFO executor.Executor: Serialized size of result for 302 
>> is 752
>> 14/07/25 17:50:03 INFO executor.Executor: Sending result for 302 directly to 
>> driver
>> 14/07/25 17:50:03 INFO executor.Executor: Finished task ID 302
>> 14/07/25 17:50:34 INFO network.ConnectionManager: Accepted connection from 
>> [*/**]
>> 14/07/25 17:50:34 INFO network.SendingConnection: Initiating connection to 
>> [/]
>> 14/07/25 17:50:34 INFO network.SendingConnection: Connected to 
>> [/], 1 messages pending
>> 14/07/25 17:51:28 INFO storage.ShuffleBlockManager: Deleted all files for 
>> shuffle 0
>> 14/07/25 17:51:37 INFO executor.CoarseGrainedExecutorBackend: Got assigned 
>> task 742
>> 14/07/25 17:51:37 INFO executor.Executor: Running task ID 742
>> 14/07/25 17:51:37 INFO storage.BlockManager: Found block broadcast_1 locally
>> 14/07/25 17:51:38 INFO spark.MapOutputTrackerWorker: Updating epoch to 1 and 
>> clearing cache
>> 14/07/25 17:51:38 INFO spark.SparkRegistry: Using kryo with register
>> 14/07/25 17:51:38 INFO storage.BlockManager: Found block rdd_2_158 locally
>> 14/07/25 17:51:48 INFO spark.SparkRegistry: Using kryo with register
>> 14/07/25 17:51:48 INFO spark.SparkRegistry: Using kryo with register
>> 14/07/25 17:51:48 INFO spark.SparkRegistry: Using kryo with register
>> 14/07/25 17:51:48 INFO executor.Executor: Serialized size of result for 742 
>> is 752
>> 14/07/25 17:51:48 INFO executor.Executor: Sending result for 742 directly to 
>> driver
>> 14/07/25 17:51:48 INFO executor.Executor: Finished task ID 742
>> <—— I have shutdown the App
>> 14/07/25 18:16:36 INFO executor.CoarseGrainedExecutorBackend: Driver 
>> commanded a shutdown
>> 
>> On Jul 2, 2014, at 0:08, Xiangrui Meng  wrote:
>> 
>>> Try to reduce number of partitions to match the number of cores. We
>>> will add treeAggregate to reduce the communication cost.
>>> 
>>> PR: https://github.com/apache/spark/pull/1110
>>> 
>>> -Xiangrui
>>> 
>>> On Tue, Jul 1, 2014 at 12:55 AM, Charles Li  wrote:
>>>> Hi Spark,
>>>> 
>>>> I am running LBFGS on our user data. The data size with Kryo serialisation 
>>>> is about 210G. The weight size is around 1,300,000. I am quite confused 
>>>> that the performance is very close whether the data is cached or not.
>>>> 
>>>> The program is simple:
>>>> points = sc.hadoopFIle(int, SequenceFileInputFormat.class …..)
>>>> points.persist(StorageLevel.Memory_AND_DISK_SER()) // comment it if not 
>>>> cached
>>>> gradient = new LogisticGrandient();
>>>> updater = new SquaredL2Updater();
>>>> initWeight = Vectors.sparse(size, new int[]{}, new double[]{})
>>>> result = LBFGS.runLBFGS(points.rdd(), grandaunt, updater, numCorrections, 
>>>> convergeTol, maxIter, regParam, initWeight);
>>>> 
>>>> I have 13 machines with 16 cpus, 48G RAM each. Spark is running on its 
>>>> cluster mode. Below are some arguments I am using:
>>>> —executor-memory 10G
>>>> —num-executors 50
>>>> —executor-cores 2
>>>> 
>>>> Storage Using:
>>>> When caching:
>>>> Cached Partitions 951
>>>> Fraction Cached 100%
>>>> Size in Memory 215.7GB
>>>> Size in Tachyon 0.0B
>>>> Size on Disk 1029.7MB
>>>> 
>>>> The time cost by every aggregate is around 5 minutes with cache enabled. 
>>>> Lots of disk IOs can be seen on the hadoop node. I have the same result 
>>>> with cache disabled.
>>>> 
>>>> Should data points caching improve the performance? Should caching 
>>>> decrease the disk IO?
>>>> 
>>>> Thanks in advance.
>> 



Re: Questions about disk IOs

2014-07-25 Thread Charles Li
Hi Xiangrui,

Thanks for your treeAggregate patch. It is very helpful.
After applying your patch to my local repo, the new spark can handle more
partitions than before.
But after some iterations (mapPartitions + reduceByKey), the reducers seem to
become slower and slower and finally hang.

The logs show there is always 1 message pending in the outbox, and we are
waiting for it. Are you aware of this kind of issue?
How can I tell which message is pending?  Where is it supposed to go?

Log:

14/07/25 17:49:54 INFO storage.BlockManager: Found block rdd_2_158 locally
14/07/25 17:50:03 INFO spark.SparkRegistry: Using kryo with register
14/07/25 17:50:03 INFO spark.SparkRegistry: Using kryo with register
14/07/25 17:50:03 INFO spark.SparkRegistry: Using kryo with register
14/07/25 17:50:03 INFO executor.Executor: Serialized size of result for 302 is 
752
14/07/25 17:50:03 INFO executor.Executor: Sending result for 302 directly to 
driver
14/07/25 17:50:03 INFO executor.Executor: Finished task ID 302
14/07/25 17:50:34 INFO network.ConnectionManager: Accepted connection from 
[*/**]
14/07/25 17:50:34 INFO network.SendingConnection: Initiating connection to 
[/]
14/07/25 17:50:34 INFO network.SendingConnection: Connected to 
[/], 1 messages pending
14/07/25 17:51:28 INFO storage.ShuffleBlockManager: Deleted all files for 
shuffle 0
14/07/25 17:51:37 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 
742
14/07/25 17:51:37 INFO executor.Executor: Running task ID 742
14/07/25 17:51:37 INFO storage.BlockManager: Found block broadcast_1 locally
14/07/25 17:51:38 INFO spark.MapOutputTrackerWorker: Updating epoch to 1 and 
clearing cache
14/07/25 17:51:38 INFO spark.SparkRegistry: Using kryo with register
14/07/25 17:51:38 INFO storage.BlockManager: Found block rdd_2_158 locally
14/07/25 17:51:48 INFO spark.SparkRegistry: Using kryo with register
14/07/25 17:51:48 INFO spark.SparkRegistry: Using kryo with register
14/07/25 17:51:48 INFO spark.SparkRegistry: Using kryo with register
14/07/25 17:51:48 INFO executor.Executor: Serialized size of result for 742 is 
752
14/07/25 17:51:48 INFO executor.Executor: Sending result for 742 directly to 
driver
14/07/25 17:51:48 INFO executor.Executor: Finished task ID 742
<—— I have shutdown the App
14/07/25 18:16:36 INFO executor.CoarseGrainedExecutorBackend: Driver commanded 
a shutdown

On Jul 2, 2014, at 0:08, Xiangrui Meng  wrote:

> Try to reduce number of partitions to match the number of cores. We
> will add treeAggregate to reduce the communication cost.
> 
> PR: https://github.com/apache/spark/pull/1110
> 
> -Xiangrui
> 
> On Tue, Jul 1, 2014 at 12:55 AM, Charles Li  wrote:
>> Hi Spark,
>> 
>> I am running LBFGS on our user data. The data size with Kryo serialisation 
>> is about 210G. The weight size is around 1,300,000. I am quite confused that 
>> the performance is very close whether the data is cached or not.
>> 
>> The program is simple:
>> points = sc.hadoopFIle(int, SequenceFileInputFormat.class …..)
>> points.persist(StorageLevel.Memory_AND_DISK_SER()) // comment it if not 
>> cached
>> gradient = new LogisticGrandient();
>> updater = new SquaredL2Updater();
>> initWeight = Vectors.sparse(size, new int[]{}, new double[]{})
>> result = LBFGS.runLBFGS(points.rdd(), grandaunt, updater, numCorrections, 
>> convergeTol, maxIter, regParam, initWeight);
>> 
>> I have 13 machines with 16 cpus, 48G RAM each. Spark is running on its 
>> cluster mode. Below are some arguments I am using:
>> —executor-memory 10G
>> —num-executors 50
>> —executor-cores 2
>> 
>> Storage Using:
>> When caching:
>> Cached Partitions 951
>> Fraction Cached 100%
>> Size in Memory 215.7GB
>> Size in Tachyon 0.0B
>> Size on Disk 1029.7MB
>> 
>> The time cost by every aggregate is around 5 minutes with cache enabled. 
>> Lots of disk IOs can be seen on the hadoop node. I have the same result with 
>> cache disabled.
>> 
>> Should data points caching improve the performance? Should caching decrease 
>> the disk IO?
>> 
>> Thanks in advance.



Questions about disk IOs

2014-07-01 Thread Charles Li
Hi Spark,

I am running LBFGS on our user data. The data size with Kryo serialisation is
about 210G. The weight vector size is around 1,300,000. I am quite confused
that the performance is nearly the same whether the data is cached or not.

The program is simple:
points = sc.hadoopFile(int, SequenceFileInputFormat.class …..)
points.persist(StorageLevel.MEMORY_AND_DISK_SER()) // comment this out if not cached
gradient = new LogisticGradient();
updater = new SquaredL2Updater();
initWeight = Vectors.sparse(size, new int[]{}, new double[]{})
result = LBFGS.runLBFGS(points.rdd(), gradient, updater, numCorrections, 
convergeTol, maxIter, regParam, initWeight);

I have 13 machines with 16 cpus, 48G RAM each. Spark is running on its cluster 
mode. Below are some arguments I am using:
—executor-memory 10G
—num-executors 50
—executor-cores 2

Storage Using:
When caching:
Cached Partitions 951
Fraction Cached 100%
Size in Memory 215.7GB
Size in Tachyon 0.0B
Size on Disk 1029.7MB

The time cost by every aggregate is around 5 minutes with cache enabled. Lots 
of disk IOs can be seen on the hadoop node. I have the same result with cache 
disabled.

Should data points caching improve the performance? Should caching decrease the 
disk IO?

Thanks in advance.