Re: Interested in contributing to SPARK-24815

2023-07-25 Thread Kent Yao
Hi Pavan,

Refer to the ASF Source Header and Copyright Notice Policy [1]: code
directly submitted to the ASF should include the Apache license header
without any additional copyright notice.


Kent Yao

[1] https://www.apache.org/legal/src-headers.html#headers
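For reference, the full header that the policy prescribes, as it appears at the top of Spark's own source files, is shown below as a Scala/Java block comment; the comment style is the only assumption here, and the authoritative wording is on the policy page itself.

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */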

Sean Owen wrote on Tue, Jul 25, 2023 at 07:22:

>
> When contributing to an ASF project, it's governed by the terms of the ASF 
> ICLA: https://www.apache.org/licenses/icla.pdf or CCLA: 
> https://www.apache.org/licenses/cla-corporate.pdf
>
> I don't believe ASF projects ever retain an original author copyright 
> statement, but rather source files have a statement like:
>
> ...
>  * Licensed to the Apache Software Foundation (ASF) under one or more
>  * contributor license agreements.  See the NOTICE file distributed with
>  * this work for additional information regarding copyright ownership.
> ...
>
> While it's conceivable that such a statement could live in a NOTICE file, I 
> don't believe that's been done for any of the thousands of other 
> contributors. That's really more for noting the license of 
> non-Apache-licensed code. Code directly contributed to the project is assumed 
> to have been licensed per above already.
>
> It might be wise to review the CCLA with Twilio and consider establishing 
> that to govern contributions.
>
> On Mon, Jul 24, 2023 at 6:10 PM Pavan Kotikalapudi 
>  wrote:
>>
>> Hi Spark Dev,
>>
>> My name is Pavan Kotikalapudi, I work at Twilio.
>>
>> I am looking to contribute to this spark issue 
>> https://issues.apache.org/jira/browse/SPARK-24815.
>>
>> There is a clause from the company's OSS saying
>>
>> - The proposed contribution is about 100 lines of code modification in the 
>> Spark project, involving two files - this is considered a large 
>> contribution. An appropriate Twilio copyright notice needs to be added for 
>> the portion of code that is newly added.
>>
>> Please let me know if that is acceptable?
>>
>> Thank you,
>>
>> Pavan
>>




[ANNOUNCE] Apache Kyuubi (Incubating) released 1.5.0-incubating

2022-03-25 Thread Kent Yao
Hi all,

The Apache Kyuubi (Incubating) community is pleased to announce that
Apache Kyuubi (Incubating) 1.5.0-incubating has been released!

Apache Kyuubi (Incubating) is a distributed multi-tenant JDBC server for
large-scale data processing and analytics, built on top of Apache Spark
and designed to support more engines like Apache Flink (beta), Trino (beta),
and so on.

Kyuubi provides a pure SQL gateway through Thrift JDBC/ODBC interface
for end-users to manipulate large-scale data with pre-programmed and
extensible Spark SQL engines.

We are aiming to make Kyuubi an "out-of-the-box" tool for data warehouses
and data lakes.

This "out-of-the-box" model minimizes the barriers and costs for end-users
to use Spark at the client-side.
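For readers who want to try the gateway, here is a minimal sketch of connecting through the Thrift JDBC interface with the Hive JDBC driver. The host, port, driver availability, and credentials are placeholders and assumptions, not details from this announcement.

// Sketch: querying a Kyuubi server through its Thrift JDBC endpoint.
// Assumes the Hive JDBC driver is on the classpath and the server is
// reachable at kyuubi-host:10009 (both placeholders).
import java.sql.DriverManager

object KyuubiJdbcSketch {
  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val url  = "jdbc:hive2://kyuubi-host:10009/default"
    val conn = DriverManager.getConnection(url, "user", "")
    try {
      val rs = conn.createStatement().executeQuery("SELECT 1 AS probe")
      while (rs.next()) println(rs.getInt("probe"))
    } finally {
      conn.close()
    }
  }
}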

At the server-side, Kyuubi server and engine's multi-tenant architecture
provides the administrators a way to achieve computing resource isolation,
data security, high availability, high client concurrency, etc.

The full release notes and download links are available at:
Release notes: https://kyuubi.apache.org/release/1.5.0-incubating.html
Download page: https://kyuubi.apache.org/releases.html

To learn more about Apache Kyuubi (Incubating), please see
https://kyuubi.apache.org/

Kyuubi Resources:
- Issue Tracker: https://kyuubi.apache.org/issue_tracking.html
- Mailing list: https://kyuubi.apache.org/mailing_lists.html

We would like to thank all contributors of the Kyuubi community and the
Apache Incubator community who made this release possible!

Thanks,
On behalf of Apache Kyuubi (Incubating) community


Re: Spark version verification

2021-03-21 Thread Kent Yao






Hi Mich,

> What are the correlations among these links and the ability to establish a spark build version?

Check the documentation list here: http://spark.apache.org/documentation.html. The `latest` link always points to the list head; for example, http://spark.apache.org/docs/latest/ means http://spark.apache.org/docs/3.1.1/ for now.

The Spark build version in Spark releases is created by `spark-build-info`, see https://github.com/apache/spark/blob/89bf2afb3337a44f34009a36cae16dd0ff86b353/build/spark-build-info#L32

Some other options to check the Spark build info:

1. The `RELEASE` file:

cat RELEASE
Spark 3.0.1 (git revision 2b147c4cd5) built for Hadoop 2.7.4
Build flags: -B -Pmesos -Pyarn -Pkubernetes -Psparkr -Pscala-2.12 -Phadoop-2.7 -Phive -Phive-thriftserver -DzincPort=3036

2. bin/spark-submit --version

The git revision itself does not tell you whether the release is an RC or final. If you have the Spark source code locally, you can use `git show 1d550c4e90275ab418b9161925049239227f3dc9` to get the tag info, like `commit 1d550c4e90275ab418b9161925049239227f3dc9 (tag: v3.1.1-rc3, tag: v3.1.1)`. Or you can compare the revision you have got with all tags here: https://github.com/apache/spark/tags

Bests,
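For reference, the same build metadata can be read from a running session; a minimal sketch, assuming a spark-shell session. The constants come from spark-version-info.properties, which build/spark-build-info generates at build time; the names below are as in recent Spark versions and are an assumption here, not quoted from this thread.

// Sketch: print build metadata from inside spark-shell.
import org.apache.spark.{SPARK_BUILD_DATE, SPARK_REVISION, SPARK_VERSION}

println(s"version:  $SPARK_VERSION")
println(s"revision: $SPARK_REVISION")
println(s"built:    $SPARK_BUILD_DATE")

// The SQL function returns the version and revision in one string:
spark.sql("SELECT version()").show(false)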






  



















Kent Yao @ Data Science Center, Hangzhou Research Institute, NetEase Corp.
A Spark enthusiast.
kyuubi: a unified multi-tenant JDBC interface for large-scale data processing and analytics, built on top of Apache Spark.
spark-authorizer: a Spark SQL extension which provides SQL Standard Authorization for Apache Spark.
spark-postgres: a library for reading data from and transferring data to Postgres / Greenplum with Spark SQL and DataFrames, 10~100x faster.
spark-func-extras: a library that brings excellent and useful functions from various modern database management systems to Apache Spark.


On 03/22/2021 00:02, Mich Talebzadeh wrote:

Hi Kent,

Thanks for the links. You have to excuse my ignorance, what are the correlations among these links and the ability to establish a Spark build version?

View my LinkedIn profile

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

On Sun, 21 Mar 2021 at 15:55, Kent Yao <yaooq...@qq.com> wrote:







Please refer to http://spark.apache.org/docs/latest/api/sql/index.html#version 






  


















    















 


On 03/21/2021 23:28, Mich Talebzadeh wrote:

Many thanks.

spark-sql> SELECT version();
3.1.1 1d550c4e90275ab418b9161925049239227f3dc9

What does 1d550c4e90275ab418b9161925049239227f3dc9 signify please?




On Sun, 21 Mar 2021 at 15:14, Sean Owen <sro...@gmail.com> wrote:

I believe you can "SELECT version()" in Spark SQL to see the build version.

On Sun, Mar 21, 2021 at 4:41 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Thanks for the detailed info. I was hoping that one could find a simpler answer to the Spark version than doing a forensic examination on the base code, so to speak.

The primer for this verification is that on GCP Dataproc clusters originally built on 3.1.1-rc2, there was an issue with running Spark Structured Streaming (SSS) which I reported to this forum before. After a while, and me reporting to Google, they have now upgraded the base to Spark 3.1.1 itself. I am not privy to how they did the upgrade itself.

In the meantime we installed 3.1.1 on-premise and ran it with the same Python code for SSS. It worked fine. However, when I run the same code on GCP Dataproc upgraded to 3.1.1, occasionally I see

Re: Spark version verification

2021-03-21 Thread Kent Yao






Please refer to http://spark.apache.org/docs/latest/api/sql/index.html#version 






  


































 


On 03/21/2021 23:28, Mich Talebzadeh wrote:

Many thanks.

spark-sql> SELECT version();
3.1.1 1d550c4e90275ab418b9161925049239227f3dc9

What does 1d550c4e90275ab418b9161925049239227f3dc9 signify please?




On Sun, 21 Mar 2021 at 15:14, Sean Owen <sro...@gmail.com> wrote:

I believe you can "SELECT version()" in Spark SQL to see the build version.

On Sun, Mar 21, 2021 at 4:41 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Thanks for the detailed info. I was hoping that one could find a simpler answer to the Spark version than doing a forensic examination on the base code, so to speak.

The primer for this verification is that on GCP Dataproc clusters originally built on 3.1.1-rc2, there was an issue with running Spark Structured Streaming (SSS) which I reported to this forum before. After a while, and me reporting to Google, they have now upgraded the base to Spark 3.1.1 itself. I am not privy to how they did the upgrade itself.

In the meantime we installed 3.1.1 on-premise and ran it with the same Python code for SSS. It worked fine. However, when I run the same code on GCP Dataproc upgraded to 3.1.1, occasionally I see this error:

21/03/18 16:53:38 ERROR org.apache.spark.scheduler.AsyncEventQueue: Listener EventLoggingListener threw an exception
java.util.ConcurrentModificationException
        at java.util.Hashtable$Enumerator.next(Hashtable.java:1387)

This may be for other reasons or the consequence of upgrading from 3.1.1-rc2 to 3.1.1?


On Sat, 20 Mar 2021 at 22:41, Attila Zsolt Piros <piros.attila.zs...@gmail.com> wrote:

Hi!

I would check out the Spark source, then diff those two RCs (first just take a look at the list of the changed files):

$ git diff v3.1.1-rc1..v3.1.1-rc2 --stat
...

The shell scripts in the release can be checked very easily:

$ git diff v3.1.1-rc1..v3.1.1-rc2 --stat | grep ".sh "
 bin/docker-image-tool.sh                           |   6 +-
 dev/create-release/release-build.sh                |   2 +-

We are lucky, as docker-image-tool.sh is part of the released version. Is it from v3.1.1-rc2 or v3.1.1-rc1? Of course this only works if docker-image-tool.sh is not changed from v3.1.1-rc2 back to v3.1.1-rc1. So let's continue with the Python (and later with R) files:

$ git diff v3.1.1-rc1..v3.1.1-rc2 --stat | grep ".py "
 python/pyspark/sql/avro/functions.py               |   4 +-
 python/pyspark/sql/dataframe.py                    |   1 +
 python/pyspark/sql/functions.py                    | 285 +--
 .../pyspark/sql/tests/test_pandas_cogrouped_map.py |  12 +
 python/pyspark/sql/tests/test_pandas_map.py        |   8 +
...

After you have enough proof you can stop (what counts as enough here is up to you). Finally, you can use javap / scalap on the classes from the jars and check some code changes, which is harder to analyze than a simple text file.

Best Regards,
Attila

On Thu, Mar 18, 2021 at 4:09 PM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:

Hi,

What would be a signature in the Spark version or binaries that confirms the release is built on Spark 3.1.1 as opposed to 3.1.1-RC-1 or RC-2?

Thanks

Mich


Re: [jira] [Commented] (SPARK-34648) Reading Parquet Files in Spark Extremely Slow for Large Number of Files?

2021-03-10 Thread Kent Yao






Hi Pankaj,

Have you tried spark.sql.parquet.respectSummaryFiles=true?
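For concreteness, a minimal sketch of applying that setting when the session is built, together with disabling schema merging so the reader does not need to examine every file footer. The path is a placeholder and the combination is an illustration, not a verified fix for this particular job.

// Sketch: enable the summary-file setting and avoid schema merging.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("parquet-read-sketch")
  .config("spark.sql.parquet.respectSummaryFiles", "true")
  .getOrCreate()

val df = spark.read
  .option("mergeSchema", "false")            // do not merge schemas across the 46848 files
  .parquet("/path/to/partitioned/dataset")   // placeholder path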




Bests,

  


































 


On 03/10/2021 21:59, 钟雨 wrote:

Hi Pankaj,

Can you show your detailed code and Job/Stage info? Which stage is slow?

Pankaj Bhootra <pankajbhoo...@gmail.com> wrote on Wed, Mar 10, 2021 at 12:32 PM:

Hi,

Could someone please revert on this?

Thanks
Pankaj Bhootra

On Sun, 7 Mar 2021, 01:22 Pankaj Bhootra, <pankajbhoo...@gmail.com> wrote:

Hello Team,

I am new to Spark and this question may be a possible duplicate of the issue highlighted here: https://issues.apache.org/jira/browse/SPARK-9347

We have a large dataset partitioned by calendar date, and within each date partition, we are storing the data as parquet files in 128 parts.

We are trying to run aggregation on this dataset for 366 dates at a time with Spark SQL on Spark version 2.3.0, hence our Spark job is reading 366*128=46848 partitions, all of which are parquet files. There is currently no _metadata or _common_metadata file(s) available for this dataset.

The problem we are facing is that when we try to run spark.read.parquet on the above 46848 partitions, our data reads are extremely slow. It takes a long time to run even a simple map task (no shuffling) without any aggregation or group by.

I read through the above issue and I think I perhaps generally understand the ideas around the _common_metadata file. But the above issue was raised for Spark 1.3.1, and for Spark 2.3.0 I have not found any documentation related to this metadata file so far.

I would like to clarify:
- What's the latest best practice for reading a large number of parquet files efficiently?
- Does this involve using any additional options with spark.read.parquet? How would that work?
- Are there other possible reasons for slow data reads apart from reading metadata for every part? We are basically trying to migrate our existing Spark pipeline from using CSV files to parquet, but from my hands-on so far, it seems that parquet's read time is slower than CSV. This seems contradictory to the popular opinion that parquet performs better in terms of both computation and storage.

Thanks
Pankaj Bhootra


-- Forwarded message -
From: Takeshi Yamamuro (Jira) <j...@apache.org>
Date: Sat, 6 Mar 2021, 20:02
Subject: [jira] [Commented] (SPARK-34648) Reading Parquet Files in Spark Extremely Slow for Large Number of Files?
To: <pankajbhoo...@gmail.com>
    [ https://issues.apache.org/jira/browse/SPARK-34648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17296528#comment-17296528 ]

Takeshi Yamamuro commented on SPARK-34648:
--

Please use the mailing list (user@spark.apache.org) instead. This is not a right place to ask questions.

> Reading Parquet Files in Spark Extremely Slow for Large Number of Files?
> 
>
>                 Key: SPARK-34648
>                 URL: https://issues.apache.org/jira/browse/SPARK-34648
>             Project: Spark
>          Issue Type: Question
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: Pankaj Bhootra
>            Priority: Major
>
> Hello Team
> I am new to Spark and this question may be a possible duplicate of the issue highlighted here: https://issues.apache.org/jira/browse/SPARK-9347 
> We have a large dataset partitioned by calendar date, and within each date partition, we are storing the data as *parquet* files in 128 parts.
> We are trying to run aggregation on this dataset for 366 dates at a time with Spark SQL on spark version 2.3.0, hence our Spark job is reading 366*128=46848 partitions, all of which are parquet files. There is currently no *_metadata* or *_common_metadata* file(s) available for this dataset.
> The problem we are facing is that when we try to run *spark.read.parquet* on the above 46848 partitions, our data reads are extremely slow. It takes a long time to run even a simple map task (no shuffling) without any aggregation or group by.
> I read through the above issue and I think I perhaps generally understand the ideas around *_common_metadata* file. But the above issue was raised f

Re: spark 3.1.1 support hive 1.2

2021-03-09 Thread Kent Yao






Hi Li,

Have you tried `Interacting with Different Versions of Hive Metastore`?
http://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html#interacting-with-different-versions-of-hive-metastore
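A minimal sketch of what that configuration looks like for a Hive 1.2 metastore. The version string and jar source are illustrative assumptions based on the documentation page above, not a tested setup for this user's cluster.

// Sketch: pointing Spark at a Hive 1.2 metastore via configuration.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hive-1.2-metastore-sketch")
  .config("spark.sql.hive.metastore.version", "1.2.1")
  // "maven" downloads matching Hive client jars; a local classpath also works.
  .config("spark.sql.hive.metastore.jars", "maven")
  .enableHiveSupport()
  .getOrCreate()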




Bests,

  


































 


On 03/10/2021 10:56, jiahong li wrote:

Hi, sorry to bother you. In Spark 3.0.1, hive-1.2 is supported, but in Spark 3.1.x the hive-1.2 Maven profile is removed. Does that mean hive-1.2 is not supported in Spark 3.1.x? How can I support hive-1.2 in Spark 3.1.x, and is there any JIRA for it? Can anyone help me?






Re: [ANNOUNCE] Announcing Apache Spark 3.1.1

2021-03-02 Thread Kent Yao







Congrats, all!







Bests,

  


































 


On 03/03/2021 15:11, Takeshi Yamamuro wrote:

Great work and congrats, all!

Bests,
Takeshi

On Wed, Mar 3, 2021 at 2:18 PM Mridul Muralidharan <mri...@gmail.com> wrote:

Thanks Hyukjin, and congratulations everyone on the release!

Regards,
Mridul

On Tue, Mar 2, 2021 at 8:54 PM Yuming Wang <wgy...@gmail.com> wrote:

Great work, Hyukjin!

On Wed, Mar 3, 2021 at 9:50 AM Hyukjin Kwon <gurwls...@gmail.com> wrote:

We are excited to announce Spark 3.1.1 today.

Apache Spark 3.1.1 is the second release of the 3.x line. This release adds Python type annotations and Python dependency management support as part of Project Zen. Other major updates include improved ANSI SQL compliance support, history server support in Structured Streaming, and the general availability (GA) of Kubernetes and node decommissioning in Kubernetes and Standalone. In addition, this release continues to focus on usability, stability, and polish while resolving around 1500 tickets.

We'd like to thank our contributors and users for their contributions and early feedback to this release. This release would not have been possible without you.

To download Spark 3.1.1, head over to the download page: http://spark.apache.org/downloads.html

To view the release notes: https://spark.apache.org/releases/spark-release-3-1-1.html

--
Takeshi Yamamuro






unsubscribe

2020-03-07 Thread haitao .yao
unsubscribe

-- 
haitao.yao


Re: I have trained a ML model, now what?

2019-01-23 Thread Pola Yao
Hi Riccardo,

Right now, Spark does not support low-latency predictions in Production.
MLeap is an alternative and it's been used in many scenarios. But it's good
to see that Spark Community has decided to provide such support.

On Wed, Jan 23, 2019 at 7:53 AM Riccardo Ferrari  wrote:

> Felix, thank you very much for the link. Much appreciated.
>
> The attached PDF is very interesting, I found myself evaluating many of
> the scenarios described in Q3. It's unfortunate the proposal is not being
> worked on, would be great to see that part of the code base.
>
> It is cool to see big players like Uber trying to make Open Source better,
> thanks!
>
>
> On Tue, Jan 22, 2019 at 5:24 PM Felix Cheung 
> wrote:
>
>> About deployment/serving
>>
>> SPIP
>> https://issues.apache.org/jira/browse/SPARK-26247
>>
>>
>> --
>> *From:* Riccardo Ferrari 
>> *Sent:* Tuesday, January 22, 2019 8:07 AM
>> *To:* User
>> *Subject:* I have trained a ML model, now what?
>>
>> Hi list!
>>
>> I am writing here to here about your experience on putting Spark ML
>> models into production at scale.
>>
>> I know it is a very broad topic with many different faces depending on
>> the use-case, requirements, user base and whatever is involved in the task.
>> Still I'd like to open a thread about this topic that is as important as
>> properly training a model and I feel is often neglected.
>>
>> The task is *serving web users with predictions* and the main challenge
>> I see is making it agile and swift.
>>
>> I think there are mainly 3 general categories of such deployment that can
>> be described as:
>>
>>- Offline/Batch: Load a model, performs the inference, store the
>>results in some datasotre (DB, indexes,...)
>>- Spark in the loop: Having a long running Spark context exposed in
>>some way, this include streaming as well as some custom application that
>>wraps the context.
>>- Use a different technology to load the Spark MLlib model and run
>>the inference pipeline. I have read about MLeap and other PMML based
>>solutions.
>>
>> I would love to hear about opensource solutions and possibly without
>> requiring cloud provider specific framework/component.
>>
>> Again I am aware each of the previous category have benefits and
>> drawback, so what would you pick? Why? and how?
>>
>> Thanks!
>>
>
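For the first category listed above (offline/batch scoring), a minimal sketch of what that loop can look like with a persisted Spark ML PipelineModel. Paths, table layout, and column names are placeholders, not details from this thread.

// Sketch: load a persisted PipelineModel, score a batch, store the predictions.
import org.apache.spark.ml.PipelineModel
import org.apache.spark.sql.SparkSession

object BatchScore {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("batch-score").getOrCreate()

    val model  = PipelineModel.load("s3://models/churn/v42")            // placeholder
    val batch  = spark.read.parquet("s3://features/daily/dt=2019-01-23") // placeholder
    val scored = model.transform(batch).select("user_id", "prediction", "probability")

    // Store results where the serving layer can look them up.
    scored.write.mode("overwrite").parquet("s3://predictions/dt=2019-01-23")
    spark.stop()
  }
}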


Re: How to force-quit a Spark application?

2019-01-22 Thread Pola Yao
oy.SparkSubmit$.submit(SparkSubmit.scala:227)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
...
'''

What shall I do then? Thanks!


On Wed, Jan 16, 2019 at 1:15 PM Marcelo Vanzin  wrote:

> Those are daemon threads and not the cause of the problem. The main
> thread is waiting for the "org.apache.hadoop.util.ShutdownHookManager"
> thread, but I don't see that one in your list.
>
> On Wed, Jan 16, 2019 at 12:08 PM Pola Yao  wrote:
> >
> > Hi Marcelo,
> >
> > Thanks for your response.
> >
> > I have dumped the threads on the server where I submitted the spark
> application:
> >
> > '''
> > ...
> > "dispatcher-event-loop-2" #28 daemon prio=5 os_prio=0
> tid=0x7f56cee0e000 nid=0x1cb6 waiting on condition [0x7f5699811000]
> >java.lang.Thread.State: WAITING (parking)
> > at sun.misc.Unsafe.park(Native Method)
> > - parking to wait for  <0x0006400161b8> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> > at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> > at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
> > at
> org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
> > at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> > at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> > at java.lang.Thread.run(Thread.java:745)
> >
> > "dispatcher-event-loop-1" #27 daemon prio=5 os_prio=0
> tid=0x7f56cee0c800 nid=0x1cb5 waiting on condition [0x7f5699912000]
> >java.lang.Thread.State: WAITING (parking)
> > at sun.misc.Unsafe.park(Native Method)
> > - parking to wait for  <0x0006400161b8> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> > at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> > at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
> > at
> org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
> > at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> > at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> > at java.lang.Thread.run(Thread.java:745)
> >
> > "dispatcher-event-loop-0" #26 daemon prio=5 os_prio=0
> tid=0x7f56cee0c000 nid=0x1cb4 waiting on condition [0x7f569a12]
> >java.lang.Thread.State: WAITING (parking)
> > at sun.misc.Unsafe.park(Native Method)
> > - parking to wait for  <0x0006400161b8> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> > at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
> > at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
> > at
> org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
> > at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> > at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> > at java.lang.Thread.run(Thread.java:745)
> >
> > "Service Thread" #20 daemon prio=9 os_prio=0 tid=0x7f56cc12d800
> nid=0x1ca5 runnable [0x]
> >java.lang.Thread.State: RUNNABLE
> >
> > "C1 CompilerThread14" #19 daemon prio=9 os_prio=0 tid=0x7f56cc12a000
> nid=0x1ca4 waiting on condition [0x]
> >java.lang.Thread.State: RUNNABLE
> > ...
> > "Finalizer" #3 daemon prio=8 os_prio=0 tid=0x7f56cc0ce000 nid=0x1c93
> in Object.wait() [0x7f56ab3f2000]
> >java.lang.Thread.State: WAITING (on object monitor)
> > at java.lang.Object.wait(Native Method)
> > at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
> > - locked <0x0006400cd498> (a java.lang.ref.ReferenceQueue$Lock)
> > at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
> > at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)
> >
> > "Reference Handler" #2 daemon prio=10 os_prio=0 tid=0x7f56cc0c9800
> nid=0x1c92 in 

Re: How to force-quit a Spark application?

2019-01-16 Thread Pola Yao
ge$.exit(package.scala:40)
at scala.sys.package$.exit(package.scala:33)
at
actionmodel.ParallelAdvertiserBeaconModel$.main(ParallelAdvertiserBeaconModel.scala:252)
at
actionmodel.ParallelAdvertiserBeaconModel.main(ParallelAdvertiserBeaconModel.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:879)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

"VM Thread" os_prio=0 tid=0x7f56cc0c1800 nid=0x1c91 runnable
...
'''

I have no clear idea what went wrong. I did call  awaitTermination to
terminate the thread pool. Or is there any way to force close all those
'WAITING' threads associated with my spark application?

On Wed, Jan 16, 2019 at 8:31 AM Marcelo Vanzin  wrote:

> If System.exit() doesn't work, you may have a bigger problem
> somewhere. Check your threads (using e.g. jstack) to see what's going
> on.
>
> On Wed, Jan 16, 2019 at 8:09 AM Pola Yao  wrote:
> >
> > Hi Marcelo,
> >
> > Thanks for your reply! It made sense to me. However, I've tried many
> ways to exit the spark (e.g., System.exit()), but failed. Is there an
> explicit way to shutdown all the alive threads in the spark application and
> then quit afterwards?
> >
> >
> > On Tue, Jan 15, 2019 at 2:38 PM Marcelo Vanzin 
> wrote:
> >>
> >> You should check the active threads in your app. Since your pool uses
> >> non-daemon threads, that will prevent the app from exiting.
> >>
> >> spark.stop() should have stopped the Spark jobs in other threads, at
> >> least. But if something is blocking one of those threads, or if
> >> something is creating a non-daemon thread that stays alive somewhere,
> >> you'll see that.
> >>
> >> Or you can force quit with sys.exit.
> >>
> >> On Tue, Jan 15, 2019 at 1:30 PM Pola Yao  wrote:
> >> >
> >> > I submitted a Spark job through ./spark-submit command, the code was
> executed successfully, however, the application got stuck when trying to
> quit spark.
> >> >
> >> > My code snippet:
> >> > '''
> >> > {
> >> >
> >> > val spark = SparkSession.builder.master(...).getOrCreate
> >> >
> >> > val pool = Executors.newFixedThreadPool(3)
> >> > implicit val xc = ExecutionContext.fromExecutorService(pool)
> >> > val taskList = List(train1, train2, train3)  // where train* is a
> Future function which wrapped up some data reading and feature engineering
> and machine learning steps
> >> > val results = Await.result(Future.sequence(taskList), 20 minutes)
> >> >
> >> > println("Shutting down pool and executor service")
> >> > pool.shutdown()
> >> > xc.shutdown()
> >> >
> >> > println("Exiting spark")
> >> > spark.stop()
> >> >
> >> > }
> >> > '''
> >> >
> >> > After I submitted the job, from terminal, I could see the code was
> executed and printing "Exiting spark", however, after printing that line,
> it never existed spark, just got stuck.
> >> >
> >> > Does any body know what the reason is? Or how to force quitting?
> >> >
> >> > Thanks!
> >> >
> >> >
> >>
> >>
> >> --
> >> Marcelo
>
>
>
> --
> Marcelo
>


Re: How to force-quit a Spark application?

2019-01-16 Thread Pola Yao
Hi Marcelo,

Thanks for your reply! It made sense to me. However, I've tried many ways
to exit the spark (e.g., System.exit()), but failed. Is there an explicit
way to shutdown all the alive threads in the spark application and then
quit afterwards?


On Tue, Jan 15, 2019 at 2:38 PM Marcelo Vanzin  wrote:

> You should check the active threads in your app. Since your pool uses
> non-daemon threads, that will prevent the app from exiting.
>
> spark.stop() should have stopped the Spark jobs in other threads, at
> least. But if something is blocking one of those threads, or if
> something is creating a non-daemon thread that stays alive somewhere,
> you'll see that.
>
> Or you can force quit with sys.exit.
>
> On Tue, Jan 15, 2019 at 1:30 PM Pola Yao  wrote:
> >
> > I submitted a Spark job through ./spark-submit command, the code was
> executed successfully, however, the application got stuck when trying to
> quit spark.
> >
> > My code snippet:
> > '''
> > {
> >
> > val spark = SparkSession.builder.master(...).getOrCreate
> >
> > val pool = Executors.newFixedThreadPool(3)
> > implicit val xc = ExecutionContext.fromExecutorService(pool)
> > val taskList = List(train1, train2, train3)  // where train* is a Future
> function which wrapped up some data reading and feature engineering and
> machine learning steps
> > val results = Await.result(Future.sequence(taskList), 20 minutes)
> >
> > println("Shutting down pool and executor service")
> > pool.shutdown()
> > xc.shutdown()
> >
> > println("Exiting spark")
> > spark.stop()
> >
> > }
> > '''
> >
> > After I submitted the job, from terminal, I could see the code was
> executed and printing "Exiting spark", however, after printing that line,
> it never existed spark, just got stuck.
> >
> > Does any body know what the reason is? Or how to force quitting?
> >
> > Thanks!
> >
> >
>
>
> --
> Marcelo
>
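One way to avoid the hang in the first place, assuming the thread pool's non-daemon workers are what keep the JVM alive, is to build the pool from daemon threads and fall back to sys.exit only as a last resort. A sketch, with stand-in tasks replacing the real train1/train2/train3 futures:

// Sketch: run concurrent Spark jobs on daemon threads so the JVM can exit cleanly.
import java.util.concurrent.{Executors, ThreadFactory}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import org.apache.spark.sql.SparkSession

object DaemonPoolSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("daemon-pool").getOrCreate()

    val daemonFactory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r)
        t.setDaemon(true) // daemon threads cannot keep the JVM alive after main returns
        t
      }
    }
    val pool = Executors.newFixedThreadPool(3, daemonFactory)
    implicit val xc: ExecutionContext = ExecutionContext.fromExecutorService(pool)

    // Stand-ins for the real training futures in the original snippet.
    val tasks = List(1, 2, 3).map(i => Future { spark.range(100).count() * i })
    val results = Await.result(Future.sequence(tasks), 20.minutes)
    println(s"results = $results")

    pool.shutdown()
    spark.stop()
    sys.exit(0) // last resort if a stray non-daemon thread still lingers
  }
}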


How to force-quit a Spark application?

2019-01-15 Thread Pola Yao
I submitted a Spark job through ./spark-submit command, the code was
executed successfully, however, the application got stuck when trying to
quit spark.

My code snippet:
'''
{

val spark = SparkSession.builder.master(...).getOrCreate

val pool = Executors.newFixedThreadPool(3)
implicit val xc = ExecutionContext.fromExecutorService(pool)
val taskList = List(train1, train2, train3)  // where train* is a Future
function which wrapped up some data reading and feature engineering and
machine learning steps
val results = Await.result(Future.sequence(taskList), 20 minutes)

println("Shutting down pool and executor service")
pool.shutdown()
xc.shutdown()

println("Exiting spark")
spark.stop()

}
'''

After I submitted the job, from terminal, I could see the code was executed
and printing "Exiting spark", however, after printing that line, it never
existed spark, just got stuck.

Does any body know what the reason is? Or how to force quitting?

Thanks!


[Spark-ml]Error in training ML models: Missing an output location for shuffle xxx

2019-01-07 Thread Pola Yao
Hi Spark Comminuty,

I was using XGBoost-spark to train a machine learning model. The dataset
was not large (around 1G). And I used the following command to submit my
application:
'''

./bin/spark-submit --master yarn --deploy-mode client --num-executors 50
--executor-cores 2 --executor-memory 3g --driver-memory 8g --conf
spark.executor.memoryOverhead=2g --conf spark.network.timeout=2000s --class
XXX --jars /path/to/jars /path/to/application
'''

And got the following errors:
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
location for shuffle 58

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an
output location for shuffle 58
at 
org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:867)
at 
org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:863)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at 
org.apache.spark.MapOutputTracker$.convertMapStatuses(MapOutputTracker.scala:863)
at 
org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:677)
at 
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:49)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at 
org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:100)
at 
org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:99)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
at 
scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:30)
at 
ml.dmlc.xgboost4j.java.DataBatch$BatchIterator.hasNext(DataBatch.java:47)
at ml.dmlc.xgboost4j.java.XGBoostJNI.XGDMatrixCreateFromDataIter(Native 
Method)
at ml.dmlc.xgboost4j.java.DMatrix.(DMatrix.java:53)
at ml.dmlc.xgboost4j.scala.DMatrix.(DMatrix.scala:42)
at 
ml.dmlc.xgboost4j.scala.spark.Watches$.buildWatches(XGBoost.scala:436)
at 
ml.dmlc.xgboost4j.scala.spark.XGBoost$$anonfun$trainDistributed$4$$anonfun$12.apply(XGBoost.scala:276)
at 
ml.dmlc.xgboost4j.scala.spark.XGBoost$$anonfun$trainDistributed$4$$anonfun$12.apply(XGBoost.scala:275)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1092)
at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
at 
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

The error was occurred at foreachPartition at XGBoost.scala:287


Did anybody know what caused the error? Was it a memory issue?

Thanks!


[spark-ml] How to write a Spark Application correctly?

2019-01-02 Thread Pola Yao
Hello Spark Community,

I have a dataset of size 20G, 20 columns. Each column is categorical, so I
applied string-indexer and one-hot-encoding on every column. After, I
applied vector-assembler on all the newly derived columns to form a feature
vector for each record, and then feed the feature vectors to a ML algorithm.

However, during the feature engineering steps, I observed from Spark UI
that the input size (i.e., from Executor tab) increased dramatically to
600G+. The cluster I used might not have so much resource. Are there any
ways for optimizing the memory usage of intermediate results?

Thanks.
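For reference, the kind of pipeline being described (StringIndexer, then OneHotEncoder, then VectorAssembler over every column) looks roughly like the sketch below. Column names and the path are placeholders, the code is written against the Spark 3.x ML API (older releases used OneHotEncoderEstimator), and it illustrates the setup rather than a fix for the memory blow-up.

// Sketch: index + one-hot encode all categorical columns, then assemble features.
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("categorical-pipeline-sketch").getOrCreate()
val df = spark.read.parquet("/path/to/20g/dataset")   // placeholder path

val catCols = df.columns.toSeq                         // all 20 columns are categorical
val indexers = catCols.map(c =>
  new StringIndexer().setInputCol(c).setOutputCol(s"${c}_idx").setHandleInvalid("keep"))
val encoders = catCols.map(c =>
  new OneHotEncoder().setInputCol(s"${c}_idx").setOutputCol(s"${c}_vec"))
val assembler = new VectorAssembler()
  .setInputCols(catCols.map(c => s"${c}_vec").toArray)
  .setOutputCol("features")

val stages: Array[PipelineStage] = (indexers ++ encoders :+ assembler).toArray
val features = new Pipeline().setStages(stages).fit(df).transform(df).select("features")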


Fwd: Train multiple machine learning models in parallel

2018-12-19 Thread Pola Yao
Hi Comminuty,

I have a 1T dataset which contains records for  50 users. Each user has 20G
data averagely.

I wanted to use spark to train a machine learning model (e.g., XGBoost tree
model) for each user. Ideally, the result should be 50 models. However,
it'd be infeasible to submit 50 spark jobs through 'spark-submit'.

The model parameters and feature engineering steps for each user's data
would be exactly same, I am wondering if there is a way to train this 50
models in parallel?

Thanks!
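One common pattern is to submit a single Spark application and run the per-user training calls as concurrent jobs inside it, for example with Scala futures. The sketch below is illustrative: the data layout, the training body, and the degree of parallelism are assumptions, not details from this thread.

// Sketch: train one model per user concurrently inside one Spark application.
import java.util.concurrent.Executors
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().appName("per-user-models").getOrCreate()
implicit val ec: ExecutionContext =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8)) // 8 concurrent trainings

def trainForUser(userId: String): String = {
  val df: DataFrame = spark.read.parquet(s"/data/users/$userId") // ~20G per user, placeholder path
  // ... identical feature engineering and model training for this user's data goes here ...
  val modelPath = s"/models/$userId"
  // model.write.overwrite().save(modelPath)
  modelPath
}

val userIds = (1 to 50).map(i => s"user_$i")
val futures = userIds.map(id => Future(trainForUser(id)))
val savedModels = Await.result(Future.sequence(futures), 12.hours)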


Re: Cosine Similarity between documents - Rows

2017-11-27 Thread Ge, Yao (Y.)
You are essentially doing document clustering. K-means will do it. You do have to
specify the number of clusters up front.





From: "Donni Khan"
Date: Monday, November 27, 2017 at 7:27:33 AM
To: "user@spark.apache.org"
Subject: Cosine Similarity between documents - Rows


I have spark job to compute the similarity between text documents:

RowMatrix rowMatrix = new RowMatrix(vectorsRDD.rdd());
CoordinateMatrix rowSimilarity = rowMatrix.columnSimilarities(0.5);
JavaRDD<MatrixEntry> entries = rowSimilarity.entries().toJavaRDD();

List<MatrixEntry> list = entries.collect();

for (MatrixEntry s : list) System.out.println(s);

The MatrixEntry(i, j, value) represents the similarity between columns (let's
say the features of documents).
But how can I show the similarity between rows?
Suppose I have five documents Doc1, ..., Doc5. We would like to show the
similarity between all those documents.
How do I get that? Any help?

Thank you
Donni
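One way to get similarities between rows (documents) rather than columns is to transpose the matrix first, so that each document becomes a column and columnSimilarities then compares documents. A sketch, assuming the documents are available as (docId, vector) pairs, which is an assumption about the data layout rather than something stated in the thread:

// Sketch: document (row) similarities via transpose + columnSimilarities.
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, MatrixEntry}
import org.apache.spark.rdd.RDD

def documentSimilarities(docVectors: RDD[(Long, Vector)]): RDD[MatrixEntry] = {
  val indexedRows = docVectors.map { case (docId, vec) => IndexedRow(docId, vec) }
  val mat = new IndexedRowMatrix(indexedRows)

  // After the transpose, each column is a document, so columnSimilarities
  // yields entries of the form (docI, docJ, cosineSimilarity).
  val transposed = mat.toCoordinateMatrix().transpose().toRowMatrix()
  transposed.columnSimilarities(0.5).entries
}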


Re: Getting exit code of pipe()

2017-02-12 Thread Xuchen Yao
Cool that's exactly what I was looking for! Thanks!

How does one output the status into stdout? I mean, how does one capture
the status output of pipe() command?

On Sat, Feb 11, 2017 at 9:50 AM, Felix Cheung <felixcheun...@hotmail.com>
wrote:

> Do you want the job to fail if there is an error exit code?
>
> You could set checkCode to True
> spark.apache.org/docs/latest/api/python/pyspark.html?
> highlight=pipe#pyspark.RDD.pipe
>
> Otherwise maybe you want to output the status into stdout so you could
> process it individually.
>
>
> _____
> From: Xuchen Yao <yaoxuc...@gmail.com>
> Sent: Friday, February 10, 2017 11:18 AM
> Subject: Getting exit code of pipe()
> To: <user@spark.apache.org>
>
>
>
> Hello Community,
>
> I have the following Python code that calls an external command:
>
> rdd.pipe('run.sh', env=os.environ).collect()
>
> run.sh can either exit with status 1 or 0, how could I get the exit code
> from Python? Thanks!
>
> Xuchen
>
>
>


Getting exit code of pipe()

2017-02-10 Thread Xuchen Yao
Hello Community,

I have the following Python code that calls an external command:

rdd.pipe('run.sh', env=os.environ).collect()

run.sh can either exit with status 1 or 0, how could I get the exit code
from Python? Thanks!

Xuchen


Re: Word2Vec distributed?

2015-12-20 Thread Yao
I have a similar observation with 1.4.1, where the 3rd stage running
mapPartitionsWithIndex at Word2Vec.scala:312 seems to run with a single
thread (which takes forever for a reasonably large corpus). Can anyone help
explain whether this is an algorithm limitation, or whether the model parameters
can affect the outcome?
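For what it's worth, a minimal sketch of the MLlib knobs that control how the fit is spread across the cluster; the values are illustrative, not recommendations for any particular corpus.

// Sketch: mllib Word2Vec parallelism knobs. setNumPartitions > 1 spreads the
// training across partitions, at some cost in accuracy.
import org.apache.spark.mllib.feature.Word2Vec
import org.apache.spark.rdd.RDD

def fitModel(corpus: RDD[Seq[String]]) = {
  new Word2Vec()
    .setVectorSize(100)
    .setMinCount(5)
    .setNumPartitions(32)
    .setNumIterations(1)
    .fit(corpus)
}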



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Word2Vec-distributed-tp23758p25747.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Spark scala REPL - Unable to create sqlContext

2015-10-25 Thread Yao
I have not been able to start Spark scala shell since 1.5 as it was not able
to create the sqlContext during the startup. It complains the metastore_db
is already locked: "Another instance of Derby may have already booted the
database". The Derby log is attached.

I only have this problem with starting the shell in yarn-client mode. I am
working with HDP2.2.6 which runs Hadoop 2.6.

-Yao derby.log
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n25195/derby.log>  



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-scala-REPL-Unable-to-create-sqlContext-tp25195.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




RE: Spark scala REPL - Unable to create sqlContext

2015-10-25 Thread Ge, Yao (Y.)
Thanks. I wonder why this is not widely reported in the user forum. The REPL
shell is basically broken in 1.5.0 and 1.5.1.
-Yao

From: Ted Yu [mailto:yuzhih...@gmail.com]
Sent: Sunday, October 25, 2015 12:01 PM
To: Ge, Yao (Y.)
Cc: user
Subject: Re: Spark scala REPL - Unable to create sqlContext

Have you taken a look at the fix for SPARK-11000 which is in the upcoming 1.6.0 
release ?

Cheers

On Sun, Oct 25, 2015 at 8:42 AM, Yao <y...@ford.com<mailto:y...@ford.com>> 
wrote:
I have not been able to start Spark scala shell since 1.5 as it was not able
to create the sqlContext during the startup. It complains the metastore_db
is already locked: "Another instance of Derby may have already booted the
database". The Derby log is attached.

I only have this problem with starting the shell in yarn-client mode. I am
working with HDP2.2.6 which runs Hadoop 2.6.

-Yao derby.log
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n25195/derby.log>



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-scala-REPL-Unable-to-create-sqlContext-tp25195.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




[SPARK-9776]Another instance of Derby may have already booted the database #8947

2015-10-23 Thread Ge, Yao (Y.)
I have not been able to run spark-shell in yarn-cluster mode since 1.5.0 due to 
the same issue described by [SPARK-9776]. Did this pull request fix the issue?
https://github.com/apache/spark/pull/8947
I still have the same problem with 1.5.1 (I am running on HDP 2.2.6 with Hadoop 
2.6)
Thanks.

-Yao



Re: Possible long lineage issue when using DStream to update a normal RDD

2015-05-08 Thread Chunnan Yao
Thank you for this suggestion! But may I ask what the advantage is of using
checkpoint instead of cache here, since they both cut lineage? I only know that
checkpoint saves the RDD to disk, while cache keeps it in memory. So maybe it's for
reliability?

Also, on http://spark.apache.org/docs/latest/streaming-programming-guide.html,
I have not seen usage of foreachRDD like mine. Here I am not pushing data
to an external system; I just use it to update an RDD in Spark. Is this right?
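For concreteness, the checkpoint-based pattern Jerry suggests below might look roughly like this. It is a sketch: the key/value types, the join, and the checkpoint directory are placeholders, not taken from the thread.

// Sketch: cut the lineage of the accumulated RDD on every batch.
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

def accumulate(dstream: DStream[(String, Long)], initial: RDD[(String, Long)]): Unit = {
  // sc.setCheckpointDir("hdfs:///checkpoints/state")  // must be set once, on reliable storage
  var state: RDD[(String, Long)] = initial

  dstream.foreachRDD { batch =>
    val old = state
    state = state.fullOuterJoin(batch).mapValues {
      case (a, b) => a.getOrElse(0L) + b.getOrElse(0L)
    }
    state.cache()
    state.checkpoint()   // truncates the lineage once materialized
    state.count()        // force materialization so the checkpoint happens now
    old.unpersist()
  }
}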



2015-05-08 14:03 GMT+08:00 Shao, Saisai saisai.s...@intel.com:

 I think you could use checkpoint to cut the lineage of `MyRDD`, I have a
 similar scenario and I use checkpoint to workaround this problem :)

 Thanks
 Jerry

 -Original Message-
 From: yaochunnan [mailto:yaochun...@gmail.com]
 Sent: Friday, May 8, 2015 1:57 PM
 To: user@spark.apache.org
 Subject: Possible long lineage issue when using DStream to update a normal
 RDD

 Hi all,
 Recently in our project, we need to update a RDD using data regularly
 received from DStream, I plan to use foreachRDD API to achieve this:
 var MyRDD = ...
 dstream.foreachRDD { rdd =>
   MyRDD = MyRDD.join(rdd)...
   ...
 }

 Is this usage correct? My concern is, as I am repeatedly and endlessly
 reassigning MyRDD in order to update it, will it create a too long RDD
 lineage to process when I want to query MyRDD later on (similar as
 https://issues.apache.org/jira/browse/SPARK-4672) ?

 Maybe I should:
 1. cache or checkpoint latest MyRDD and unpersist old MyRDD every time a
 dstream comes in.
 2. use the unpublished IndexedRDD
 (https://github.com/amplab/spark-indexedrdd) to conduct efficient RDD
 update.

 As I lack experience using Spark Streaming and indexedRDD, I am here to
 make sure my thoughts are on the right track. Your wise suggestions will be
 greatly appreciated.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Possible-long-lineage-issue-when-using-DStream-to-update-a-normal-RDD-tp22812.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.





Re: Using TF-IDF from MLlib

2014-12-28 Thread Yao
I found the TF-IDF feature extraction, and all the MLlib code that works with
pure Vector RDDs, very difficult to work with due to the lack of ability to
associate the vectors back to the original data. Why can't Spark MLlib support
LabeledPoint?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-TF-IDF-from-MLlib-tp19429p20876.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: TF-IDF in Spark 1.1.0

2014-12-28 Thread Yao
Can you show how to do IDF transform on tfWithId? Thanks.
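A common workaround, sketched below, is to fit the IDF on the values and zip the transformed vectors back onto the keys. It assumes tfWithId is an RDD[(Long, Vector)]; the name comes from the question, but the type is a guess.

// Sketch: keep document IDs through the IDF transform.
import org.apache.spark.mllib.feature.IDF
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

def tfidfWithIds(tfWithId: RDD[(Long, Vector)]): RDD[(Long, Vector)] = {
  val tf = tfWithId.values.cache()   // keep the partitioning identical for the zip below
  val idfModel = new IDF().fit(tf)
  val tfidf = idfModel.transform(tf)
  tfWithId.keys.zip(tfidf)           // same partitioning and per-partition counts, so zip is safe
}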



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/TF-IDF-in-Spark-1-1-0-tp16389p20877.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Decision Tree with libsvmtools datasets

2014-12-10 Thread Ge, Yao (Y.)
I am testing decision tree using iris.scale data set 
(http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/multiclass.html#iris)
In the data set there are three class labels 1, 2, and 3. However in the 
following code, I have to make numClasses = 4. I will get an 
ArrayIndexOutOfBound Exception if I make the numClasses = 3. Why?

var conf = new SparkConf().setAppName("DecisionTree")
var sc = new SparkContext(conf)

val data = MLUtils.loadLibSVMFile(sc, "data/iris.scale.txt");
val numClasses = 4;
val categoricalFeaturesInfo = Map[Int, Int]();
val impurity = "gini";
val maxDepth = 5;
val maxBins = 100;

val model = DecisionTree.trainClassifier(data, numClasses,
categoricalFeaturesInfo, impurity, maxDepth, maxBins);

val labelAndPreds = data.map { point =>
  val prediction = model.predict(point.features);
  (point.label, prediction)
}

val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble /
data.count;
println("Training Error = " + trainErr);
println("Learned classification tree model:\n" + model);

-Yao
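The exception is consistent with trainClassifier expecting labels in {0, ..., numClasses - 1}: the iris labels are 1, 2 and 3, so with numClasses = 3 the label 3 falls out of range. A minimal sketch of shifting the labels so that numClasses = 3 works, reusing the sc and file path from the snippet above:

// Sketch: shift the 1-based libsvm labels to 0-based so numClasses = 3 is valid.
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.util.MLUtils

val raw = MLUtils.loadLibSVMFile(sc, "data/iris.scale.txt")
val shifted = raw.map(lp => LabeledPoint(lp.label - 1, lp.features))  // 1,2,3 -> 0,1,2

val model3 = DecisionTree.trainClassifier(
  shifted, 3, Map[Int, Int](), "gini", 5, 100)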


Decision Tree with Categorical Features

2014-12-10 Thread Ge, Yao (Y.)
Can anyone provide an example code of using Categorical Features in Decision 
Tree?
Thanks!

-Yao
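A minimal sketch of declaring categorical features for MLlib's DecisionTree. The feature indices and category counts are made up for illustration, and trainingData stands for any RDD[LabeledPoint] whose categorical values are already encoded as 0, 1, 2, ...

// Sketch: feature 0 has 3 categories, feature 4 has 5 categories;
// every feature not listed in the map is treated as continuous.
import org.apache.spark.mllib.tree.DecisionTree

val categoricalFeaturesInfo = Map[Int, Int](0 -> 3, 4 -> 5)

val model = DecisionTree.trainClassifier(
  trainingData,             // RDD[LabeledPoint], categorical values encoded as 0,1,2,...
  2,                        // numClasses
  categoricalFeaturesInfo,
  "gini",                   // impurity
  5,                        // maxDepth
  32)                       // maxBins (must be at least the largest number of categories)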


add support for separate GC log files for different executor

2014-11-05 Thread haitao .yao
Hey, guys. Here's my problem:
While using the standalone mode, I always use the following args for
executor:

-XX:+PrintGCDetails -XX:+PrintGCDateStamps -verbose:gc
-Xloggc:/tmp/spark.executor.gc.log

​
But as we know, the HotSpot JVM does not support variable substitution in the
-Xloggc parameter, which will cause the GC log to be overwritten by later
executors.

May I create a new patch, which adds variable substitution before the worker
forks a new executor, to avoid the GC log being overwritten?

First thoughts: configure the executor jvm args like this:

-XX:+PrintGCDetails -XX:+PrintGCDateStamps -verbose:gc
-Xloggc:/tmp/spark.executor.%applicationId%.gc.log

​
and this will replace the %applicationId% with the current application ID
and pass the final args into java command line

We can support more variables such as executorId

Thanks.
-- 
haitao.yao


spark_ec2.py for AWS region: cn-north-1, China

2014-11-04 Thread haitao .yao
Hi,
   Amazon aws started to provide service for China mainland, the region
name is cn-north-1. But the script spark provides: spark_ec2.py will query
ami id from https://github.com/mesos/spark-ec2/tree/v4/ami-list and there's
no ami information for cn-north-1 region .
   Can anybody update the ami information and update the reo:
https://github.com/mesos/spark-ec2.git ?

   Thanks.

-- 
haitao.yao


Re: spark_ec2.py for AWS region: cn-north-1, China

2014-11-04 Thread haitao .yao
I'm afraid not. We have been using EC2 instances in cn-north-1 region for a
while. And the latest version of boto has added the region: cn-north-1
Here's the  screenshot:
>>> from boto import ec2
>>> ec2.regions()
[RegionInfo:us-east-1, RegionInfo:cn-north-1, RegionInfo:ap-northeast-1,
 RegionInfo:eu-west-1, RegionInfo:ap-southeast-1, RegionInfo:ap-southeast-2,
 RegionInfo:us-west-2, RegionInfo:us-gov-west-1, RegionInfo:us-west-1,
 RegionInfo:eu-central-1, RegionInfo:sa-east-1]


I do think the doc is out of dated.



2014-11-05 9:45 GMT+08:00 Nicholas Chammas nicholas.cham...@gmail.com:


 http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html

 cn-north-1 is not a supported region for EC2, as far as I can tell. There
 may be other AWS services that can use that region, but spark-ec2 relies on
 EC2.

 Nick

 On Tue, Nov 4, 2014 at 8:09 PM, haitao .yao yao.e...@gmail.com wrote:

 Hi,
Amazon aws started to provide service for China mainland, the region
 name is cn-north-1. But the script spark provides: spark_ec2.py will query
 ami id from https://github.com/mesos/spark-ec2/tree/v4/ami-list and
 there's no ami information for cn-north-1 region .
Can anybody update the ami information and update the reo:
 https://github.com/mesos/spark-ec2.git ?

Thanks.

 --
 haitao.yao







-- 
haitao.yao


Re: spark_ec2.py for AWS region: cn-north-1, China

2014-11-04 Thread haitao .yao
Done, JIRA link: https://issues.apache.org/jira/browse/SPARK-4241

Thanks.

2014-11-05 10:58 GMT+08:00 Nicholas Chammas nicholas.cham...@gmail.com:

 Oh, I can see that region via boto as well. Perhaps the doc is indeed out
 of date.

 Do you mind opening a JIRA issue
 https://issues.apache.org/jira/secure/Dashboard.jspa to track this
 request? I can do it if you've never opened a JIRA issue before.

 Nick

 On Tue, Nov 4, 2014 at 9:03 PM, haitao .yao yao.e...@gmail.com wrote:

 I'm afraid not. We have been using EC2 instances in cn-north-1 region for
 a while. And the latest version of boto has added the region: cn-north-1
 Here's the  screenshot:
  from  boto import ec2
  ec2.regions()
 [RegionInfo:us-east-1, RegionInfo:cn-north-1, RegionInfo:ap-northeast-1,
 RegionInfo:eu-west-1, RegionInfo:ap-southeast-1, RegionInfo:ap-southeast-2,
 RegionInfo:us-west-2, RegionInfo:us-gov-west-1, RegionInfo:us-west-1,
 RegionInfo:eu-central-1, RegionInfo:sa-east-1]
 

 I do think the doc is out of dated.



 2014-11-05 9:45 GMT+08:00 Nicholas Chammas nicholas.cham...@gmail.com:


 http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html

 cn-north-1 is not a supported region for EC2, as far as I can tell.
 There may be other AWS services that can use that region, but spark-ec2
 relies on EC2.

 Nick

 On Tue, Nov 4, 2014 at 8:09 PM, haitao .yao yao.e...@gmail.com wrote:

 Hi,
Amazon aws started to provide service for China mainland, the region
 name is cn-north-1. But the script spark provides: spark_ec2.py will query
 ami id from https://github.com/mesos/spark-ec2/tree/v4/ami-list and
 there's no ami information for cn-north-1 region .
Can anybody update the ami information and update the reo:
 https://github.com/mesos/spark-ec2.git ?

Thanks.

 --
 haitao.yao







 --
 haitao.yao







-- 
haitao.yao


scala.MatchError: class java.sql.Timestamp

2014-10-19 Thread Ge, Yao (Y.)
I am working with Spark 1.1.0 and I believe Timestamp is a supported data type 
for Spark SQL. However I keep getting this MatchError for java.sql.Timestamp 
when I try to use reflection to register a Java Bean with Timestamp field. 
Anything wrong with my code below?

public static class Event implements Serializable {
    private String name;
    private Timestamp time;
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public Timestamp getTime() {
        return time;
    }
    public void setTime(Timestamp time) {
        this.time = time;
    }
}

@Test
public void testTimeStamp() {
    JavaSparkContext sc = new JavaSparkContext("local", "timestamp");
    String[] data = {"1,2014-01-01", "2,2014-02-01"};
    JavaRDD<String> input = sc.parallelize(Arrays.asList(data));
    JavaRDD<Event> events = input.map(new Function<String, Event>() {
        public Event call(String arg0) throws Exception {
            String[] c = arg0.split(",");
            Event e = new Event();
            e.setName(c[0]);
            DateFormat fmt = new SimpleDateFormat("yyyy-MM-dd");
            e.setTime(new Timestamp(fmt.parse(c[1]).getTime()));
            return e;
        }
    });

    JavaSQLContext sqlCtx = new JavaSQLContext(sc);
    JavaSchemaRDD schemaEvent = sqlCtx.applySchema(events, Event.class);
    schemaEvent.registerTempTable("event");

    sc.stop();
}


RE: scala.MatchError: class java.sql.Timestamp

2014-10-19 Thread Ge, Yao (Y.)
scala.MatchError: class java.sql.Timestamp (of class java.lang.Class)
at 
org.apache.spark.sql.api.java.JavaSQLContext$$anonfun$getSchema$1.apply(JavaSQLContext.scala:189)
at 
org.apache.spark.sql.api.java.JavaSQLContext$$anonfun$getSchema$1.apply(JavaSQLContext.scala:188)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at 
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at 
scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at 
org.apache.spark.sql.api.java.JavaSQLContext.getSchema(JavaSQLContext.scala:188)
at 
org.apache.spark.sql.api.java.JavaSQLContext.applySchema(JavaSQLContext.scala:90)
at 
com.ford.dtc.ff.SessionStats.testTimeStamp(SessionStats.java:111)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown 
Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:45)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:42)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:68)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:47)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
at 
org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
at 
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
at 
org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
at org.junit.runners.ParentRunner.run(ParentRunner.java:300)
at 
org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
at 
org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
at 
org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)


From: Wang, Daoyuan [mailto:daoyuan.w...@intel.com]
Sent: Sunday, October 19, 2014 10:31 AM
To: Ge, Yao (Y.); user@spark.apache.org
Subject: RE: scala.MatchError: class java.sql.Timestamp

Can you provide the exception stack?

Thanks,
Daoyuan

From: Ge, Yao (Y.) [mailto:y...@ford.com]
Sent: Sunday, October 19, 2014 10:17 PM
To: user@spark.apache.org
Subject: scala.MatchError: class java.sql.Timestamp

I am working with Spark 1.1.0 and I believe Timestamp is a supported data type
for Spark SQL. However, I keep getting this MatchError for java.sql.Timestamp
when I try to use reflection to register a Java Bean with a Timestamp field.
Is anything wrong with my code below?

public static class Event implements Serializable {
private String name;
private Timestamp time;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Timestamp getTime() {
return time;
}
public void setTime(Timestamp time) {
this.time = time;
}
}

@Test
public void testTimeStamp() {
JavaSparkContext sc = new

Exception Logging

2014-10-16 Thread Ge, Yao (Y.)
I need help to better trap exceptions in map functions. What is the best way
to catch an exception and provide some helpful diagnostic information about the
source of the input, such as the file name (and ideally the line number, if I am
processing a text file)?
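
One pattern that can help here, sketched below with made-up names: load each
file separately so the file name is in scope, use zipWithIndex for an
approximate line number, and wrap the parsing in a Try so a failure carries the
offending input along instead of killing the task.

```
import scala.util.{Failure, Success, Try}

import org.apache.spark.SparkContext

object SafeParseExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "safe-parse")
    val fileName = args(0)

    val parsed = sc.textFile(fileName)
      .zipWithIndex()                  // index of the line within this RDD
      .map { case (line, idx) =>
        // Keep the file name and (approximate) line number with any failure.
        Try(line.split(",")(1).toDouble) match {
          case Success(value) => Right(value)
          case Failure(e)     => Left(s"$fileName:${idx + 1}: '$line' -> ${e.getMessage}")
        }
      }

    // Good records stay in the pipeline; errors can be inspected separately.
    parsed.collect { case Left(msg) => msg }.take(10).foreach(println)
    println("parsed OK: " + parsed.filter(_.isRight).count())

    sc.stop()
  }
}
```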

-Yao


RE: Dedup

2014-10-09 Thread Ge, Yao (Y.)
Yes. I was using a String array as the arguments in reduceByKey. I think the
Strings themselves are immutable, so simply returning the first argument without
cloning one should work. I will look into mapPartitions, as we can have up to
40% duplicates. Will follow up on this if necessary. Thanks very much, Sean!

-Yao  

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: Thursday, October 09, 2014 3:04 AM
To: Ge, Yao (Y.)
Cc: user@spark.apache.org
Subject: Re: Dedup

I think the question is about copying the argument. If it's an immutable value 
like String, yes just return the first argument and ignore the second. If 
you're dealing with a notoriously mutable value like a Hadoop Writable, you 
need to copy the value you return.

This works fine although you will spend a fair bit of time marshaling all of 
those duplicates together just to discard all but one.

If there are lots of duplicates, it would take a bit more work, but would be
faster, to do something like this: mapPartitions and retain one input value for
each unique dedup criterion, then output those pairs, and then reduceByKey
the result.
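
A minimal sketch of that two-step approach, assuming the records are plain
strings and that dedupKey stands in for whatever fields define a duplicate
(all names here are illustrative):

```
import scala.collection.mutable

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object DedupSketch {
  // dedupKey extracts whatever fields define "duplicate" for a record.
  def dedupKey(record: String): String = record.split(",")(0)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "dedup")
    val input = sc.textFile(args(0))

    val deduped = input
      .mapPartitions { iter =>
        // Keep only the first record seen for each key within this partition.
        val seen = mutable.Map[String, String]()
        iter.foreach { rec =>
          val k = dedupKey(rec)
          if (!seen.contains(k)) seen(k) = rec
        }
        seen.iterator                   // (key, record) pairs
      }
      .reduceByKey((a, b) => a)         // resolve duplicates across partitions
      .values

    deduped.saveAsTextFile(args(1))
    sc.stop()
  }
}
```

With this, each partition contributes at most one record per key to the
shuffle, which is where the savings come from when a large fraction of the
input is duplicated; the trade-off is that the per-partition map has to fit in
memory.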

On Wed, Oct 8, 2014 at 8:37 PM, Ge, Yao (Y.) y...@ford.com wrote:
 I need to do deduplication processing in Spark. The current plan is to
 generate a tuple where the key is the dedup criteria and the value is the
 original input. I am thinking of using reduceByKey to discard duplicate
 values. If I do that, can I simply return the first argument, or should
 I return a copy of the first argument? Is there a better way to do dedup in
 Spark?



 -Yao


Dedup

2014-10-08 Thread Ge, Yao (Y.)
I need to do deduplication processing in Spark. The current plan is to generate
a tuple where the key is the dedup criteria and the value is the original input.
I am thinking of using reduceByKey to discard duplicate values. If I do that,
can I simply return the first argument, or should I return a copy of the first
argument? Is there a better way to do dedup in Spark?

-Yao


RE: KMeans - java.lang.IllegalArgumentException: requirement failed

2014-08-12 Thread Ge, Yao (Y.)
I figured it out. My indices parameter for the sparse vector was messed up. It
was a good lesson for me:
When using Vectors.sparse(int size, int[] indices, double[] values) to
generate a vector, size is the size of the whole vector, not just the number of
elements with values. The indices array needs to be in ascending order.
In many cases, it is probably easier to use the other two forms of the
Vectors.sparse function if the indices and values are not naturally sorted.
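
For example, in the spark-shell (values made up), a 10-dimensional vector with
non-zeros at positions 2 and 7 can be built either way:

```
import org.apache.spark.mllib.linalg.Vectors

// Form 1: size is the full length of the vector; indices must be ascending.
val v1 = Vectors.sparse(10, Array(2, 7), Array(0.5, 1.5))

// Form 2: (index, value) pairs; handy when the positions arrive unsorted.
val v2 = Vectors.sparse(10, Seq((7, 1.5), (2, 0.5)))
```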

-Yao


From: Ge, Yao (Y.)
Sent: Monday, August 11, 2014 11:44 PM
To: 'u...@spark.incubator.apache.org'
Subject: KMeans - java.lang.IllegalArgumentException: requirement failed

I am trying to train a KMeans model with sparse vectors using Spark 1.0.1.
When I run the training I get the following exception:
java.lang.IllegalArgumentException: requirement failed
at scala.Predef$.require(Predef.scala:221)
at 
org.apache.spark.mllib.util.MLUtils$.fastSquaredDistance(MLUtils.scala:271)
at 
org.apache.spark.mllib.clustering.KMeans$.fastSquaredDistance(KMeans.scala:398)
at 
org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:372)
at 
org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:366)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at 
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.mllib.clustering.KMeans$.findClosest(KMeans.scala:366)
at 
org.apache.spark.mllib.clustering.KMeans$.pointCost(KMeans.scala:389)
at 
org.apache.spark.mllib.clustering.KMeans$$anonfun$17$$anonfun$apply$7.apply(KMeans.scala:269)
at 
org.apache.spark.mllib.clustering.KMeans$$anonfun$17$$anonfun$apply$7.apply(KMeans.scala:268)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Range.foreach(Range.scala:141)
at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at 
scala.collection.AbstractTraversable.map(Traversable.scala:105)
at 
org.apache.spark.mllib.clustering.KMeans$$anonfun$17.apply(KMeans.scala:268)
at 
org.apache.spark.mllib.clustering.KMeans$$anonfun$17.apply(KMeans.scala:267)

What does this mean? How do I troubleshoot this problem?
Thanks.

-Yao


KMeans - java.lang.IllegalArgumentException: requirement failed

2014-08-11 Thread Ge, Yao (Y.)
I am trying to train a KMeans model with sparse vectors using Spark 1.0.1.
When I run the training I get the following exception:
java.lang.IllegalArgumentException: requirement failed
at scala.Predef$.require(Predef.scala:221)
at 
org.apache.spark.mllib.util.MLUtils$.fastSquaredDistance(MLUtils.scala:271)
at 
org.apache.spark.mllib.clustering.KMeans$.fastSquaredDistance(KMeans.scala:398)
at 
org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:372)
at 
org.apache.spark.mllib.clustering.KMeans$$anonfun$findClosest$1.apply(KMeans.scala:366)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at 
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at 
org.apache.spark.mllib.clustering.KMeans$.findClosest(KMeans.scala:366)
at 
org.apache.spark.mllib.clustering.KMeans$.pointCost(KMeans.scala:389)
at 
org.apache.spark.mllib.clustering.KMeans$$anonfun$17$$anonfun$apply$7.apply(KMeans.scala:269)
at 
org.apache.spark.mllib.clustering.KMeans$$anonfun$17$$anonfun$apply$7.apply(KMeans.scala:268)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Range.foreach(Range.scala:141)
at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at 
scala.collection.AbstractTraversable.map(Traversable.scala:105)
at 
org.apache.spark.mllib.clustering.KMeans$$anonfun$17.apply(KMeans.scala:268)
at 
org.apache.spark.mllib.clustering.KMeans$$anonfun$17.apply(KMeans.scala:267)

What does this mean? How do I troubleshoot this problem?
Thanks.

-Yao


Re: How can I implement eigenvalue decomposition in Spark?

2014-08-08 Thread Chunnan Yao
I think the eigenvalues and eigenvectors you are talking about are those of
M^T*M or M*M^T, if we write the SVD as M = U*s*V^T. What I want is to get the
eigenvectors and eigenvalues of M itself. Is this my misunderstanding of
linear algebra, or of the API?

M^{*} M = V \Sigma^{*} U^{*} U \Sigma V^{*} = V (\Sigma^{*} \Sigma) V^{*}
M M^{*} = U \Sigma V^{*} V \Sigma^{*} U^{*} = U (\Sigma \Sigma^{*}) U^{*}



2014-08-08 11:19 GMT+08:00 x wasedax...@gmail.com:

 Aren't U.rows.toArray.take(1).foreach(println) and
 V.toArray.take(s.size).foreach(println)
 printing the eigenvectors corresponding to the biggest eigenvalue,
 s.toArray(0)*s.toArray(0)?

 xj @ Tokyo


 On Fri, Aug 8, 2014 at 12:07 PM, Chunnan Yao yaochun...@gmail.com wrote:

 Hi there, what you've suggested is all meaningful. But to make myself
 clearer, my essential problems are:
 1. My matrix is asymmetric, and it is a probabilistic adjacency matrix,
 whose entries (a_ij) represent the likelihood that user j will broadcast
 the information generated by user i. Apparently, a_ij and a_ji are
 different, because "I love you" doesn't necessarily mean "you love me"
 (what a sad story~). All entries are real.
 2. I know I can get eigenvalues through SVD. My problem is I can't get
 the corresponding eigenvectors, which requires solving equations, and I
 also need the eigenvectors in my calculation. In my simulation of this paper, I
 only need the biggest eigenvalue and the corresponding eigenvector.
 The paper posted by Shivaram Venkataraman is also concerned with
 symmetric matrices. Could anyone help me out?


 2014-08-08 9:41 GMT+08:00 x wasedax...@gmail.com:

  The computed SVD result already contains the singular values in descending
 order, so you can get the biggest eigenvalue.

 ---

   val svd = matrix.computeSVD(matrix.numCols().toInt, computeU = true)
   val U: RowMatrix = svd.U
   val s: Vector = svd.s
   val V: Matrix = svd.V

   U.rows.toArray.take(1).foreach(println)

   println(s.toArray(0)*s.toArray(0))

   V.toArray.take(s.size).foreach(println)

 ---

 xj @ Tokyo


 On Fri, Aug 8, 2014 at 3:06 AM, Shivaram Venkataraman 
 shiva...@eecs.berkeley.edu wrote:

 If you just want to find the top eigenvalue / eigenvector you can do
 something like the Lanczos method. There is a description of a MapReduce
 based algorithm in Section 4.2 of [1]

 [1] http://www.cs.cmu.edu/~ukang/papers/HeigenPAKDD2011.pdf


 On Thu, Aug 7, 2014 at 10:54 AM, Li Pu l...@twitter.com.invalid wrote:

 @Miles, the latest SVD implementation in mllib is partially
 distributed. Matrix-vector multiplication is computed among all workers,
 but the right singular vectors are all stored in the driver. If your
 symmetric matrix is n x n and you want the first k eigenvalues, you will
 need to fit n x k doubles in driver's memory. Behind the scene, it calls
 ARPACK to compute eigen-decomposition of A^T A. You can look into the
 source code for the details.

 @Sean, the SVD++ implementation in graphx is not the canonical
 definition of SVD. It doesn't have the orthogonality that SVD holds. But 
 we
 might want to use graphx as the underlying matrix representation for
 mllib.SVD to address the problem of skewed entry distribution.


 On Thu, Aug 7, 2014 at 10:51 AM, Evan R. Sparks evan.spa...@gmail.com
  wrote:

 Reza Zadeh has contributed the distributed implementation of
 (Tall/Skinny) SVD (
 http://spark.apache.org/docs/latest/mllib-dimensionality-reduction.html),
 which is in MLlib (Spark 1.0) and a distributed sparse SVD coming in 
 Spark
 1.1. (https://issues.apache.org/jira/browse/SPARK-1782). If your
 data is sparse (which it often is in social networks), you may have 
 better
 luck with this.

 I haven't tried the GraphX implementation, but those algorithms are
 often well-suited for power-law distributed graphs as you might see in
 social networks.

 FWIW, I believe you need to square elements of the sigma matrix from
 the SVD to get the eigenvalues.




 On Thu, Aug 7, 2014 at 10:20 AM, Sean Owen so...@cloudera.com
 wrote:

 (-incubator, +user)

 If your matrix is symmetric (and real I presume), and if my linear
 algebra isn't too rusty, then its SVD is its eigendecomposition. The
 SingularValueDecomposition object you get back has U and V, both of
 which have columns that are the eigenvectors.

 There are a few SVDs in the Spark code. The one in mllib is not
 distributed (right?) and is probably not an efficient means of
 computing eigenvectors if you really just want a decomposition of a
 symmetric matrix.

 The one I see in graphx is distributed? I haven't used it though.
 Maybe it could be part of a solution.



 On Thu, Aug 7, 2014 at 2:21 PM, yaochunnan yaochun...@gmail.com
 wrote:
  Our lab needs to do some simulation on online social networks. We need to
  handle a 5000*5000 adjacency matrix, namely, to get its largest eigenvalue
  and corresponding eigenvector. Matlab can be used but it is time-consuming.
  Is Spark effective in linear algebra calculations

Re: How can I implement eigenvalue decomposition in Spark?

2014-08-07 Thread Chunnan Yao
Hi there, what you've suggested is all meaningful. But to make myself
clearer, my essential problems are:
1. My matrix is asymmetric, and it is a probabilistic adjacency matrix,
whose entries (a_ij) represent the likelihood that user j will broadcast
the information generated by user i. Apparently, a_ij and a_ji are
different, because "I love you" doesn't necessarily mean "you love me"
(what a sad story~). All entries are real.
2. I know I can get eigenvalues through SVD. My problem is I can't get the
corresponding eigenvectors, which requires solving equations, and I also
need the eigenvectors in my calculation. In my simulation of this paper, I only
need the biggest eigenvalue and the corresponding eigenvector.
The paper posted by Shivaram Venkataraman is also concerned with symmetric
matrices. Could anyone help me out?
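
For what it's worth, if the dominant eigenvalue is real and simple (which a
non-negative adjacency matrix should give you via Perron-Frobenius), plain
power iteration over distributed matrix-vector products is one way to get just
that eigenpair without solving any equations. A rough sketch, with the
(rowIndex, row) storage layout and all names assumed:

```
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object PowerIterationSketch {
  // rows: (rowIndex, dense row of length n); assumes the dominant eigenvalue
  // of the matrix is real and simple, so the iteration converges to it.
  def dominantEigenpair(rows: RDD[(Int, Array[Double])], n: Int,
                        iterations: Int = 100): (Double, Array[Double]) = {
    val sc = rows.sparkContext
    var v = Array.fill(n)(1.0 / math.sqrt(n))   // arbitrary starting vector

    for (_ <- 1 to iterations) {
      val bv = sc.broadcast(v)
      // w = M * v: one dot product per row, computed on the cluster.
      val w = rows
        .map { case (i, row) => (i, row.zip(bv.value).map(p => p._1 * p._2).sum) }
        .collect().sortBy(_._1).map(_._2)
      val norm = math.sqrt(w.map(x => x * x).sum)
      v = w.map(_ / norm)
    }

    // Rayleigh quotient v^T (M v) estimates the eigenvalue (v has norm 1).
    val bv = sc.broadcast(v)
    val mv = rows
      .map { case (i, row) => (i, row.zip(bv.value).map(p => p._1 * p._2).sum) }
      .collect().sortBy(_._1).map(_._2)
    (v.zip(mv).map(p => p._1 * p._2).sum, v)
  }
}
```

For a 5000*5000 matrix the vector easily fits on the driver, so only the matrix
stays distributed; for much larger matrices the collect and normalization steps
would need to stay distributed as well.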


2014-08-08 9:41 GMT+08:00 x wasedax...@gmail.com:

  The computed SVD result already contains the singular values in descending
 order, so you can get the biggest eigenvalue.

 ---

   val svd = matrix.computeSVD(matrix.numCols().toInt, computeU = true)
   val U: RowMatrix = svd.U
   val s: Vector = svd.s
   val V: Matrix = svd.V

   U.rows.toArray.take(1).foreach(println)

   println(s.toArray(0)*s.toArray(0))

   V.toArray.take(s.size).foreach(println)

 ---

 xj @ Tokyo


 On Fri, Aug 8, 2014 at 3:06 AM, Shivaram Venkataraman 
 shiva...@eecs.berkeley.edu wrote:

 If you just want to find the top eigenvalue / eigenvector you can do
 something like the Lanczos method. There is a description of a MapReduce
 based algorithm in Section 4.2 of [1]

 [1] http://www.cs.cmu.edu/~ukang/papers/HeigenPAKDD2011.pdf


 On Thu, Aug 7, 2014 at 10:54 AM, Li Pu l...@twitter.com.invalid wrote:

 @Miles, the latest SVD implementation in mllib is partially distributed.
 Matrix-vector multiplication is computed among all workers, but the right
 singular vectors are all stored in the driver. If your symmetric matrix is
 n x n and you want the first k eigenvalues, you will need to fit n x k
 doubles in the driver's memory. Behind the scenes, it calls ARPACK to compute
 eigen-decomposition of A^T A. You can look into the source code for the
 details.

 @Sean, the SVD++ implementation in graphx is not the canonical
 definition of SVD. It doesn't have the orthogonality that SVD holds. But we
 might want to use graphx as the underlying matrix representation for
 mllib.SVD to address the problem of skewed entry distribution.


 On Thu, Aug 7, 2014 at 10:51 AM, Evan R. Sparks evan.spa...@gmail.com
 wrote:

 Reza Zadeh has contributed the distributed implementation of
 (Tall/Skinny) SVD (
 http://spark.apache.org/docs/latest/mllib-dimensionality-reduction.html),
 which is in MLlib (Spark 1.0) and a distributed sparse SVD coming in Spark
 1.1. (https://issues.apache.org/jira/browse/SPARK-1782). If your data
 is sparse (which it often is in social networks), you may have better luck
 with this.

 I haven't tried the GraphX implementation, but those algorithms are
 often well-suited for power-law distributed graphs as you might see in
 social networks.

 FWIW, I believe you need to square elements of the sigma matrix from
 the SVD to get the eigenvalues.




 On Thu, Aug 7, 2014 at 10:20 AM, Sean Owen so...@cloudera.com wrote:

 (-incubator, +user)

 If your matrix is symmetric (and real I presume), and if my linear
 algebra isn't too rusty, then its SVD is its eigendecomposition. The
 SingularValueDecomposition object you get back has U and V, both of
 which have columns that are the eigenvectors.

 There are a few SVDs in the Spark code. The one in mllib is not
 distributed (right?) and is probably not an efficient means of
 computing eigenvectors if you really just want a decomposition of a
 symmetric matrix.

 The one I see in graphx is distributed? I haven't used it though.
 Maybe it could be part of a solution.



 On Thu, Aug 7, 2014 at 2:21 PM, yaochunnan yaochun...@gmail.com
 wrote:
  Our lab needs to do some simulation on online social networks. We need to
  handle a 5000*5000 adjacency matrix, namely, to get its largest eigenvalue
  and corresponding eigenvector. Matlab can be used but it is time-consuming.
  Is Spark effective in linear algebra calculations and transformations? Later
  we would have a 500*500 matrix processed. It seems urgent that we
  should find some distributed computation platform.
 
  I see SVD has been implemented and I can get the eigenvalues of a matrix
  through this API. But when I want to get both eigenvalues and eigenvectors,
  or at least the biggest eigenvalue and the corresponding eigenvector, it
  seems that current Spark doesn't have such an API. Is it possible for me to
  write eigenvalue decomposition from scratch? What should I do? Thanks a
  lot!
 
 
  Miles Yao
 
  
  View this message in context: How can I implement eigenvalue
 decomposition
  in Spark?
  Sent from the Apache Spark User List mailing list archive at
 Nabble.com

Driver OOM while using reduceByKey

2014-05-29 Thread haitao .yao
Hi,

 I used 1g of memory for the driver Java process and got an OOM error on
the driver side before reduceByKey. After analyzing the heap dump, the biggest
object is org.apache.spark.MapStatus, which occupied over 900MB of memory.

Here are my questions:

1. Are there any optimization switches that I can tune to avoid this? I have
already enabled output compression with spark.io.compression.codec.

2. Why do the workers send all the data back to the driver to run reduceByKey?
With the current implementation, if I use reduceByKey on TBs of data, that
will be a disaster for the driver. Maybe my assumption about the Spark
implementation is wrong.


And here's my code snippet:


```
val cntNew = spark.accumulator(0)
val cntOld = spark.accumulator(0)
val cntErr = spark.accumulator(0)

val sequenceFileUrl = args(0)
val seq = spark.sequenceFile[Text, BytesWritable](sequenceFileUrl)
val stat = seq.map(pair => convertData(
  pair._2, cntNew, cntOld, cntErr
)).reduceByKey(_ + _)
stat.saveAsSequenceFile(args(1))
```


Thanks.


-- 

haitao.yao@China


Re: Driver OOM while using reduceByKey

2014-05-29 Thread haitao .yao
Thanks, it worked.


2014-05-30 1:53 GMT+08:00 Matei Zaharia matei.zaha...@gmail.com:

 That hash map is just a list of where each task ran, it’s not the actual
 data. How many map and reduce tasks do you have? Maybe you need to give the
 driver a bit more memory, or use fewer tasks (e.g. do reduceByKey(_ + _,
 100) to use only 100 tasks).
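
 Applied to the snippet from the first mail, the second suggestion would look
 roughly like this (100 is only an example figure):

```
val stat = seq.map(pair => convertData(pair._2, cntNew, cntOld, cntErr))
              .reduceByKey(_ + _, 100)   // explicit, smaller number of reduce partitions
stat.saveAsSequenceFile(args(1))
```

 Fewer reduce partitions should shrink the map-output status each map task
 reports to the driver; the other option is simply giving the driver more
 memory (e.g. --driver-memory with spark-submit).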

 Matei

 On May 29, 2014, at 2:03 AM, haitao .yao yao.e...@gmail.com wrote:

  Hi,
 
    I used 1g of memory for the driver Java process and got an OOM error on
 the driver side before reduceByKey. After analyzing the heap dump, the biggest
 object is org.apache.spark.MapStatus, which occupied over 900MB of memory.
 
  Here's my question:
 
 
  1. Are there any optimization switches that I can tune to avoid this? I
 have already enabled output compression with spark.io.compression.codec.
 
  2. Why do the workers send all the data back to the driver to run reduceByKey?
 With the current implementation, if I use reduceByKey on TBs of data, that
 will be a disaster for the driver. Maybe my assumption about the Spark
 implementation is wrong.
 
 
  And here's my code snippet:
 
 
  ```
 
  val cntNew = spark.accumulator(0)
 
  val cntOld = spark.accumulator(0)
 
  val cntErr = spark.accumulator(0)
 
 
  val sequenceFileUrl = args(0)
 
  val seq = spark.sequenceFile[Text, BytesWritable](sequenceFileUrl)
 
  val stat = seq.map(pair => convertData(
 
pair._2, cntNew, cntOld, cntErr
 
  )).reduceByKey(_ + _)
 
  stat.saveAsSequenceFile(args(1))
 
  ```
 
 
  Thanks.
 
 
  --
 
  haitao.yao@China




-- 
haitao.yao@Beijing