Re: [ANNOUNCE] Apache Spark 3.1.2 released

2021-06-01 Thread Takeshi Yamamuro
Thank you, Dongjoon!

On Wed, Jun 2, 2021 at 2:29 PM Xiao Li  wrote:

> Thank you!
>
> Xiao
>
> On Tue, Jun 1, 2021 at 9:29 PM Hyukjin Kwon  wrote:
>
>> awesome!
>>
>> On Wed, Jun 2, 2021 at 9:59 AM, Dongjoon Hyun wrote:
>>
>>> We are happy to announce the availability of Spark 3.1.2!
>>>
>>> Spark 3.1.2 is a maintenance release containing stability fixes. This
>>> release is based on the branch-3.1 maintenance branch of Spark. We
>>> strongly
>>> recommend all 3.1 users to upgrade to this stable release.
>>>
>>> To download Spark 3.1.2, head over to the download page:
>>> https://spark.apache.org/downloads.html
>>>
>>> To view the release notes:
>>> https://spark.apache.org/releases/spark-release-3-1-2.html
>>>
>>> We would like to acknowledge all community members for contributing to
>>> this
>>> release. This release would not have been possible without you.
>>>
>>> Dongjoon Hyun
>>>
>>
>
> --
>
>

-- 
---
Takeshi Yamamuro


Re: [ANNOUNCE] Announcing Apache Spark 3.1.1

2021-03-02 Thread Takeshi Yamamuro
Congrats, all!

Bests,
Takeshi

On Wed, Mar 3, 2021 at 2:18 PM Mridul Muralidharan  wrote:

>
> Thanks Hyukjin and congratulations everyone on the release !
>
> Regards,
> Mridul
>
> On Tue, Mar 2, 2021 at 8:54 PM Yuming Wang  wrote:
>
>> Great work, Hyukjin!
>>
>> On Wed, Mar 3, 2021 at 9:50 AM Hyukjin Kwon  wrote:
>>
>>> We are excited to announce Spark 3.1.1 today.
>>>
>>> Apache Spark 3.1.1 is the second release of the 3.x line. This release
>>> adds
>>> Python type annotations and Python dependency management support as part
>>> of Project Zen.
>>> Other major updates include improved ANSI SQL compliance support,
>>> history server support
>>> in structured streaming, the general availability (GA) of Kubernetes and
>>> node decommissioning
>>> in Kubernetes and Standalone. In addition, this release continues to
>>> focus on usability, stability,
>>> and polish while resolving around 1500 tickets.
>>>
>>> We'd like to thank our contributors and users for their contributions
>>> and early feedback to
>>> this release. This release would not have been possible without you.
>>>
>>> To download Spark 3.1.1, head over to the download page:
>>> http://spark.apache.org/downloads.html
>>>
>>> To view the release notes:
>>> https://spark.apache.org/releases/spark-release-3-1-1.html
>>>
>>>

-- 
---
Takeshi Yamamuro


Re: Spark SQL Dataset and BigDecimal

2021-02-17 Thread Takeshi Yamamuro
Yea, I think that's because it's needed for interoperability between
Scala and Java.
If it returned a Scala BigDecimal, Java code could not handle it.

If you want a Scala BigDecimal, you need to convert it back yourself.
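
For example, a minimal sketch of that conversion (assuming a spark-shell style
session; the column name `d` is only an illustration):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// a DataFrame built from a Scala BigDecimal value
val df = Seq((1, BigDecimal("1.23"))).toDF("id", "d")

// Row-based access gives back java.math.BigDecimal ...
val j: java.math.BigDecimal = df.head().getAs[java.math.BigDecimal]("d")

// ... and wrapping it returns a scala.math.BigDecimal again
val s: scala.math.BigDecimal = BigDecimal(j)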

Bests,
Takeshi

On Wed, Feb 17, 2021 at 9:48 PM Ivan Petrov  wrote:

> Hi, I'm using Spark Scala Dataset API to write spark sql jobs.
> I've noticed that Spark dataset accepts scala BigDecimal as the value but
> it always returns java.math.BigDecimal when you read it back.
>
> Is it by design?
> Should I use java.math.BigDecimal everywhere instead?
> Is there any performance penalty for  using scala BigDecimal? it's more
> convenient from an API point of view than java.math.BigDecimal.
>


-- 
---
Takeshi Yamamuro


Re: Custom JdbcConnectionProvider

2020-10-27 Thread Takeshi Yamamuro
> the user and developer guide will come soon...

Yea, it looks nice! Thanks for the work.

On Tue, Oct 27, 2020 at 11:21 PM Gabor Somogyi 
wrote:

> Thanks Takeshi for sharing it, that can be used as an example.
> The user and developer guide will come soon...
>
> On Tue, Oct 27, 2020 at 2:31 PM Takeshi Yamamuro 
> wrote:
>
>> Hi,
>>
>> Please see an example code in
>> https://github.com/gaborgsomogyi/spark-jdbc-connection-provider (
>> https://github.com/apache/spark/pull/29024).
>> Since it depends on the service loader, I think you need to add a
>> configuration file in META-INF/services.
>>
>> Bests,
>> Takeshi
>>
>> On Tue, Oct 27, 2020 at 9:50 PM rafaelkyrdan 
>> wrote:
>>
>>> Guys do you know how I can use the custom implementation of
>>> JdbcConnectionProvider?
>>>
>>> As far as I understand in the spark jdbc we can use custom Driver, like
>>> this:
>>> *val jdbcDF = spark.read
>>>   .format("jdbc")
>>>   .option("url", "jdbc:postgresql:dbserver").option("driver",
>>> "my.drivier")
>>> *
>>> And we need a matching JdbcConnectionProvider which will override the
>>> property:
>>> * override val driverClass = "my.driver"*
>>>
>>> I have both but I see that they are not used. Do I need to register
>>> somehow
>>> them? Could someone share a relevant example?
>>> Thx.
>>>
>>>
>>>
>>>
>>> --
>>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>

-- 
---
Takeshi Yamamuro


Re: Custom JdbcConnectionProvider

2020-10-27 Thread Takeshi Yamamuro
Hi,

Please see an example code in
https://github.com/gaborgsomogyi/spark-jdbc-connection-provider (
https://github.com/apache/spark/pull/29024).
Since it depends on the service loader, I think you need to add a
configuration file in META-INF/services.
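
As a rough illustration of the service-loader part, a sketch against the public
JdbcConnectionProvider developer API (added around Spark 3.1; the class name,
provider name, and driver class below are made up):

import java.sql.{Connection, Driver}
import java.util.Properties
import org.apache.spark.sql.jdbc.JdbcConnectionProvider

class MyConnectionProvider extends JdbcConnectionProvider {
  override val name: String = "my-provider"

  // claim only connections that were asked to use our driver class
  override def canHandle(driver: Driver, options: Map[String, String]): Boolean =
    options.getOrElse("driver", "") == "my.Driver"

  override def getConnection(driver: Driver, options: Map[String, String]): Connection = {
    val props = new Properties()
    options.foreach { case (k, v) => props.setProperty(k, v) }
    driver.connect(options("url"), props)
  }
}

plus a resource file named
META-INF/services/org.apache.spark.sql.jdbc.JdbcConnectionProvider
containing the fully qualified name of MyConnectionProvider, so the JDK
ServiceLoader can discover it.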

Bests,
Takeshi

On Tue, Oct 27, 2020 at 9:50 PM rafaelkyrdan  wrote:

> Guys do you know how I can use the custom implementation of
> JdbcConnectionProvider?
>
> As far as I understand in the spark jdbc we can use custom Driver, like
> this:
> *val jdbcDF = spark.read
>   .format("jdbc")
>   .option("url", "jdbc:postgresql:dbserver").option("driver", "my.drivier")
> *
> And we need a matching JdbcConnectionProvider which will override the
> property:
> * override val driverClass = "my.driver"*
>
> I have both but I see that they are not used. Do I need to register somehow
> them? Could someone share a relevant example?
> Thx.
>
>
>
>
> --
> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>
> ---------
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
---
Takeshi Yamamuro


Re: [ANNOUNCE] Announcing Apache Spark 3.0.1

2020-09-11 Thread Takeshi Yamamuro
Congrats and thanks, Ruifeng!


On Fri, Sep 11, 2020 at 9:50 PM Dongjoon Hyun 
wrote:

> It's great. Thank you, Ruifeng!
>
> Bests,
> Dongjoon.
>
> On Fri, Sep 11, 2020 at 1:54 AM 郑瑞峰  wrote:
>
>> Hi all,
>>
>> We are happy to announce the availability of Spark 3.0.1!
>> Spark 3.0.1 is a maintenance release containing stability fixes. This
>> release is based on the branch-3.0 maintenance branch of Spark. We strongly
>> recommend all 3.0 users to upgrade to this stable release.
>>
>> To download Spark 3.0.1, head over to the download page:
>> http://spark.apache.org/downloads.html
>>
>> Note that you might need to clear your browser cache or to use
>> `Private`/`Incognito` mode according to your browsers.
>>
>> To view the release notes:
>> https://spark.apache.org/releases/spark-release-3-0-1.html
>>
>> We would like to acknowledge all community members for contributing to
>> this release. This release would not have been possible without you.
>>
>>
>> Thanks,
>> Ruifeng Zheng
>>
>>

-- 
---
Takeshi Yamamuro


Re: [ANNOUNCE] Apache Spark 3.0.0

2020-06-18 Thread Takeshi Yamamuro
Congrats, all!

Bests,
Takeshi

On Fri, Jun 19, 2020 at 1:16 PM Felix Cheung 
wrote:

> Congrats
>
> --
> *From:* Jungtaek Lim 
> *Sent:* Thursday, June 18, 2020 8:18:54 PM
> *To:* Hyukjin Kwon 
> *Cc:* Mridul Muralidharan ; Reynold Xin <
> r...@databricks.com>; dev ; user <
> user@spark.apache.org>
> *Subject:* Re: [ANNOUNCE] Apache Spark 3.0.0
>
> Great, thanks all for your efforts on the huge step forward!
>
> On Fri, Jun 19, 2020 at 12:13 PM Hyukjin Kwon  wrote:
>
> Yay!
>
> On Fri, Jun 19, 2020 at 4:46 AM, Mridul Muralidharan wrote:
>
> Great job everyone ! Congratulations :-)
>
> Regards,
> Mridul
>
> On Thu, Jun 18, 2020 at 10:21 AM Reynold Xin  wrote:
>
> Hi all,
>
> Apache Spark 3.0.0 is the first release of the 3.x line. It builds on many
> of the innovations from Spark 2.x, bringing new ideas as well as continuing
> long-term projects that have been in development. This release resolves
> more than 3400 tickets.
>
> We'd like to thank our contributors and users for their contributions and
> early feedback to this release. This release would not have been possible
> without you.
>
> To download Spark 3.0.0, head over to the download page:
> http://spark.apache.org/downloads.html
>
> To view the release notes:
> https://spark.apache.org/releases/spark-release-3-0-0.html
>
>
>
>

-- 
---
Takeshi Yamamuro


Re: [ANNOUNCE] Apache Spark 2.4.6 released

2020-06-10 Thread Takeshi Yamamuro
Congrats and thanks, Holden!

Bests,
Takeshi

On Thu, Jun 11, 2020 at 11:16 AM Dongjoon Hyun 
wrote:

> Thank you so much, Holden! :)
>
> On Wed, Jun 10, 2020 at 6:59 PM Hyukjin Kwon  wrote:
>
>> Yay!
>>
>> On Thu, Jun 11, 2020 at 10:38 AM, Holden Karau wrote:
>>
>>> We are happy to announce the availability of Spark 2.4.6!
>>>
>>> Spark 2.4.6 is a maintenance release containing stability, correctness,
>>> and security fixes.
>>> This release is based on the branch-2.4 maintenance branch of Spark. We
>>> strongly recommend all 2.4 users to upgrade to this stable release.
>>>
>>> To download Spark 2.4.6, head over to the download page:
>>> http://spark.apache.org/downloads.html
>>> Spark 2.4.6 is also available in Maven Central, PyPI, and CRAN.
>>>
>>> Note that you might need to clear your browser cache or
>>> to use `Private`/`Incognito` mode according to your browsers.
>>>
>>> To view the release notes:
>>> https://spark.apache.org/releases/spark-release-2.4.6.html
>>>
>>> We would like to acknowledge all community members for contributing to
>>> this
>>> release. This release would not have been possible without you.
>>>
>>

-- 
---
Takeshi Yamamuro


Re: [ANNOUNCE] Announcing Apache Spark 2.4.5

2020-02-08 Thread Takeshi Yamamuro
Happy to hear the release news!

Bests,
Takeshi

On Sun, Feb 9, 2020 at 10:28 AM Dongjoon Hyun 
wrote:

> There was a typo in one URL. The correct release note URL is here.
>
> https://spark.apache.org/releases/spark-release-2-4-5.html
>
>
>
> On Sat, Feb 8, 2020 at 5:22 PM Dongjoon Hyun 
> wrote:
>
>> We are happy to announce the availability of Spark 2.4.5!
>>
>> Spark 2.4.5 is a maintenance release containing stability fixes. This
>> release is based on the branch-2.4 maintenance branch of Spark. We
>> strongly
>> recommend all 2.4 users to upgrade to this stable release.
>>
>> To download Spark 2.4.5, head over to the download page:
>> http://spark.apache.org/downloads.html
>>
>> Note that you might need to clear your browser cache or
>> to use `Private`/`Incognito` mode according to your browsers.
>>
>> To view the release notes:
>> https://spark.apache.org/releases/spark-release-2.4.5.html
>>
>> We would like to acknowledge all community members for contributing to
>> this
>> release. This release would not have been possible without you.
>>
>> Dongjoon Hyun
>>
>

-- 
---
Takeshi Yamamuro


Re: [ANNOUNCE] Announcing Apache Spark 3.0.0-preview2

2019-12-24 Thread Takeshi Yamamuro
Great work, Yuming!

Bests,
Takeshi

On Wed, Dec 25, 2019 at 6:00 AM Xiao Li  wrote:

> Thank you all. Happy Holidays!
>
> Xiao
>
> On Tue, Dec 24, 2019 at 12:53 PM Yuming Wang  wrote:
>
>> Hi all,
>>
>> To enable wide-scale community testing of the upcoming Spark 3.0 release,
>> the Apache Spark community has posted a new preview release of Spark 3.0.
>> This preview is *not a stable release in terms of either API or
>> functionality*, but it is meant to give the community early access to
>> try the code that will become Spark 3.0. If you would like to test the
>> release, please download it, and send feedback using either the mailing
>> lists <https://spark.apache.org/community.html> or JIRA
>> <https://issues.apache.org/jira/projects/SPARK?selectedItem=com.atlassian.jira.jira-projects-plugin%3Asummary-page>
>> .
>>
>> There are a lot of exciting new features added to Spark 3.0, including
>> Dynamic Partition Pruning, Adaptive Query Execution, Accelerator-aware
>> Scheduling, Data Source API with Catalog Supports, Vectorization in SparkR,
>> support of Hadoop 3/JDK 11/Scala 2.12, and many more. For a full list of
>> major features and changes in Spark 3.0.0-preview2, please check the thread(
>> http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-3-0-preview-release-feature-list-and-major-changes-td28050.html
>>  and
>> http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-3-0-preview-release-2-td28491.html
>> ).
>>
>> We'd like to thank our contributors and users for their contributions and
>> early feedback to this release. This release would not have been possible
>> without you.
>>
>> To download Spark 3.0.0-preview2, head over to the download page:
>> https://archive.apache.org/dist/spark/spark-3.0.0-preview2
>>
>> Happy Holidays.
>>
>> Yuming
>>
>
>
> --
> [image: Databricks Summit - Watch the talks]
> <https://databricks.com/sparkaisummit/north-america>
>


-- 
---
Takeshi Yamamuro


Re: Release Apache Spark 2.4.4

2019-08-13 Thread Takeshi Yamamuro
Hi,

Thanks for your notification, Dongjoon!
I put some links for the other committers/PMCs to access the info easily:

A commit list in github from the last release:
https://github.com/apache/spark/compare/5ac2014e6c118fbeb1fe8e5c8064c4a8ee9d182a...branch-2.4
An issue list in JIRA:
https://issues.apache.org/jira/projects/SPARK/versions/12345466#release-report-tab-body
The 5 correctness issues resolved in branch-2.4:
https://issues.apache.org/jira/browse/SPARK-27798?jql=project%20%3D%2012315420%20AND%20fixVersion%20%3D%2012345466%20AND%20labels%20in%20(%27correctness%27)%20ORDER%20BY%20priority%20DESC%2C%20key%20ASC

Anyway, +1

Best,
Takeshi

On Wed, Aug 14, 2019 at 8:25 AM DB Tsai  wrote:

> +1
>
> On Tue, Aug 13, 2019 at 4:16 PM Dongjoon Hyun 
> wrote:
> >
> > Hi, All.
> >
> > Spark 2.4.3 was released three months ago (8th May).
> > As of today (13th August), there are 112 commits (75 JIRAs) in
> `branch-24` since 2.4.3.
> >
> > It would be great if we can have Spark 2.4.4.
> > Shall we start `2.4.4 RC1` next Monday (19th August)?
> >
> > Last time, there was a request for K8s issue and now I'm waiting for
> SPARK-27900.
> > Please let me know if there is another issue.
> >
> > Thanks,
> > Dongjoon.
>
> -----
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>

-- 
---
Takeshi Yamamuro


[ANNOUNCE] Announcing Apache Spark 2.3.3

2019-02-17 Thread Takeshi Yamamuro
We are happy to announce the availability of Spark 2.3.3!

Apache Spark 2.3.3 is a maintenance release, based on the branch-2.3
maintenance branch of Spark. We strongly recommend all 2.3.x users to
upgrade to this stable release.

To download Spark 2.3.3, head over to the download page:
http://spark.apache.org/downloads.html

To view the release notes:
https://spark.apache.org/releases/spark-release-2-3-3.html

We would like to acknowledge all community members for contributing to
this release. This release would not have been possible without you.

Best,
Takeshi

-- 
---
Takeshi Yamamuro


Re: [ANNOUNCE] Announcing Apache Spark 2.2.3

2019-01-16 Thread Takeshi Yamamuro
Thanks, Dongjoon!


On Wed, Jan 16, 2019 at 5:23 PM Hyukjin Kwon  wrote:

> Nice!
>
> On Wed, Jan 16, 2019 at 11:55 AM, Jiaan Geng wrote:
>
>> Glad to hear this.
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>

-- 
---
Takeshi Yamamuro


Re: Spark jdbc postgres numeric array

2019-01-04 Thread Takeshi Yamamuro
Hi,

I filed a jira: https://issues.apache.org/jira/browse/SPARK-26540

On Thu, Jan 3, 2019 at 10:04 PM Takeshi Yamamuro 
wrote:

> Hi,
>
> I checked that v2.2/v2.3/v2.4/master had the same issue, so can you file a
> jira?
> I looked over the related code, and I think we need more logic to
> handle this issue;
>
> https://github.com/apache/spark/blob/2a30deb85ae4e42c5cbc936383dd5c3970f4a74f/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala#L41
>
>
> On Tue, Jan 1, 2019 at 12:13 AM Alexey  wrote:
>
>> Hi,
>>
>> I came across strange behavior when dealing with postgres columns of type
>> numeric[] using Spark 2.3.2, PostgreSQL 10.4, 9.6.9.
>> Consider the following table definition:
>>
>> create table test1
>> (
>>v  numeric[],
>>d  numeric
>> );
>>
>> insert into test1 values('{.222,.332}', 222.4555);
>>
>> When reading the table into a Dataframe, I get the following schema:
>>
>> root
>>  |-- v: array (nullable = true)
>>  ||-- element: decimal(0,0) (containsNull = true)
>>  |-- d: decimal(38,18) (nullable = true)
>>
>> Notice that for both columns precision and scale were not specified, but
>> in case of the array element I got both set to 0, while in the other case
>> defaults were set.
>>
>> Later, when I try to read the Dataframe, I get the following error:
>>
>> java.lang.IllegalArgumentException: requirement failed: Decimal precision
>> 4 exceeds max precision 0
>> at scala.Predef$.require(Predef.scala:224)
>> at org.apache.spark.sql.types.Decimal.set(Decimal.scala:114)
>> at org.apache.spark.sql.types.Decimal$.apply(Decimal.scala:453)
>> at
>> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$16$$anonfun$apply$6$$anonfun$apply$7.apply(JdbcUtils.scala:474)
>> ...
>>
>> I would expect to get array elements of type decimal(38,18) and no error
>> when reading in this case.
>> Should this be considered a bug? Is there a workaround other than
>> changing the column array type definition to include explicit precision and
>> scale?
>>
>> Best regards,
>> Alexey
>>
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
> --
> ---
> Takeshi Yamamuro
>


-- 
---
Takeshi Yamamuro


Re: Spark jdbc postgres numeric array

2019-01-03 Thread Takeshi Yamamuro
Hi,

I checked that v2.2/v2.3/v2.4/master had the same issue, so can you file a
jira?
I looked over the related code, and I think we need more logic to
handle this issue;
https://github.com/apache/spark/blob/2a30deb85ae4e42c5cbc936383dd5c3970f4a74f/sql/core/src/main/scala/org/apache/spark/sql/jdbc/PostgresDialect.scala#L41
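
In the meantime, one possible workaround (an untested sketch, not from the
thread: it serializes the numeric[] column to text on the Postgres side and
rebuilds it as an array of decimals in Spark; connection options are placeholders):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, split}

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// push array_to_string() down to Postgres so no numeric[] column reaches the JDBC reader
val query = "(select array_to_string(v, ',') as v_str, d from test1) as t"
val raw = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://host:5432/db")
  .option("dbtable", query)
  .option("user", "user")
  .option("password", "pass")
  .load()

// rebuild the array with an explicit element type
val fixed = raw
  .withColumn("v", split(col("v_str"), ",").cast("array<decimal(38,18)>"))
  .drop("v_str")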


On Tue, Jan 1, 2019 at 12:13 AM Alexey  wrote:

> Hi,
>
> I came across strange behavior when dealing with postgres columns of type
> numeric[] using Spark 2.3.2, PostgreSQL 10.4, 9.6.9.
> Consider the following table definition:
>
> create table test1
> (
>v  numeric[],
>d  numeric
> );
>
> insert into test1 values('{.222,.332}', 222.4555);
>
> When reading the table into a Dataframe, I get the following schema:
>
> root
>  |-- v: array (nullable = true)
>  ||-- element: decimal(0,0) (containsNull = true)
>  |-- d: decimal(38,18) (nullable = true)
>
> Notice that for both columns precision and scale were not specified, but
> in case of the array element I got both set to 0, while in the other case
> defaults were set.
>
> Later, when I try to read the Dataframe, I get the following error:
>
> java.lang.IllegalArgumentException: requirement failed: Decimal precision
> 4 exceeds max precision 0
> at scala.Predef$.require(Predef.scala:224)
> at org.apache.spark.sql.types.Decimal.set(Decimal.scala:114)
> at org.apache.spark.sql.types.Decimal$.apply(Decimal.scala:453)
> at
> org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$16$$anonfun$apply$6$$anonfun$apply$7.apply(JdbcUtils.scala:474)
> ...
>
> I would expect to get array elements of type decimal(38,18) and no error
> when reading in this case.
> Should this be considered a bug? Is there a workaround other than changing
> the column array type definition to include explicit precision and scale?
>
> Best regards,
> Alexey
>
>
> -----
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
---
Takeshi Yamamuro


Re: Spark 2.3.1 not working on Java 10

2018-06-21 Thread Takeshi Yamamuro
In this ticket, SPARK-24201, the ambiguous statement in the doc has been
pointed out.
Can you make a PR for that?

On Fri, Jun 22, 2018 at 6:17 AM, vaquar khan  wrote:

> https://spark.apache.org/docs/2.3.0/
>
> To avoid confusion, we need to update the doc with the supported Java version;
> the "*Java 8+*" wording is confusing for users.
>
> Spark runs on Java 8+, Python 2.7+/3.4+ and R 3.1+. For the Scala API,
> Spark 2.3.0 uses Scala 2.11. You will need to use a compatible Scala
> version (2.11.x).
>
>
> Regards,
> Vaquar khan
>
> On Thu, Jun 21, 2018 at 11:56 AM, chriswakare  intellibridge.co> wrote:
>
>> Hi Rahul,
>> This will work only in Java 8.
>> Installation does not work with both version 9 and 10
>>
>> Thanks,
>> Christopher
>>
>>
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
>
> --
> Regards,
> Vaquar Khan
> +1 -224-436-0783
> Greater Chicago
>



-- 
---
Takeshi Yamamuro


Re: How to use disk instead of just InMemoryRelation when use JDBC datasource in SPARKSQL?

2018-04-12 Thread Takeshi Yamamuro
You want to use `Dataset.persist(StorageLevel.MEMORY_AND_DISK)`?
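
For example, a minimal sketch (the JDBC options below are placeholders, not from
the original query):

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder().master("local[*]").getOrCreate()

val region = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://host:3306/db")
  .option("dbtable", "region")
  .option("user", "user")
  .option("password", "pass")
  .load()

// cache to memory first and spill the rest to local disk instead of failing with OOM
region.persist(StorageLevel.MEMORY_AND_DISK)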

On Thu, Apr 12, 2018 at 1:12 PM, Louis Hust <louis.h...@gmail.com> wrote:

> We  want to extract data from mysql, and calculate in sparksql.
> The sql explain like below.
>
>
> REGIONKEY#177,N_COMMENT#178] PushedFilters: [], ReadSchema:
> struct<N_NATIONKEY:int,N_NAME:string,N_REGIONKEY:int,N_COMMENT:string>
>   +- *(20) Sort [r_regionkey#203 ASC NULLS FIRST], false, 0
>  +- Exchange(coordinator id: 266374831)
> hashpartitioning(r_regionkey#203, 200), coordinator[target post-shuffle
> partition size: 67108864]
> +- *(19) Project [R_REGIONKEY#203]
>+- *(19) Filter ((isnotnull(r_name#204) &&
> (r_name#204 = AFRICA)) && isnotnull(r_regionkey#203))
>   +- InMemoryTableScan [R_REGIONKEY#203,
> r_name#204], [isnotnull(r_name#204), (r_name#204 = AFRICA),
> isnotnull(r_regionkey#203)]
> +- InMemoryRelation [R_REGIONKEY#203,
> R_NAME#204, R_COMMENT#205], true, 1, StorageLevel(disk, memory, 1
> replicas)
>   +- *(1) Scan
> JDBCRelation(region) [numPartitions=1] 
> [R_REGIONKEY#203,R_NAME#204,R_COMMENT#205]
> PushedFilters: [], ReadSchema: struct<R_REGIONKEY:int,R_NAME:
> string,R_COMMENT:string>
>
>
> As you see, all JDBCRelation convert to InMemoryRelation. Cause the JDBC
> table is so big, the all data can not be filled into memory,  OOM occurs.
> If there is some option to make SparkSQL use Disk if memory not enough?
>



-- 
---
Takeshi Yamamuro


Re: Using CBO on Spark 2.3 with analyzed hive tables

2018-03-23 Thread Takeshi Yamamuro
Can you file a jira if this is a bug?
Thanks!
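
For what it's worth, the NaN-to-BigDecimal failure Michael describes below can be
reproduced outside Spark; a tiny standalone sketch of the problem and of the kind
of guard he suggests (this is not Spark's code):

// a Double NaN, like the filterSelectivity value in FilterEstimation below
val selectivity = 0.0 / 0.0
// BigDecimal(selectivity)   // would throw java.lang.NumberFormatException, as in the stack trace

// the kind of guard Michael mentions: fall back to 1.0 when the selectivity is NaN
val safe = if (selectivity.isNaN) 1.0 else selectivity
val estimated = (BigDecimal(1000) * BigDecimal(safe))
  .setScale(0, BigDecimal.RoundingMode.CEILING)
println(estimated)   // 1000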

On Sat, Mar 24, 2018 at 1:23 AM, Michael Shtelma <mshte...@gmail.com> wrote:

> Hi Maropu,
>
> the problem seems to be in FilterEstimation.scala on lines 50 and 52:
> https://github.com/apache/spark/blob/master/sql/
> catalyst/src/main/scala/org/apache/spark/sql/catalyst/
> plans/logical/statsEstimation/FilterEstimation.scala?utf8=✓#L50-L52
>
> val filterSelectivity =
> calculateFilterSelectivity(plan.condition).getOrElse(1.0)
> val filteredRowCount: BigInt =
> ceil(BigDecimal(childStats.rowCount.get) * filterSelectivity)
>
> The problem is, that filterSelectivity gets NaN value in my case and
> NaN cannot be converted to BigDecimal.
> I can try adding simple if, checking the NaN value and test if this helps.
> I will also try to understand, why in my case, I am getting NaN.
>
> Best,
> Michael
>
>
> On Fri, Mar 23, 2018 at 1:51 PM, Takeshi Yamamuro <linguin@gmail.com>
> wrote:
> > hi,
> >
> > What's a query to reproduce this?
> > It seems when casting double to BigDecimal, it throws the exception.
> >
> > // maropu
> >
> > On Fri, Mar 23, 2018 at 6:20 PM, Michael Shtelma <mshte...@gmail.com>
> wrote:
> >>
> >> Hi all,
> >>
> >> I am using Spark 2.3 with activated cost-based optimizer and a couple
> >> of hive tables, that were analyzed previously.
> >>
> >> I am getting the following exception for different queries:
> >>
> >> java.lang.NumberFormatException
> >>
> >> at java.math.BigDecimal.(BigDecimal.java:494)
> >>
> >> at java.math.BigDecimal.(BigDecimal.java:824)
> >>
> >> at scala.math.BigDecimal$.decimal(BigDecimal.scala:52)
> >>
> >> at scala.math.BigDecimal$.decimal(BigDecimal.scala:55)
> >>
> >> at scala.math.BigDecimal$.double2bigDecimal(BigDecimal.scala:343)
> >>
> >> at
> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> FilterEstimation.estimate(FilterEstimation.scala:52)
> >>
> >> at
> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> BasicStatsPlanVisitor$.visitFilter(BasicStatsPlanVisitor.scala:43)
> >>
> >> at
> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> BasicStatsPlanVisitor$.visitFilter(BasicStatsPlanVisitor.scala:25)
> >>
> >> at
> >> org.apache.spark.sql.catalyst.plans.logical.LogicalPlanVisitor$class.
> visit(LogicalPlanVisitor.scala:30)
> >>
> >> at
> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> BasicStatsPlanVisitor$.visit(BasicStatsPlanVisitor.scala:25)
> >>
> >> at
> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> LogicalPlanStats$$anonfun$stats$1.apply(LogicalPlanStats.scala:35)
> >>
> >> at
> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> LogicalPlanStats$$anonfun$stats$1.apply(LogicalPlanStats.scala:33)
> >>
> >> at scala.Option.getOrElse(Option.scala:121)
> >>
> >> at
> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> LogicalPlanStats$class.stats(LogicalPlanStats.scala:33)
> >>
> >> at
> >> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.
> stats(LogicalPlan.scala:30)
> >>
> >> at
> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> EstimationUtils$$anonfun$rowCountsExist$1.apply(EstimationUtils.scala:32)
> >>
> >> at
> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> EstimationUtils$$anonfun$rowCountsExist$1.apply(EstimationUtils.scala:32)
> >>
> >> at
> >> scala.collection.IndexedSeqOptimized$class.prefixLengthImpl(
> IndexedSeqOptimized.scala:38)
> >>
> >> at
> >> scala.collection.IndexedSeqOptimized$class.forall(IndexedSeqOptimized.
> scala:43)
> >>
> >> at scala.collection.mutable.WrappedArray.forall(WrappedArray.scala:35)
> >>
> >> at
> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> EstimationUtils$.rowCountsExist(EstimationUtils.scala:32)
> >>
> >> at
> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> ProjectEstimation$.estimate(ProjectEstimation.scala:27)
> >>
> >> at
> >> org.apache.spark.sql.catalyst.plans.logical.statsEstimation.
> BasicStatsPlanVisitor$.visitProject(BasicStatsPlanVisitor.scala:63)
> >>
> >> at
> >> org.apache.spark.sql.catalyst.p

Re: Using CBO on Spark 2.3 with analyzed hive tables

2018-03-23 Thread Takeshi Yamamuro
immutable.List.foreach(List.scala:392)
>
> at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(
> RuleExecutor.scala:76)
>
> at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(
> QueryExecution.scala:66)
>
> at org.apache.spark.sql.execution.QueryExecution.
> optimizedPlan(QueryExecution.scala:66)
>
> at org.apache.spark.sql.execution.QueryExecution$$
> anonfun$toString$2.apply(QueryExecution.scala:204)
>
> at org.apache.spark.sql.execution.QueryExecution$$
> anonfun$toString$2.apply(QueryExecution.scala:204)
>
> at org.apache.spark.sql.execution.QueryExecution.
> stringOrError(QueryExecution.scala:100)
>
> at org.apache.spark.sql.execution.QueryExecution.
> toString(QueryExecution.scala:204)
>
> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(
> SQLExecution.scala:74)
>
> at org.apache.spark.sql.DataFrameWriter.runCommand(
> DataFrameWriter.scala:654)
>
> at org.apache.spark.sql.DataFrameWriter.createTable(
> DataFrameWriter.scala:458)
>
> at org.apache.spark.sql.DataFrameWriter.saveAsTable(
> DataFrameWriter.scala:437)
>
> at org.apache.spark.sql.DataFrameWriter.saveAsTable(
> DataFrameWriter.scala:393)
>
>
>
> This exception only comes, if the statistics exist for the hive tables
> being used.
>
> Has anybody already seen something like this ?
> Any assistance would be greatly appreciated!
>
> Best,
> Michael
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
---
Takeshi Yamamuro


Re: Reading CSV with multiLine option invalidates encoding option.

2017-08-16 Thread Takeshi Yamamuro
Hi,

Since the CSV source currently supports ASCII-compatible charsets, I
guess Shift-JIS also works well.
You could check Hyukjin's comment in
https://issues.apache.org/jira/browse/SPARK-21289 for more info.


On Wed, Aug 16, 2017 at 2:54 PM, Han-Cheol Cho <prian...@gmail.com> wrote:

> My apologies,
>
> It was a problem of our Hadoop cluster.
> When we tested the same code on another cluster (HDP-based), it worked
> without any problem.
>
> ```scala
> ## make sjis text
> cat a.txt
> 8月データだけでやってみよう
> nkf -W -s a.txt >b.txt
> cat b.txt
> 87n%G!<%?$@$1$G$d$C$F$_$h$&
> nkf -s -w b.txt
> 8月データだけでやってみよう
> hdfs dfs -put a.txt b.txt
>
> ## YARN mode test
> spark.read.option("encoding", "utf-8").csv("a.txt").show(1)
> +--+
> |   _c0|
> +--+
> |8月データだけでやってみよう|
> +--+
>
> spark.read.option("encoding", "sjis").csv("b.txt").show(1)
> +--+
> |   _c0|
> +--+
> |8月データだけでやってみよう|
> +--+
>
> spark.read.option("encoding", "utf-8").option("multiLine",
> true).csv("a.txt").show(1)
> +--+
> |   _c0|
> +--+
> |8月データだけでやってみよう|
> +--+
>
> spark.read.option("encoding", "sjis").option("multiLine",
> true).csv("b.txt").show(1)
> +--+
> |   _c0|
> +--+
> |8月データだけでやってみよう|
> +--+
> ```
>
> I am still digging the root cause and will share it later :-)
>
> Best wishes,
> Han-Choel
>
>
> On Wed, Aug 16, 2017 at 1:32 PM, Han-Cheol Cho <prian...@gmail.com> wrote:
>
>> Dear Spark ML members,
>>
>>
>> I experienced a trouble in using "multiLine" option to load CSV data with
>> Shift-JIS encoding.
>> When option("multiLine", true) is specified, option("encoding",
>> "encoding-name") just doesn't work anymore.
>>
>>
>> In CSVDataSource.scala file, I found that MultiLineCSVDataSource.readFile()
>> method doesn't use parser.options.charset at all.
>>
>> object MultiLineCSVDataSource extends CSVDataSource {
>>   override val isSplitable: Boolean = false
>>
>>   override def readFile(
>>   conf: Configuration,
>>   file: PartitionedFile,
>>   parser: UnivocityParser,
>>   schema: StructType): Iterator[InternalRow] = {
>> UnivocityParser.parseStream(
>>   CodecStreams.createInputStreamWithCloseResource(conf,
>> file.filePath),
>>   parser.options.headerFlag,
>>   parser,
>>   schema)
>>   }
>>   ...
>>
>> On the other hand, TextInputCSVDataSource.readFile() method uses it:
>>
>>   override def readFile(
>>   conf: Configuration,
>>   file: PartitionedFile,
>>   parser: UnivocityParser,
>>   schema: StructType): Iterator[InternalRow] = {
>> val lines = {
>>   val linesReader = new HadoopFileLinesReader(file, conf)
>>   Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ =>
>> linesReader.close()))
>>   linesReader.map { line =>
>> new String(line.getBytes, 0, line.getLength,
>> parser.options.charset)// < charset option is used here.
>>   }
>> }
>>
>> val shouldDropHeader = parser.options.headerFlag && file.start == 0
>> UnivocityParser.parseIterator(lines, shouldDropHeader, parser,
>> schema)
>>   }
>>
>>
>> It seems like a bug.
>> Is there anyone who had the same problem before?
>>
>>
>> Best wishes,
>> Han-Cheol
>>
>> --
>> ==
>> Han-Cheol Cho, Ph.D.
>> Data scientist, Data Science Team, Data Laboratory
>> NHN Techorus Corp.
>>
>> Homepage: https://sites.google.com/site/priancho/
>> ==
>>
>
>
>
> --
> ==
> Han-Cheol Cho, Ph.D.
> Data scientist, Data Science Team, Data Laboratory
> NHN Techorus Corp.
>
> Homepage: https://sites.google.com/site/priancho/
> ==
>



-- 
---
Takeshi Yamamuro


Re: custom column types for JDBC datasource writer

2017-07-05 Thread Takeshi Yamamuro
-dev +user

You can in master and see
https://github.com/apache/spark/commit/c7911807050227fcd13161ce090330d9d8daa533
.
This option will be available in the next release.
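
If it helps, a rough sketch of how that option is used (I believe the option name
is `createTableColumnTypes`; the table, column names, and URL below are
placeholders):

import java.util.Properties
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val props = new Properties()
props.setProperty("user", "user")
props.setProperty("password", "pass")

val url = "jdbc:oracle:thin:@//host:1521/service"
val df = Seq((1L, "a fairly long description", "a name")).toDF("id", "description", "name")

// override the default VARCHAR(255) mapping for selected columns when the table is created;
// as far as I know, the types here must still parse as Spark SQL column types, e.g. VARCHAR(n)
df.write
  .option("createTableColumnTypes", "description VARCHAR(4000), name VARCHAR(1024)")
  .jdbc(url, "my_table", props)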

// maropu

On Thu, Jul 6, 2017 at 1:25 PM, Georg Heiler <georg.kf.hei...@gmail.com>
wrote:

> Hi,
> is it possible to somehow make spark not use VARCHAR(255) but something
> bigger i.e. CLOB for Strings?
>
> If not, is it at least possible to catch the exception which is thrown. To
> me, it seems that spark is catching and logging it - so I can no longer
> intervene and handle it:
>
> https://stackoverflow.com/questions/44927764/spark-jdbc-
> oracle-long-string-fields
>
> Regards,
> Georg
>



-- 
---
Takeshi Yamamuro


Re: UDF percentile_approx

2017-06-14 Thread Takeshi Yamamuro
You can use the function without Hive; for example, you can try:

scala> Seq(1.0, 8.0).toDF("a").selectExpr("percentile_approx(a, 0.5)").show

+--------------------------------------------+
|percentile_approx(a, CAST(0.5 AS DOUBLE), 1)|
+--------------------------------------------+
|                                         8.0|
+--------------------------------------------+


// maropu



On Wed, Jun 14, 2017 at 5:04 PM, Riccardo Ferrari <ferra...@gmail.com>
wrote:

> Hi Andres,
>
> I can't find the refrence, last time I searched for that I found that
> 'percentile_approx' is only available via hive context. You should register
> a temp table and use it from there.
>
> Best,
>
> On Tue, Jun 13, 2017 at 8:52 PM, Andrés Ivaldi <iaiva...@gmail.com> wrote:
>
>> Hello, I`m trying to user percentile_approx  on my SQL query, but It's
>> like spark context can´t find the function
>>
>> I'm using it like this
>> import org.apache.spark.sql.functions._
>> import org.apache.spark.sql.DataFrameStatFunctions
>>
>> val e = expr("percentile_approx(Cantidadcon0234514)")
>> df.agg(e).show()
>>
>> and exception is
>>
>> org.apache.spark.sql.AnalysisException: Undefined function:
>> 'percentile_approx'. This function is neither a registered temporary
>> function nor a permanent function registered
>>
>> I've also tryid with callUDF
>>
>> Regards.
>>
>> --
>> Ing. Ivaldi Andres
>>
>
>


-- 
---
Takeshi Yamamuro


Re: [CSV] If number of columns of one row bigger than maxcolumns it stop the whole parsing process.

2017-06-08 Thread Takeshi Yamamuro
I filed a jira about this issue:
https://issues.apache.org/jira/browse/SPARK-21024

On Thu, Jun 8, 2017 at 1:27 AM, Chanh Le <giaosu...@gmail.com> wrote:

> Can you recommend one?
>
> Thanks.
>
> On Thu, Jun 8, 2017 at 2:47 PM Jörn Franke <jornfra...@gmail.com> wrote:
>
>> You can change the CSV parser library
>>
>> On 8. Jun 2017, at 08:35, Chanh Le <giaosu...@gmail.com> wrote:
>>
>>
>> I did add mode -> DROPMALFORMED but it still couldn't ignore it because
>> the error raise from the CSV library that Spark are using.
>>
>>
>> On Thu, Jun 8, 2017 at 12:11 PM Jörn Franke <jornfra...@gmail.com> wrote:
>>
>>> The CSV data source allows you to skip invalid lines - this should also
>>> include lines that have more than maxColumns. Choose mode "DROPMALFORMED"
>>>
>>> On 8. Jun 2017, at 03:04, Chanh Le <giaosu...@gmail.com> wrote:
>>>
>>> Hi Takeshi, Jörn Franke,
>>>
>>> The problem is even I increase the maxColumns it still have some lines
>>> have larger columns than the one I set and it will cost a lot of memory.
>>> So I just wanna skip the line has larger columns than the maxColumns I
>>> set.
>>>
>>> Regards,
>>> Chanh
>>>
>>>
>>> On Thu, Jun 8, 2017 at 12:48 AM Takeshi Yamamuro <linguin@gmail.com>
>>> wrote:
>>>
>>>> Is it not enough to set `maxColumns` in CSV options?
>>>>
>>>> https://github.com/apache/spark/blob/branch-2.1/sql/
>>>> core/src/main/scala/org/apache/spark/sql/execution/
>>>> datasources/csv/CSVOptions.scala#L116
>>>>
>>>> // maropu
>>>>
>>>> On Wed, Jun 7, 2017 at 9:45 AM, Jörn Franke <jornfra...@gmail.com>
>>>> wrote:
>>>>
>>>>> Spark CSV data source should be able
>>>>>
>>>>> On 7. Jun 2017, at 17:50, Chanh Le <giaosu...@gmail.com> wrote:
>>>>>
>>>>> Hi everyone,
>>>>> I am using Spark 2.1.1 to read csv files and convert to avro files.
>>>>> One problem that I am facing is if one row of csv file has more
>>>>> columns than maxColumns (default is 20480). The process of parsing
>>>>> was stop.
>>>>>
>>>>> Internal state when error was thrown: line=1, column=3, record=0,
>>>>> charIndex=12
>>>>> com.univocity.parsers.common.TextParsingException: 
>>>>> java.lang.ArrayIndexOutOfBoundsException
>>>>> - 2
>>>>> Hint: Number of columns processed may have exceeded limit of 2
>>>>> columns. Use settings.setMaxColumns(int) to define the maximum number of
>>>>> columns your input can have
>>>>> Ensure your configuration is correct, with delimiters, quotes and
>>>>> escape sequences that match the input format you are trying to parse
>>>>> Parser Configuration: CsvParserSettings:
>>>>>
>>>>>
>>>>> I did some investigation in univocity
>>>>> <https://github.com/uniVocity/univocity-parsers> library but the way
>>>>> it handle is throw error that why spark stop the process.
>>>>>
>>>>> How to skip the invalid row and just continue to parse next valid one?
>>>>> Any libs can replace univocity in that job?
>>>>>
>>>>> Thanks & regards,
>>>>> Chanh
>>>>> --
>>>>> Regards,
>>>>> Chanh
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> ---
>>>> Takeshi Yamamuro
>>>>
>>> --
>>> Regards,
>>> Chanh
>>>
>>> --
>> Regards,
>> Chanh
>>
>> --
> Regards,
> Chanh
>



-- 
---
Takeshi Yamamuro


Re: [CSV] If number of columns of one row bigger than maxcolumns it stop the whole parsing process.

2017-06-07 Thread Takeshi Yamamuro
Is it not enough to set `maxColumns` in CSV options?

https://github.com/apache/spark/blob/branch-2.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala#L116
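
For example, a minimal sketch of raising that limit (the path and value are
placeholders; note that, per the follow-up message in this thread, rows that still
exceed the limit abort parsing, which is what SPARK-21024 was filed for):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

val df = spark.read
  .option("maxColumns", "100000")   // default is 20480
  .csv("/path/to/input.csv")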

// maropu

On Wed, Jun 7, 2017 at 9:45 AM, Jörn Franke <jornfra...@gmail.com> wrote:

> Spark CSV data source should be able
>
> On 7. Jun 2017, at 17:50, Chanh Le <giaosu...@gmail.com> wrote:
>
> Hi everyone,
> I am using Spark 2.1.1 to read csv files and convert to avro files.
> One problem that I am facing is if one row of csv file has more columns
> than maxColumns (default is 20480). The process of parsing was stop.
>
> Internal state when error was thrown: line=1, column=3, record=0,
> charIndex=12
> com.univocity.parsers.common.TextParsingException: 
> java.lang.ArrayIndexOutOfBoundsException
> - 2
> Hint: Number of columns processed may have exceeded limit of 2 columns.
> Use settings.setMaxColumns(int) to define the maximum number of columns
> your input can have
> Ensure your configuration is correct, with delimiters, quotes and escape
> sequences that match the input format you are trying to parse
> Parser Configuration: CsvParserSettings:
>
>
> I did some investigation in univocity
> <https://github.com/uniVocity/univocity-parsers> library but the way it
> handle is throw error that why spark stop the process.
>
> How to skip the invalid row and just continue to parse next valid one?
> Any libs can replace univocity in that job?
>
> Thanks & regards,
> Chanh
> --
> Regards,
> Chanh
>
>


-- 
---
Takeshi Yamamuro


Re: Documentation on "Automatic file coalescing for native data sources"?

2017-05-20 Thread Takeshi Yamamuro
I think this document points to the logic here:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L418

This logic merges small files into a partition, and you can control this
threshold via `spark.sql.files.maxPartitionBytes`.
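
A small sketch of the knobs involved (the values and path are only examples):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// target size of each input partition; smaller values give more partitions
spark.conf.set("spark.sql.files.maxPartitionBytes", 64L * 1024 * 1024)
// estimated cost of opening a file, used when packing many small files into one partition
spark.conf.set("spark.sql.files.openCostInBytes", 4L * 1024 * 1024)

val df = spark.read.parquet("/path/with/many/small/files")
println(df.rdd.getNumPartitions)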

// maropu


On Sat, May 20, 2017 at 8:15 AM, ayan guha <guha.a...@gmail.com> wrote:

> I think like all other read operations, it is driven by input format used,
> and I think some variation of combine file input format is used by default.
> I think you can test it by force a particular input format which gets ine
> file per split, then you should end up with same number of partitions as
> your dsta files
>
> On Sat, 20 May 2017 at 5:12 am, Aakash Basu <aakash.spark@gmail.com>
> wrote:
>
>> Hey all,
>>
>> A reply on this would be great!
>>
>> Thanks,
>> A.B.
>>
>> On 17-May-2017 1:43 AM, "Daniel Siegmann" <dsiegm...@securityscorecard.io>
>> wrote:
>>
>>> When using spark.read on a large number of small files, these are
>>> automatically coalesced into fewer partitions. The only documentation I can
>>> find on this is in the Spark 2.0.0 release notes, where it simply says (
>>> http://spark.apache.org/releases/spark-release-2-0-0.html):
>>>
>>> "Automatic file coalescing for native data sources"
>>>
>>> Can anyone point me to documentation explaining what triggers this
>>> feature, how it decides how many partitions to coalesce to, and what counts
>>> as a "native data source"? I couldn't find any mention of this feature in
>>> the SQL Programming Guide and Google was not helpful.
>>>
>>> --
>>> Daniel Siegmann
>>> Senior Software Engineer
>>> *SecurityScorecard Inc.*
>>> 214 W 29th Street, 5th Floor
>>> New York, NY 10001
>>>
>>> --
> Best Regards,
> Ayan Guha
>



-- 
---
Takeshi Yamamuro


Re: pyspark.sql.DataFrame write error to Postgres DB

2017-04-20 Thread Takeshi Yamamuro
Why do you use a MySQL JDBC driver (and a MySQL URL) to write to Postgres?

// maropu

On Fri, Apr 21, 2017 at 11:54 AM, Cinyoung Hur <cinyoung@gmail.com>
wrote:

> Hi,
>
> I tried to insert Dataframe into Postgres DB.
>
> But, I don't know what causes this error.
>
>
> properties = {
> "user": "user",
> "password": "pass",
> "driver": "com.mysql.jdbc.Driver",
> }
> url = "jdbc:mysql://ip 
> address/MYDB?useServerPrepStmts=false=true"
> df.write.jdbc(url=url, table=table_name, mode='overwrite', 
> properties=properties)
>
>
>
> The schema of Dataframe is this.
>
> root
>  |-- fom_tp_cd: string (nullable = true)
>  |-- agg: integer (nullable = true)
>  |-- sex_tp_cd: integer (nullable = true)
>  |-- dgsbjt_cd: string (nullable = true)
>  |-- recu_cl_cd: integer (nullable = true)
>  |-- sick_set: string (nullable = true)
>  |-- gnl_nm_set: string (nullable = true)
>  |-- count: long (nullable = false)
>
>
>
> Py4JJavaErrorTraceback (most recent call last) 
> in ()> 1 result1.filter(result1["gnl_nm_set"] == "").count()
> /usr/local/linewalks/spark/spark/python/pyspark/sql/dataframe.pyc in 
> count(self)297 2298 """--> 299 return 
> int(self._jdf.count())300 301 @ignore_unicode_prefix
> /usr/local/linewalks/spark/spark/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py
>  in __call__(self, *args)931 answer = 
> self.gateway_client.send_command(command)932 return_value = 
> get_return_value(--> 933 answer, self.gateway_client, 
> self.target_id, self.name)934 935 for temp_arg in temp_args:
> /usr/local/linewalks/spark/spark/python/pyspark/sql/utils.pyc in deco(*a, 
> **kw) 61 def deco(*a, **kw): 62 try:---> 63 
> return f(*a, **kw) 64 except py4j.protocol.Py4JJavaError as e:
>  65 s = e.java_exception.toString()
> /usr/local/linewalks/spark/spark/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py
>  in get_return_value(answer, gateway_client, target_id, name)310  
>raise Py4JJavaError(311 "An error occurred 
> while calling {0}{1}{2}.\n".--> 312 format(target_id, 
> ".", name), value)313 else:314 raise 
> Py4JError(
> Py4JJavaError: An error occurred while calling o1331.count.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 178 
> in stage 324.0 failed 4 times, most recent failure: Lost task 178.3 in stage 
> 324.0 (TID 14274, 7.linewalks.local): 
> org.apache.spark.api.python.PythonException: Traceback (most recent call 
> last):
>   File 
> "/home/hadoop/hdtmp/nm-local-dir/usercache/hadoop/appcache/application_1491889279272_0040/container_1491889279272_0040_01_03/pyspark.zip/pyspark/worker.py",
>  line 172, in main
> process()
>   File 
> "/home/hadoop/hdtmp/nm-local-dir/usercache/hadoop/appcache/application_1491889279272_0040/container_1491889279272_0040_01_03/pyspark.zip/pyspark/worker.py",
>  line 167, in process
> serializer.dump_stream(func(split_index, iterator), outfile)
>   File 
> "/home/hadoop/hdtmp/nm-local-dir/usercache/hadoop/appcache/application_1491889279272_0040/container_1491889279272_0040_01_03/pyspark.zip/pyspark/worker.py",
>  line 106, in 
> func = lambda _, it: map(mapper, it)
>   File 
> "/home/hadoop/hdtmp/nm-local-dir/usercache/hadoop/appcache/application_1491889279272_0040/container_1491889279272_0040_01_03/pyspark.zip/pyspark/worker.py",
>  line 92, in 
> mapper = lambda a: udf(*a)
>   File 
> "/home/hadoop/hdtmp/nm-local-dir/usercache/hadoop/appcache/application_1491889279272_0040/container_1491889279272_0040_01_03/pyspark.zip/pyspark/worker.py",
>  line 70, in 
> return lambda *a: f(*a)
>   File "", line 3, in 
> TypeError: sequence item 0: expected string, NoneType found
>
>


-- 
---
Takeshi Yamamuro


Re: From C* to DataFrames with JSON

2017-02-11 Thread Takeshi Yamamuro
If you upgrade to v2.1, you can use to_json/from_json in sql.functions.

On Fri, Feb 10, 2017 at 3:12 PM, Jean-Francois Gosselin <
jfgosse...@gmail.com> wrote:

>
> Hi all,
>
> I'm struggling (Spark / Scala newbie) to create a DataFrame from a C*
> table but also create a DataFrame from column with json .
>
> e.g. From C* table
>
> | id | jsonData                  |
> +----+---------------------------+
> | 1  | {"a": "123", "b": "xyz" } |
> | 2  | {"a": "3", "b": "bar" }   |
>
>
> to Spark DataFrame:
>
> | id |  a  |  b  |
> +----+-----+-----+
> | 1  | 123 | xyz |
> | 2  | 3   | bar |
>
>
> I'm using Spark 1.6 .
>
> Thanks
>
>
> JF
>



-- 
---
Takeshi Yamamuro


Re: EC2 script is missing in Spark 2.0.0~2.1.0

2017-02-11 Thread Takeshi Yamamuro
Moved to https://github.com/amplab/spark-ec2.
Yea, I think the script was just moved there, so you can use it in the same
way.

On Sat, Feb 11, 2017 at 9:59 PM, Md. Rezaul Karim <
rezaul.ka...@insight-centre.org> wrote:

> Hi Takeshi,
>
> Now  I understand that spark-ec2 script was moved to AMPLab. How could I
> use that one i.e. new location/URL, please? Alternatively, can I use the
> same script provided with prior Spark releases?
>
> Regards,
> _
> *Md. Rezaul Karim*, BSc, MSc
> PhD Researcher, INSIGHT Centre for Data Analytics
> National University of Ireland, Galway
> IDA Business Park, Dangan, Galway, Ireland
> Web: http://www.reza-analytics.eu/index.html
> <http://139.59.184.114/index.html>
>
> On 11 February 2017 at 12:43, Takeshi Yamamuro <linguin@gmail.com>
> wrote:
>
>> Hi,
>>
>> Have you checked this?
>> https://issues.apache.org/jira/browse/SPARK-12735
>>
>> // maropu
>>
>> On Sat, Feb 11, 2017 at 9:34 PM, Md. Rezaul Karim <
>> rezaul.ka...@insight-centre.org> wrote:
>>
>>> Dear Spark Users,
>>>
>>> I was wondering why the EC2 script is missing in Spark release
>>> 2.0.0.~2.1.0? Is there any specific reason for that?
>>>
>>> Please note that I have chosen the package type: Pre-built for Hadoop
>>> 2.7 and later for Spark 2.1.0 for example. Am I doing something wrong?
>>>
>>>
>>>
>>> Regards,
>>> _
>>> *Md. Rezaul Karim*, BSc, MSc
>>> PhD Researcher, INSIGHT Centre for Data Analytics
>>> National University of Ireland, Galway
>>> IDA Business Park, Dangan, Galway, Ireland
>>> Web: http://www.reza-analytics.eu/index.html
>>> <http://139.59.184.114/index.html>
>>>
>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>


-- 
---
Takeshi Yamamuro


Re: EC2 script is missing in Spark 2.0.0~2.1.0

2017-02-11 Thread Takeshi Yamamuro
Hi,

Have you checked this?
https://issues.apache.org/jira/browse/SPARK-12735

// maropu

On Sat, Feb 11, 2017 at 9:34 PM, Md. Rezaul Karim <
rezaul.ka...@insight-centre.org> wrote:

> Dear Spark Users,
>
> I was wondering why the EC2 script is missing in Spark release
> 2.0.0.~2.1.0? Is there any specific reason for that?
>
> Please note that I have chosen the package type: Pre-built for Hadoop 2.7
> and later for Spark 2.1.0 for example. Am I doing something wrong?
>
>
>
> Regards,
> _
> *Md. Rezaul Karim*, BSc, MSc
> PhD Researcher, INSIGHT Centre for Data Analytics
> National University of Ireland, Galway
> IDA Business Park, Dangan, Galway, Ireland
> Web: http://www.reza-analytics.eu/index.html
> <http://139.59.184.114/index.html>
>



-- 
---
Takeshi Yamamuro


Re: increasing cross join speed

2017-02-01 Thread Takeshi Yamamuro
Hi,

I'm not sure how to improve this kind of query on vanilla Spark alone,
though;
you can write custom physical plans for top-k queries.
You can check the link below as a reference;
benchmark: https://github.com/apache/incubator-hivemall/pull/33
manual:
https://github.com/apache/incubator-hivemall/blob/master/docs/gitbook/spark/misc/topk_join.md
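
On vanilla Spark, one cheap thing to try first (just a sketch, not the top-k join
above; the tiny frames and column names stand in for the ones in the question) is
broadcasting the small dictionary side so the cross join becomes a broadcast
nested-loop join, then keeping only the nearest match per row:

import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{broadcast, col, row_number, udf}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val dict = Seq(("c1", Vectors.dense(0.0, 1.0)), ("c2", Vectors.dense(5.0, 5.0)))
  .toDF("dictClass", "f1")
val main = Seq(("m1", Vectors.dense(0.1, 0.9)), ("m2", Vectors.dense(4.0, 6.0)))
  .toDF("mainClass", "f2")

val sqdist = udf((a: Vector, b: Vector) => Vectors.sqdist(a, b))

// broadcast the small side so no shuffle is needed for the cross join
val crossed = main.crossJoin(broadcast(dict))
  .withColumn("dist", sqdist(col("f2"), col("f1")))

// keep only the nearest dictionary entry per row of the main dataset
val w = Window.partitionBy(col("mainClass")).orderBy(col("dist"))
val nearest = crossed.withColumn("rn", row_number().over(w))
  .filter(col("rn") === 1)
  .drop("rn")
nearest.show()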

I hope this helps for you.
Thanks,

// maropu


On Wed, Feb 1, 2017 at 6:35 AM, Kürşat Kurt <kur...@kursatkurt.com> wrote:

> Hi;
>
>
>
> I have 2 dataframes. I am trying to cross join for finding vector
> distances. Then i can choose the most similiar vectors.
>
> Cross join speed is too slow. How can i increase the speed, or have you
> any suggestion for this comparision?
>
>
>
>
>
> *val* result=myDict.join(mainDataset).map(x=>{
>
>
>
>*val* orgClassName1 =x.getAs[SparseVector](1);
>
>*val* orgClassName2 =x.getAs[SparseVector](2);
>
>*val* f1=x.getAs[SparseVector](3);
>
>*val* f2=x.getAs[SparseVector](4);
>
>*val* dist=Vectors.sqdist(f1,f2);
>
>
>
>(orgClassName1, orgClassName2,dist)
>
>  }).toDF("orgClassName1","orgClassName2,"dist");
>
>
>
>
>
>
>



-- 
---
Takeshi Yamamuro


Re: Kinesis streaming misunderstanding..?

2017-01-27 Thread Takeshi Yamamuro
Probably, he referred to the word-counting example in Kinesis here:
https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala#L114


On Fri, Jan 27, 2017 at 6:41 PM, ayan guha <guha.a...@gmail.com> wrote:

> Maybe a naive question: why are you creating 1 Dstream per shard? It
> should be one Dstream corresponding to kinesis stream, isn't it?
>
> On Fri, Jan 27, 2017 at 8:09 PM, Takeshi Yamamuro <linguin@gmail.com>
> wrote:
>
>> Hi,
>>
>> Just a guess, though: Kinesis shards sometimes have skewed data.
>> So, before you compute something from the Kinesis RDDs, you'd better
>> repartition them
>> for better parallelism.
>>
>> // maropu
>>
>> On Fri, Jan 27, 2017 at 2:54 PM, Graham Clark <grcl...@gmail.com> wrote:
>>
>>> Hi everyone - I am building a small prototype in Spark 1.6.0 (cloudera)
>>> to read information from Kinesis and write it to HDFS in parquet format.
>>> The write seems very slow, and if I understood Spark's diagnostics
>>> correctly, always seemed to run from the same executor, one partition after
>>> the other, serially. So I stripped the program down to this:
>>>
>>>
>>> val kinesisStreams = (0 until numShards).map { i => {
>>>
>>>   KinesisUtils.createStream(streamingContext, sparkApplicationName,
>>>
>>> kinesisStreamName, kinesisUrl, awsRegion,
>>> InitialPositionInStream.LATEST)
>>>
>>> new Duration(streamingInterval.millis),
>>> StorageLevel.MEMORY_AND_DISK_SER,
>>>
>>> awsCredentials.accessKey, awsCredentials.secretKey)
>>>
>>> }}
>>>
>>> val allKinesisStreams = streamingContext.union(kinesisStreams)
>>>
>>> allKinesisStreams.foreachRDD {
>>>
>>>rdd => {
>>>
>>>   info("total for this batch is " + rdd.count())
>>>
>>>}
>>> }
>>>
>>> The Kinesis stream has 20 shards (overprovisioned for this small test).
>>> I confirmed using a small boto program that data is periodically written to
>>> all 20 of the shards. I can see that Spark has created 20 executors, one
>>> for each Kinesis shard. It also creates one other executor, tied to a
>>> particular worker node, and that node seems to do the RDD counting. The
>>> streaming interval is 1 minute, during which time several shards have
>>> received data. Each minute interval, for this particular example, the
>>> driver prints out between 20 and 30 for the count value. I expected to see
>>> the count operation parallelized across the cluster. I think I must just be
>>> misunderstanding something fundamental! Can anyone point out where I'm
>>> going wrong?
>>>
>>> Yours in confusion,
>>> Graham
>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>
>
> --
> Best Regards,
> Ayan Guha
>



-- 
---
Takeshi Yamamuro


Re: Kinesis streaming misunderstanding..?

2017-01-27 Thread Takeshi Yamamuro
Hi,

Just a guess, though: Kinesis shards sometimes have skewed data.
So, before you compute something from the Kinesis RDDs, you'd better
repartition them
for better parallelism.
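
A sketch of what that could look like against the snippet quoted below (the names
come from Graham's code; the repartition factor is just a guess):

// repartition the unioned stream before acting on it, so each micro-batch's
// work is spread over more tasks/executors than the number of shards
val allKinesisStreams = streamingContext.union(kinesisStreams)
val repartitioned = allKinesisStreams.repartition(numShards * 3)
repartitioned.foreachRDD { rdd =>
  info("total for this batch is " + rdd.count())
}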

// maropu

On Fri, Jan 27, 2017 at 2:54 PM, Graham Clark <grcl...@gmail.com> wrote:

> Hi everyone - I am building a small prototype in Spark 1.6.0 (cloudera) to
> read information from Kinesis and write it to HDFS in parquet format. The
> write seems very slow, and if I understood Spark's diagnostics correctly,
> always seemed to run from the same executor, one partition after the other,
> serially. So I stripped the program down to this:
>
>
> val kinesisStreams = (0 until numShards).map { i => {
>
>   KinesisUtils.createStream(streamingContext, sparkApplicationName,
>
> kinesisStreamName, kinesisUrl, awsRegion, InitialPositionInStream.
> LATEST)
>
> new Duration(streamingInterval.millis), StorageLevel.MEMORY_AND_DISK_
> SER,
>
> awsCredentials.accessKey, awsCredentials.secretKey)
>
> }}
>
> val allKinesisStreams = streamingContext.union(kinesisStreams)
>
> allKinesisStreams.foreachRDD {
>
>rdd => {
>
>   info("total for this batch is " + rdd.count())
>
>}
> }
>
> The Kinesis stream has 20 shards (overprovisioned for this small test). I
> confirmed using a small boto program that data is periodically written to
> all 20 of the shards. I can see that Spark has created 20 executors, one
> for each Kinesis shard. It also creates one other executor, tied to a
> particular worker node, and that node seems to do the RDD counting. The
> streaming interval is 1 minute, during which time several shards have
> received data. Each minute interval, for this particular example, the
> driver prints out between 20 and 30 for the count value. I expected to see
> the count operation parallelized across the cluster. I think I must just be
> misunderstanding something fundamental! Can anyone point out where I'm
> going wrong?
>
> Yours in confusion,
> Graham
>
>


-- 
---
Takeshi Yamamuro


Re: spark intermediate data fills up the disk

2017-01-27 Thread Takeshi Yamamuro
IIUC, once the references to RDDs are gone, the related files (e.g.,
shuffled data) of these
RDDs are automatically removed by `ContextCleaner` (
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ContextCleaner.scala#L178
).
Since Spark can recompute from data sources (this is a fundamental concept
of RDDs), it seems removing these files directly results in failed jobs.
Though, I think removing them by yourself is a smarter way.

I'm not exactly sure about your query in the streaming job, though; I think your
query might
cause the situation you described.
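
If it helps, a couple of related knobs (only a sketch; the values are examples, and
this is not a claim that they fix the issue by themselves):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("streaming-app")
  // periodically force a GC on the driver so ContextCleaner notices dead RDD references
  .set("spark.cleaner.periodicGC.interval", "30min")
  // keep cleanup calls blocking (the default), useful when local disk pressure is the concern
  .set("spark.cleaner.referenceTracking.blocking", "true")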


On Fri, Jan 27, 2017 at 1:48 PM, <kanth...@gmail.com> wrote:

> Hi!
>
> Yes these files are for shuffle blocks however they need to be cleaned as
> well right? I had been running a streaming application for 2 days. The
> third day my disk fills up with all .index and .data files and my
> assumption is that these files had been there since the start of my
> streaming application. I should have checked the timestamp before doing rm
> -rf. Please let me know if I am wrong.
>
> Sent from my iPhone
>
> On Jan 26, 2017, at 4:24 PM, Takeshi Yamamuro <linguin@gmail.com>
> wrote:
>
> Yea, I think so and they are the intermediate files for shuffling.
> Probably, kant checked the configuration here (
> http://spark.apache.org/docs/latest/spark-standalone.html) though, this
> is not related to the issue.
>
> // maropu
>
> On Fri, Jan 27, 2017 at 7:46 AM, Jacek Laskowski <ja...@japila.pl> wrote:
>
>> Hi,
>>
>> The files are for shuffle blocks. Where did you find the docs about them?
>>
>> Jacek
>>
>> On 25 Jan 2017 8:41 p.m., "kant kodali" <kanth...@gmail.com> wrote:
>>
>> oh sorry its actually in the documentation. I should just
>> set spark.worker.cleanup.enabled = true
>>
>> On Wed, Jan 25, 2017 at 11:30 AM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> I have bunch of .index and .data files like that fills up my disk. I am
>>> not sure what the fix is? I am running spark 2.0.2 in stand alone mode
>>>
>>> Thanks!
>>>
>>>
>>>
>>>
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>
>


-- 
---
Takeshi Yamamuro


Re: Oracle JDBC - Spark SQL - Key Not Found: Scale

2017-01-26 Thread Takeshi Yamamuro
How about this?
https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala#L729
Or, how about using Double or something instead of Numeric?

// maropu

On Fri, Jan 27, 2017 at 10:25 AM, ayan guha <guha.a...@gmail.com> wrote:

> Okay, it is working with varchar columns only. Is there any way to
> workaround this?
>
> On Fri, Jan 27, 2017 at 12:22 PM, ayan guha <guha.a...@gmail.com> wrote:
>
>> hi
>>
>> I thought so too, so I created a table with INT and Varchar columns
>>
>> desc agtest1
>>
>> Name Null Type
>>   -
>> PID   NUMBER(38)
>> DES   VARCHAR2(100)
>>
>> url="jdbc:oracle:thin:@mpimpclu1-scan:1521/DEVAIM"
>> table = "agtest1"
>> user = "bal"
>> password= "bal"
>> driver="oracle.jdbc.OracleDriver"
>> df = sqlContext.read.jdbc(url=url,table=table,properties={"user":
>> user,"password":password,"driver":driver})
>>
>>
>> Still the issue persists.
>>
>> On Fri, Jan 27, 2017 at 11:19 AM, Takeshi Yamamuro <linguin@gmail.com
>> > wrote:
>>
>>> Hi,
>>>
>>> I think you got this error because you used `NUMERIC` types in your
>>> schema (https://github.com/apache/spark/blob/master/sql/core/src/ma
>>> in/scala/org/apache/spark/sql/jdbc/OracleDialect.scala#L32). So, IIUC
>>> avoiding the type is a workaround.
>>>
>>> // maropu
>>>
>>>
>>> On Fri, Jan 27, 2017 at 8:18 AM, ayan guha <guha.a...@gmail.com> wrote:
>>>
>>>> Hi
>>>>
>>>> I am facing exact issue with Oracle/Exadataas mentioned here
>>>> <http://stackoverflow.com/questions/41873449/sparksql-key-not-found-scale>.
>>>> Any idea? I could not figure out so sending to this grou hoping someone
>>>> have see it (and solved it)
>>>>
>>>> Spark Version: 1.6
>>>> pyspark command:
>>>>
>>>> pyspark --driver-class-path /opt/oracle/bigdatasql/bdcell-
>>>> 12.1/jlib-bds/kvclient.jar:/opt/oracle/bigdatasql/bdcell-12.
>>>> 1/jlib-bds/ojdbc7.jar:/opt/oracle/bigdatasql/bdcell-12.1/jli
>>>> b-bds/ojdbc7-orig.jar:/opt/oracle/bigdatasql/bdcell-12.1/jli
>>>> b-bds/oracle-hadoop-sql.jar:/opt/oracle/bigdatasql/bdcell-12
>>>> .1/jlib-bds/ora-hadoop-common.jar:/opt/oracle/bigdatasql/bdc
>>>> ell-12.1/jlib-bds/ora-hadoop-common-orig.jar:/opt/oracle/bi
>>>> gdatasql/bdcell-12.1/jlib-bds/orahivedp.jar:/opt/oracle/bigd
>>>> atasql/bdcell-12.1/jlib-bds/orahivedp-orig.jar:/opt/oracle
>>>> /bigdatasql/bdcell-12.1/jlib-bds/orai18n.jar:/opt/oracle/
>>>> bigdatasql/bdcell-12.1/jlib-bds/orai18n-orig.jar:/opt/
>>>> oracle/bigdatasql/bdcell-12.1/jlib-bds/oraloader.jar:/opt/
>>>> oracle/bigdatasql/bdcell-12.1/jlib-bds/oraloader-orig.jar   --conf
>>>> spark.jars=/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/oracl
>>>> e-hadoop-sql.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds
>>>> /ora-hadoop-common.jar,/opt/oracle/bigdatasql/bdcell-12.1/
>>>> jlib-bds/orahivedp.jar,/opt/oracle/bigdatasql/bdcell-12.1/
>>>> jlib-bds/oraloader.jar,/opt/oracle/bigdatasql/bdcell-12.1/
>>>> jlib-bds/ojdbc7.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-
>>>> bds/orai18n.jar/opt/oracle/bigdatasql/bdcell-12.1/jlib-
>>>> bds/kvclient.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-
>>>> bds/ojdbc7.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-
>>>> bds/ojdbc7-orig.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-
>>>> bds/oracle-hadoop-sql.jar,/opt/oracle/bigdatasql/bdcell-
>>>> 12.1/jlib-bds/ora-hadoop-common.jar,/opt/oracle/bigdata
>>>> sql/bdcell-12.1/jlib-bds/ora-hadoop-common-orig.jar,/opt/
>>>> oracle/bigdatasql/bdcell-12.1/jlib-bds/orahivedp.jar,/opt/
>>>> oracle/bigdatasql/bdcell-12.1/jlib-bds/orahivedp-orig.jar,/
>>>> opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/orai18n.jar,/
>>>> opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/orai18n-orig.
>>>> jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-bds/
>>>> oraloader.jar,/opt/oracle/bigdatasql/bdcell-12.1/jlib-
>>>> bds/oraloader-orig.jar
>>>>
>>>>
>>>> Here is my code:
>>>>
>>>> url="jdbc:oracle:thin:@mpimpclu1-scan:1521/DEVAIM"
>>>> table = "HIST_FORECAST_NEXT_BILL_DGTL"
>>>> user = "bal"
>>>

Re: spark intermediate data fills up the disk

2017-01-26 Thread Takeshi Yamamuro
Yea, I think so; they are the intermediate files for shuffling.
kant probably checked the configuration here (
http://spark.apache.org/docs/latest/spark-standalone.html), though this is
not related to the issue.

// maropu

On Fri, Jan 27, 2017 at 7:46 AM, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi,
>
> The files are for shuffle blocks. Where did you find the docs about them?
>
> Jacek
>
> On 25 Jan 2017 8:41 p.m., "kant kodali" <kanth...@gmail.com> wrote:
>
> oh sorry its actually in the documentation. I should just
> set spark.worker.cleanup.enabled = true
>
> On Wed, Jan 25, 2017 at 11:30 AM, kant kodali <kanth...@gmail.com> wrote:
>
>> I have bunch of .index and .data files like that fills up my disk. I am
>> not sure what the fix is? I am running spark 2.0.2 in stand alone mode
>>
>> Thanks!
>>
>>
>>
>>
>
>


-- 
---
Takeshi Yamamuro


Re: Oracle JDBC - Spark SQL - Key Not Found: Scale

2017-01-26 Thread Takeshi Yamamuro
  at org.apache.spark.sql.DataFrameReader.jdbc(
> DataFrameReader.scala:146)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
> at py4j.reflection.ReflectionEngine.invoke(
> ReflectionEngine.java:381)
> at py4j.Gateway.invoke(Gateway.java:259)
> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.
> java:133)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:209)
> at java.lang.Thread.run(Thread.java:745)
>
>
>
> --
> Best Regards,
> Ayan Guha
>



-- 
---
Takeshi Yamamuro


Re: Dataframe fails to save to MySQL table in spark app, but succeeds in spark shell

2017-01-25 Thread Takeshi Yamamuro
lIO.java:3906)
> at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2524)
> at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2677)
> at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2549)
> at com.mysql.jdbc.PreparedStatement.executeInternal(
> PreparedStatement.java:1861)
> at com.mysql.jdbc.PreparedStatement.executeUpdateInternal(
> PreparedStatement.java:2073)
> at com.mysql.jdbc.PreparedStatement.executeBatchSerially(
> PreparedStatement.java:1751)
> ... 15 more
>
> Driver stacktrace:
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$
> scheduler$DAGScheduler$$failJobAndIndependentStages(
> DAGScheduler.scala:1435)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$
> abortStage$1.apply(DAGScheduler.scala:1423)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$
> abortStage$1.apply(DAGScheduler.scala:1422)
> at scala.collection.mutable.ResizableArray$class.foreach(
> ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(
> ArrayBuffer.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(
> DAGScheduler.scala:1422)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$
> handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$
> handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
> at scala.Option.foreach(Option.scala:257)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(
> DAGScheduler.scala:802)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
> doOnReceive(DAGScheduler.scala:1650)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
> onReceive(DAGScheduler.scala:1605)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
> onReceive(DAGScheduler.scala:1594)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.runJob(
> DAGScheduler.scala:628)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958)
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.
> apply(RDD.scala:925)
> at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.
> apply(RDD.scala:923)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:151)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:112)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
> at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:923)
> at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.
> apply$mcV$sp(Dataset.scala:2305)
> at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.
> apply(Dataset.scala:2305)
> at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.
> apply(Dataset.scala:2305)
> at org.apache.spark.sql.execution.SQLExecution$.
> withNewExecutionId(SQLExecution.scala:57)
> at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.
> scala:2765)
> at org.apache.spark.sql.Dataset.foreachPartition(Dataset.
> scala:2304)
> at org.apache.spark.sql.execution.datasources.jdbc.
> JdbcUtils$.saveTable(JdbcUtils.scala:670)
> at org.apache.spark.sql.execution.datasources.jdbc.
> JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:77)
> at org.apache.spark.sql.execution.datasources.
> DataSource.write(DataSource.scala:426)
> at org.apache.spark.sql.DataFrameWriter.save(
> DataFrameWriter.scala:215)
> at io.optics.analytics.dataingestion.DataIngestion.
> run(DataIngestionJob.scala:119)
> at io.optics.analytics.dataingestion.DataIngestionJob$.main(
> DataIngestionJob.scala:28)
> at io.optics.analytics.dataingestion.DataIngestionJob.main(
> DataIngestionJob.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(
> NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$
> deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(
> SparkSubmit.scala:187)
> at org.apache.spark.deploy.SparkSubmit$.submit(
> SparkSubmit.scala:212)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.
> scala:126)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> Any idea why it's happening? A possible bug in spark?
>
> Thanks,
> Dzung.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
---
Takeshi Yamamuro


Re: Issue returning Map from UDAF

2017-01-25 Thread Takeshi Yamamuro
ection$1.apply(AggregationIterator.scala:228)
> at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$
> generateResultProjection$1.apply(AggregationIterator.scala:220)
> at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.
> next(SortBasedAggregationIterator.scala:152)
> at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.
> next(SortBasedAggregationIterator.scala:29)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> 4.apply(SparkPlan.scala:247)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> 4.apply(SparkPlan.scala:240)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$
> 1$$anonfun$apply$24.apply(RDD.scala:784)
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$
> 1$$anonfun$apply$24.apply(RDD.scala:784)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
> at org.apache.spark.scheduler.Task.run(Task.scala:85)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
> Attached is the code that you can use to reproduce the error.
>
> Thanks
> Ankur
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>



-- 
---
Takeshi Yamamuro


Re: Catalyst Expression(s) - Cleanup

2017-01-25 Thread Takeshi Yamamuro
Hi,

if you mean clean-up in executors, how about using
TaskContext#addTaskCompletionListener?

// maropu
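
A minimal sketch of the listener mechanism itself (shown on an RDD for brevity;
`openResource` and `process` are hypothetical helpers):

import org.apache.spark.TaskContext

rdd.mapPartitions { iter =>
  val resource = openResource()              // hypothetical per-task handle
  TaskContext.get().addTaskCompletionListener { _ =>
    resource.close()                         // runs when the task finishes, success or failure
  }
  iter.map(row => process(resource, row))    // hypothetical per-row work
}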

On Thu, Jan 26, 2017 at 3:13 AM, Bowden, Chris <chris.bow...@hpe.com> wrote:

> Is there currently any way to receive a signal when an Expression will no
> longer receive any rows so internal resources can be cleaned up?
>
> I have seen Generators are allowed to terminate() but my Expression(s) do
> not need to emit 0..N rows.
>



-- 
---
Takeshi Yamamuro


Re: freeing up memory occupied by processed Stream Blocks

2017-01-25 Thread Takeshi Yamamuro
AFAIK spark has no public APIs to clean up those RDDs.

On Wed, Jan 25, 2017 at 11:30 PM, Andrew Milkowski <amgm2...@gmail.com>
wrote:

> Hi Takeshi thanks for the answer, looks like spark would free up old RDD's
> however using admin UI we see ie
>
>  Block ID, it corresponds with each receiver and a timestamp.
> For example, block input-0-1485275695898 is from receiver 0 and it was
> created at 1485275695898 (1/24/2017, 11:34:55 AM GMT-5:00).
> That corresponds with the start time.
>
> that block even after running whole day is still not being released! RDD's
> in our scenario are Strings coming from kinesis stream
>
> is there a way to explicitly purge RDD after last step in M/R process once
> and for all ?
>
> thanks much!
>
> On Fri, Jan 20, 2017 at 2:35 AM, Takeshi Yamamuro <linguin@gmail.com>
> wrote:
>
>> Hi,
>>
>> AFAIK, the blocks of minibatch RDDs are checked every job finished, and
>> older blocks automatically removed (See: https://github.com/apach
>> e/spark/blob/master/streaming/src/main/scala/org/apache/
>> spark/streaming/dstream/DStream.scala#L463).
>>
>> You can control this behaviour by StreamingContext#remember to some
>> extent.
>>
>> // maropu
>>
>>
>> On Fri, Jan 20, 2017 at 3:17 AM, Andrew Milkowski <amgm2...@gmail.com>
>> wrote:
>>
>>> hello
>>>
>>> using spark 2.0.2  and while running sample streaming app with kinesis
>>> noticed (in admin ui Storage tab)  "Stream Blocks" for each worker keeps
>>> climbing up
>>>
>>> then also (on same ui page) in Blocks section I see blocks such as below
>>>
>>> input-0-1484753367056
>>>
>>> that are marked as Memory Serialized
>>>
>>> that do not seem to be "released"
>>>
>>> above eventually consumes executor memories leading to out of memory
>>> exception on some
>>>
>>> is there a way to "release" these blocks free them up , app is sample
>>> m/r
>>>
>>> I attempted rdd.unpersist(false) in the code but that did not lead to
>>> memory free up
>>>
>>> thanks much in advance!
>>>
>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>


-- 
---
Takeshi Yamamuro


Re: printSchema showing incorrect datatype?

2017-01-24 Thread Takeshi Yamamuro
Hi,

AFAIK `Dataset#printSchema` just prints an output schema of the logical
plan that the Dataset has.

The logical plans in your example are as follows;

---

scala> x.as[Array[Byte]].explain(true)

== Analyzed Logical Plan ==

x: string

Project [value#1 AS x#3]

+- LocalRelation [value#1]

scala> x.as[Array[Byte]].map(x => x).explain(true)

== Analyzed Logical Plan ==

value: binary

SerializeFromObject [input[0, binary, true] AS value#43]

+- MapElements , class [B, [StructField(value,BinaryType,true)],
obj#42: binary

   +- DeserializeToObject cast(x#3 as binary), obj#41: binary

  +- Project [value#1 AS x#3]

 +- LocalRelation [value#1]

---

So, it seems they print different schemas.


// maropu



On Wed, Jan 25, 2017 at 1:28 AM, Koert Kuipers <ko...@tresata.com> wrote:

> scala> val x = Seq("a", "b").toDF("x")
> x: org.apache.spark.sql.DataFrame = [x: string]
>
> scala> x.as[Array[Byte]].printSchema
> root
>  |-- x: string (nullable = true)
>
> scala> x.as[Array[Byte]].map(x => x).printSchema
> root
>  |-- value: binary (nullable = true)
>
> why does the first schema show string instead of binary?
>



-- 
---
Takeshi Yamamuro


Re: help!!!----issue with spark-sql type cast form long to longwritable

2017-01-24 Thread Takeshi Yamamuro
Hi,

Could you show us the whole code to reproduce that?

// maropu

On Wed, Jan 25, 2017 at 12:02 AM, Deepak Sharma <deepakmc...@gmail.com>
wrote:

> Can you try writing the UDF directly in spark and register it with spark
> sql or hive context ?
> Or do you want to reuse the existing UDF jar for hive in spark ?
>
> Thanks
> Deepak
>
> On Jan 24, 2017 5:29 PM, "Sirisha Cheruvu" <siri8...@gmail.com> wrote:
>
>> Hi Team,
>>
>> I am trying to keep below code in get method and calling that get mthod
>> in another hive UDF
>> and running the hive UDF using Hive Context.sql procedure..
>>
>>
>> switch (f) {
>> case "double" :  return ((DoubleWritable)obj).get();
>> case "bigint" :  return ((LongWritable)obj).get();
>> case "string" :  return ((Text)obj).toString();
>> default  :  return obj;
>>   }
>> }
>>
>> Suprisingly only LongWritable and Text convrsions are throwing error but
>> DoubleWritable is working
>> So I tried changing below code to
>>
>> switch (f) {
>> case "double" :  return ((DoubleWritable)obj).get();
>> case "bigint" :  return ((DoubleWritable)obj).get();
>> case "string" :  return ((Text)obj).toString();
>> default  :  return obj;
>>   }
>> }
>>
>> Still its throws error saying Java.Lang.Long cant be convrted
>> to org.apache.hadoop.hive.serde2.io.DoubleWritable
>>
>>
>>
>> its working fine on hive but throwing error on spark-sql
>>
>> I am importing the below packages.
>> import java.util.*;
>> import org.apache.hadoop.hive.serde2.objectinspector.*;
>> import org.apache.hadoop.io.LongWritable;
>> import org.apache.hadoop.io.Text;
>> import org.apache.hadoop.hive.serde2.io.DoubleWritable;
>>
>> .Please let me know why it is making issue in spark when perfectly
>> running fine on hive
>>
>


-- 
---
Takeshi Yamamuro


Re: converting timestamp column to a java.util.Date

2017-01-23 Thread Takeshi Yamamuro
Hi,

I think Spark UDFs can only handle `java.sql.Date`, not `java.util.Date`.
So, you need to change the return type in your UDF.

// maropu
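
A minimal sketch of that change, reusing the column names from the snippet
below (and assuming `_c1` really carries a timestamp value):

import java.sql.Date
import org.apache.spark.sql.functions._

// Return java.sql.Date (supported by Spark SQL) instead of java.util.Date.
val tstampToDate = udf { ts: java.sql.Timestamp => new Date(ts.getTime) }

val converted = sharesDf
  .withColumn("price", col("_c2").cast("double"))
  .withColumn("creationTime", tstampToDate(col("_c1")))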

On Tue, Jan 24, 2017 at 8:18 AM, Marco Mistroni <mmistr...@gmail.com> wrote:

> HI all
>   i am trying to convert a  string column, in a Dataframe , to a
> java.util.Date but i am getting this exception
>
> [dispatcher-event-loop-0] INFO org.apache.spark.storage.BlockManagerInfo
> - Removed broadcast_0_piece0 on 169.254.2.140:53468 in memory (size: 14.3
> KB, free: 767.4 MB)
> Exception in thread "main" java.lang.UnsupportedOperationException:
> Schema for type java.util.Date is not supported
>
> here's my code
>
> val tstampToDateFunc:(java.sql.Timestamp => java.util.Date) = ts => new
> java.util.Date(ts.getTime)
> val tsampConversionFunc = udf(tstampToDateFunc)
>
> sharesDf.withColumn("price", col("_c2").cast("double"))
> .withColumn("creationTime",
> tsampConversionFunc(col("_c1")))
>
> Are there any workarounds?
> i m trying to import data into mongoDB via Spark. The source is a csv file
> where
> i have  1 timestamp column and a bunch of strings. i will need to
> convert that
> to something compatible with a mongo's ISODate
>
> kr
>  marco
>
>



-- 
---
Takeshi Yamamuro


Re: freeing up memory occupied by processed Stream Blocks

2017-01-19 Thread Takeshi Yamamuro
Hi,

AFAIK, the blocks of minibatch RDDs are checked every time a job finishes, and
older blocks are automatically removed (See:
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala#L463
).

You can control this behaviour with StreamingContext#remember to some extent.

// maropu
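
A minimal sketch of the remember() knob (the durations are illustrative):

import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

val ssc = new StreamingContext(sc, Seconds(60))
// Keep generated RDDs (and their blocks) for at most ~2 minutes beyond what
// the currently running jobs still need.
ssc.remember(Minutes(2))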


On Fri, Jan 20, 2017 at 3:17 AM, Andrew Milkowski <amgm2...@gmail.com>
wrote:

> hello
>
> using spark 2.0.2  and while running sample streaming app with kinesis
> noticed (in admin ui Storage tab)  "Stream Blocks" for each worker keeps
> climbing up
>
> then also (on same ui page) in Blocks section I see blocks such as below
>
> input-0-1484753367056
>
> that are marked as Memory Serialized
>
> that do not seem to be "released"
>
> above eventually consumes executor memories leading to out of memory
> exception on some
>
> is there a way to "release" these blocks free them up , app is sample m/r
>
> I attempted rdd.unpersist(false) in the code but that did not lead to
> memory free up
>
> thanks much in advance!
>



-- 
---
Takeshi Yamamuro


Re: spark 2.02 error when writing to s3

2017-01-19 Thread Takeshi Yamamuro
Hi,

Do you get the same exception also in v2.1.0?
Anyway, I saw another guy reporting the same error, I think.
https://www.mail-archive.com/user@spark.apache.org/msg60882.html

// maropu


On Fri, Jan 20, 2017 at 5:15 AM, VND Tremblay, Paul <tremblay.p...@bcg.com>
wrote:

> I have come across a problem when writing CSV files to S3 in Spark 2.02.
> The problem does not exist in Spark 1.6.
>
>
>
> *19:09:20* Caused by: java.io.IOException: File already 
> exists:s3://stx-apollo-pr-datascience-internal/revenue_model/part-r-00025-c48a0d52-9600-4495-913c-64ae6bf888bd.csv
>
>
>
>
>
> My code is this:
>
>
>
> new_rdd\
>
> 135 .map(add_date_diff)\
>
> 136 .map(sid_offer_days)\
>
> 137 .groupByKey()\
>
> 138 .map(custom_sort)\
>
> 139 .map(before_rev_date)\
>
> 140 .map(lambda x, num_weeks = args.num_weeks: create_columns(x,
> num_weeks))\
>
> 141 .toDF()\
>
> 142 .write.csv(
>
> 143 sep = "|",
>
> 144 header = True,
>
> 145 nullValue = '',
>
> 146 quote = None,
>
> 147 path = path
>
> 148 )
>
>
>
> In order to get the path (the last argument), I call this function:
>
>
>
> 150 def _get_s3_write(test):
>
> 151 if s3_utility.s3_data_already_exists(_get_write_bucket_name(),
> _get_s3_write_dir(test)):
>
> 152 s3_utility.remove_s3_dir(_get_write_bucket_name(),
> _get_s3_write_dir(test))
>
> 153 return make_s3_path(_get_write_bucket_name(),
> _get_s3_write_dir(test))
>
>
>
> In other words, I am removing the directory if it exists before I write.
>
>
>
> Notes:
>
>
>
> * If I use a small set of data, then I don't get the error
>
>
>
> * If I use Spark 1.6, I don't get the error
>
>
>
> * If I read in a simple dataframe and then write to S3, I still get the
> error (without doing any transformations)
>
>
>
> * If I do the previous step with a smaller set of data, I don't get the
> error.
>
>
>
> * I am using pyspark, with python 2.7
>
>
>
> * The thread at this link: https://forums.aws.amazon.com/
> thread.jspa?threadID=152470  Indicates the problem is caused by a problem
> sync problem. With large datasets, spark tries to write multiple times and
> causes the error. The suggestion is to turn off speculation, but I believe
> speculation is turned off by default in pyspark.
>
>
>
> Thanks!
>
>
>
> Paul
>
>
>
>
>
> 
> _
>
> *Paul Tremblay *
> Analytics Specialist
>
> *THE BOSTON CONSULTING GROUP*
> STL ▪
>
> Tel. + ▪ Mobile +
> tremblay.p...@bcg.com
> 
> _
>
> Read BCG's latest insights, analysis, and viewpoints at
> *bcgperspectives.com* <http://www.bcgperspectives.com>
>
> --
>
> The Boston Consulting Group, Inc.
>
> This e-mail message may contain confidential and/or privileged
> information. If you are not an addressee or otherwise authorized to receive
> this message, you should not use, copy, disclose or take any action based
> on this e-mail or any information contained in the message. If you have
> received this material in error, please advise the sender immediately by
> reply e-mail and delete this message. Thank you.
>



-- 
---
Takeshi Yamamuro


Re: is partitionBy of DataFrameWriter supported in 1.6.x?

2017-01-19 Thread Takeshi Yamamuro
Hi,

In v1.6.0, it seems Spark already supported `partitionBy` for JSON, text, ORC
and Avro, so this is a documentation bug.
Actually, the doc bug was fixed in v1.6.1 (See:
https://github.com/apache/spark/commit/1005ee396f74dc4fcf127613b65e1abdb7f1934c
)

Also, AFAIK, that document only describes datasource tables in Spark,
so it is not related to Hive metastore tables, I think.

// maropu
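
For reference, a sketch of writing the same data as a path-based datasource
instead of saveAsTable into the Hive metastore (the output path is
illustrative); Spark can read the partitioned layout back directly, though
Hive/Presto still will not see it as a regular Hive table:

val outPath = "hdfs://nameservice1/user/bal/my_test_orc"  // illustrative path

df.write
  .mode("overwrite")
  .format("orc")
  .partitionBy("date")
  .save(outPath)

val readBack = sqlContext.read.format("orc").load(outPath)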




On Thu, Jan 19, 2017 at 2:44 PM, Richard Xin <
richardxin...@yahoo.com.invalid> wrote:

> I found contradictions in document 1.6.0 and 2.1.x
>
> in http://spark.apache.org/docs/1.6.0/api/scala/index.html#
> org.apache.spark.sql.DataFrameWriter
> it says: "This is only applicable for Parquet at the moment."
>
> in http://spark.apache.org/docs/latest/api/scala/index.html#
> org.apache.spark.sql.DataFrameWriter
> it says: "This was initially applicable for Parquet but in 1.5+ covers
> JSON, text, ORC and avro as well."
>
> and I got warning when trying to save in scala:
>
> > df.write.mode("overwrite").format("orc").partitionBy("
> date").saveAsTable("test.my_test")
> 17/01/19 13:34:43 WARN hive.HiveContext$$anon$2: Persisting partitioned
> data source relation `test`.`my_test` into Hive metastore in Spark SQL
> specific format, which is NOT compatible with Hive. Input path(s):
> hdfs://nameservice1/user/hive/warehouse/test.db/my_test
>
> looking at hdfs directories, the folders are there, but not selectable on
> both presto and hive.
>
> any comments?
>
> Thanks.
>



-- 
---
Takeshi Yamamuro


Re: need a hive generic udf which also works on spark sql

2017-01-17 Thread Takeshi Yamamuro
Hi,

AFAIK, you can use Hive GenericUDFs in Spark without much effort.
If you'd like to check the test suites for that, have a look at HiveUDFSuite:
https://github.com/apache/spark/blob/master/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala

I have also used Hive UDFs (including GenericUDFs) in Spark for another
incubator project I belong to (https://github.com/apache/incubator-hivemall),
and I haven't hit critical issues so far.

// maropu
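
A minimal sketch of wiring a Hive GenericUDF into Spark SQL through a
HiveContext (the jar path and class name are made-up examples):

// Assumes `sqlContext` is a HiveContext and the class extends
// org.apache.hadoop.hive.ql.udf.generic.GenericUDF.
sqlContext.sql("ADD JAR /path/to/my-generic-udf.jar")
sqlContext.sql("CREATE TEMPORARY FUNCTION my_generic_udf AS 'com.example.udf.MyGenericUDF'")
sqlContext.sql("SELECT my_generic_udf(col1) FROM my_table").show()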


On Tue, Jan 17, 2017 at 5:31 PM, Sirisha Cheruvu <siri8...@gmail.com> wrote:

> Hi
>
> Anybody has a test and tried generic udf with object inspector
> implementaion which sucessfully ran on both hive and spark-sql
>
> please share me the git hub link or source code file
>
> Thanks in advance
> Sirisha
>



-- 
---
Takeshi Yamamuro


Re: partition size inherited from parent: auto coalesce

2017-01-16 Thread Takeshi Yamamuro
Hi,

The coalesce does not happen automatically; you need to control the number of
partitions yourself.
Basically, #partitions respects the `spark.default.parallelism` number, which
defaults to the number of cores on your machine:
http://spark.apache.org/docs/latest/configuration.html#execution-behavior

// maropu
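
A minimal sketch, reusing the names from the quoted example below: after a
very selective filter you can shrink the partition count by hand.

val ngauss_rdd2 = ngauss_rdd.filter(_ > 4.0).coalesce(1)
ngauss_rdd2.count            // still 35 elements
ngauss_rdd2.partitions.size  // 1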

On Tue, Jan 17, 2017 at 11:58 AM, Suzen, Mehmet <su...@acm.org> wrote:

> Hello List,
>
>  I was wondering what is the design principle that partition size of
> an RDD is inherited from the parent.  See one simple example below
> [*]. 'ngauss_rdd2' has significantly less data, intuitively in such
> cases, shouldn't spark invoke coalesce automatically for performance?
> What would be the configuration option for this if there is any?
>
> Best,
> -m
>
> [*]
> // Generate 1 million Gaussian random numbers
> import util.Random
> Random.setSeed(4242)
> val ngauss = (1 to 1e6.toInt).map(x=>Random.nextGaussian)
> val ngauss_rdd = sc.parallelize(ngauss)
> ngauss_rdd.count // 1 million
> ngauss_rdd.partitions.size // 4
> val ngauss_rdd2 = ngauss_rdd.filter(x=>x > 4.0)
> ngauss_rdd2.count // 35
> ngauss_rdd2.partitions.size // 4
>
> -----
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
---
Takeshi Yamamuro


Re: Apache Spark example split/merge shards

2017-01-16 Thread Takeshi Yamamuro
Hi,

It seems you hit this issue:
https://issues.apache.org/jira/browse/SPARK-18020

// maropu

On Tue, Jan 17, 2017 at 11:51 AM, noppanit <noppani...@gmail.com> wrote:

> I'm totally new to Spark and I'm trying to learn from the example. I'm
> following this example
>
> https://github.com/apache/spark/blob/master/external/
> kinesis-asl/src/main/scala/org/apache/spark/examples/
> streaming/KinesisWordCountASL.scala.
>
> It works well. But I do have one question. Every time I split/merge Kinesis
> shards from the Amazon API, I have to restart the application. Is there a
> way for Spark application to automatically rebalance or to notify the
> application that Kinesis shards have been split or merged? I have to
> restart
> the application so I can see more shards in DynamoDB.
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Apache-Spark-example-split-merge-shards-tp28311.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
---
Takeshi Yamamuro


Re: [Spark Core] Re-using dataframes with limit() produces unexpected results

2017-01-12 Thread Takeshi Yamamuro
Hi,

I got the correct answer. Did I miss something?

// maropu

---
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.0.0
      /_/

Using Python version 2.7.10 (v2.7.10:15c95b7d81dc, May 23 2015 09:33:12)
SparkSession available as 'spark'.
>>>
>>> from pyspark.sql import Row
>>> rdd=sc.parallelize([Row(i=i) for i in range(100)],200)
>>> rdd1=rdd.toDF().limit(12345).rdd
>>> rdd2=rdd1.map(lambda x:(x,x))
>>> rdd2.join(rdd2).count()
12345


On Thu, Jan 12, 2017 at 4:18 PM, Ant <the.gere...@googlemail.com> wrote:

> Hello,
> it seems using a Spark DataFrame, which had limit() applied on it, in
> further calculations produces very unexpected results. Some people use it
> as poor man's sampling and end up debugging for hours to see what's going
> on. Here is an example
>
> from pyspark.sql import Row
> rdd=sc.parallelize([Row(i=i) for i in range(100)],200)
> rdd1=rdd.toDF().limit(12345).rdd
> rdd2=rdd1.map(lambda x:(x,x))
> rdd2.join(rdd2).count()
> # result is 10240 despite doing a self-join; expected 12345
>
> in Pyspark/Spark 2.0.0
>
> I understand that actions on limit may yield different rows every time,
> but re-calculating the same rdd2 within a single DAG is highly unexpected.
>
> Maybe a comment in the documentation may be helpful if there is no easy
> fix of limit. I do see that the intend for limit may be such that no two
> limit paths should occur in a single DAG.
>
> What do you think? What is the correct explanation?
>
> Anton
>



-- 
---
Takeshi Yamamuro


Re: How to hint Spark to use HashAggregate() for UDAF

2017-01-09 Thread Takeshi Yamamuro
Hi,

Spark always uses hash-based aggregation if the types in the aggregation
buffer are supported there;
otherwise, it falls back to sort-based aggregation.
See:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala#L38

So, I'm not sure about your query, but it seems the types of the aggregated
data in your query are not supported for hash-based aggregation.

// maropu
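
For illustration, a sketch of a UDAF whose aggregation buffer holds only a
fixed-width mutable type (a single LongType), which hash-based aggregation can
handle; buffers with types such as ArrayType or MapType typically force the
SortAggregate fallback:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class CountLike extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(StructField("v", LongType) :: Nil)
  def bufferSchema: StructType = StructType(StructField("cnt", LongType) :: Nil)
  def dataType: DataType = LongType
  def deterministic: Boolean = true
  def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0L }
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getLong(0) + 1L
  }
  def merge(b1: MutableAggregationBuffer, b2: Row): Unit = {
    b1(0) = b1.getLong(0) + b2.getLong(0)
  }
  def evaluate(buffer: Row): Any = buffer.getLong(0)
}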



On Mon, Jan 9, 2017 at 10:52 PM, Andy Dang <nam...@gmail.com> wrote:

> Hi all,
>
> It appears to me that Dataset.groupBy().agg(udaf) requires a full sort,
> which is very inefficient for certain aggration:
>
> The code is very simple:
> - I have a UDAF
> - What I want to do is: dataset.groupBy(cols).agg(udaf).count()
>
> The physical plan I got was:
> *HashAggregate(keys=[], functions=[count(1)], output=[count#67L])
> +- Exchange SinglePartition
>+- *HashAggregate(keys=[], functions=[partial_count(1)],
> output=[count#71L])
>   +- *Project
>  +- Generate explode(internal_col#31), false, false,
> [internal_col#42]
> +- SortAggregate(key=[key#0], functions=[aggregatefunction(key#0,
> nested#1, nestedArray#2, nestedObjectArray#3, value#4L,
> com.[...]uDf@108b121f, 0, 0)], output=[internal_col#31])
>+- *Sort [key#0 ASC], false, 0
>   +- Exchange hashpartitioning(key#0, 200)
>  +- SortAggregate(key=[key#0], 
> functions=[partial_aggregatefunction(key#0,
> nested#1, nestedArray#2, nestedObjectArray#3, value#4L,
> com.[...]uDf@108b121f, 0, 0)], output=[key#0,internal_col#37])
> +- *Sort [key#0 ASC], false, 0
>+- Scan ExistingRDD[key#0,nested#1,
> nestedArray#2,nestedObjectArray#3,value#4L]
>
> How can I make Spark to use HashAggregate (like the count(*) expression)
> instead of SortAggregate with my UDAF?
>
> Is it intentional? Is there an issue tracking this?
>
> ---
> Regards,
> Andy
>



-- 
---
Takeshi Yamamuro


Re: The spark hive udf can read broadcast the variables?

2016-12-18 Thread Takeshi Yamamuro
Hi,

No, you can't.
If you use a Scala UDF, you can do it like this:

val bv = sc.broadcast(100)
val testUdf = udf { (i: Long) => i + bv.value }
spark.range(10).select(testUdf('id)).show


// maropu


On Sun, Dec 18, 2016 at 12:24 AM, 李斌松 <libinsong1...@gmail.com> wrote:

> The spark hive udf can read broadcast the variables?
>



-- 
---
Takeshi Yamamuro


Re: Managed memory leak : spark-2.0.2

2016-12-08 Thread Takeshi Yamamuro
Hi,

Did you hit any actual trouble from the memory leak?
I think you can ignore the message in most cases because TaskMemoryManager
automatically releases the memory. In fact, Spark downgraded this warning
message in SPARK-18557.
https://issues.apache.org/jira/browse/SPARK-18557

// maropu

On Thu, Dec 8, 2016 at 8:10 PM, Appu K <kut...@gmail.com> wrote:

> Hello,
>
> I’ve just ran into an issue where the job is giving me "Managed memory
> leak" with spark version 2.0.2
>
> —
> 2016-12-08 16:31:25,231 [Executor task launch worker-0]
> (TaskMemoryManager.java:381) WARN leak 46.2 MB memory from
> org.apache.spark.util.collection.ExternalAppendOnlyMap@22719fb8
> 2016-12-08 16:31:25,232 [Executor task launch worker-0] (Logging.scala:66)
> WARN Managed memory leak detected; size = 48442112 bytes, TID = 1
> —
>
>
> The program itself is very basic and looks like take() is causing the
> issue
>
> Program: https://gist.github.com/kutt4n/87cfcd4e794b1865b6f880412dd80bbf
> Debug Log: https://gist.github.com/kutt4n/ba3cf812dced34ceadc588856edc
>
>
> TaskMemoryManager.java:381 says that it's normal to see leaked memory if
> one of the tasks failed.  In this case from the debug log - it is not quite
> apparent which task failed and the reason for failure.
>
> When the TSV file itself is small the issue doesn’t exist. In this
> particular case, the file is a 21MB clickstream data from wikipedia
> available at https://ndownloader.figshare.com/files/5036392
>
> Where could i read up more about managed memory leak. Any pointers on what
> might be the issue would be highly helpful
>
> thanks
> appu
>
>
>
>


-- 
---
Takeshi Yamamuro


Re: How to disable write ahead logs?

2016-11-28 Thread Takeshi Yamamuro
Hi,

If you want to disable the tracker-side WAL, do not set a checkpoint dir via
streamingContext.checkpoint().
http://spark.apache.org/docs/latest/streaming-programming-guide.html#how-to-configure-checkpointing

// maropu


On Tue, Nov 29, 2016 at 9:04 AM, Tim Harsch <thar...@cray.com> wrote:

> Hi all,
>
> I set `spark.streaming.receiver.writeAheadLog.enable=false` and my
> history server confirms the property has been set.  Yet, I continue to see
> the error:
>
>
>
> 16/11/28 15:47:04 ERROR util.FileBasedWriteAheadLog_ReceivedBlockTracker:
> Failed to write to write ahead log after 3 failures
>
>
>
> I surmised from the spark users thread Spark Streaming Data loss on
> failure to write BlockAdditionEvent failure to WAL (
> https://www.mail-archive.com/user@spark.apache.org/msg58952.html) that
> HDFS append support must be enabled for WAL to work with HDFS.  My
> installation does not enable this HDFS feature, so I would like to disable
> WAL in Spark.
>
>
>
> Thanks,
>
> Tim
>
>
>



-- 
---
Takeshi Yamamuro


Re: Why is shuffle write size so large when joining Dataset with nested structure?

2016-11-25 Thread Takeshi Yamamuro
Hi,

I think this is just the overhead of representing nested elements as internal
rows at runtime
(e.g., null bits are consumed for each nested element).
Moreover, in the Parquet format, nested data are stored column-wise and highly
compressed,
so the on-disk size becomes much more compact.

But I'm not sure about a better approach in this case.

// maropu








On Sat, Nov 26, 2016 at 11:16 AM, taozhuo <taoz...@gmail.com> wrote:

> The Dataset is defined as case class with many fields with nested
> structure(Map, List of another case class etc.)
> The size of the Dataset is only 1T when saving to disk as Parquet file.
> But when joining it, the shuffle write size becomes as large as 12T.
> Is there a way to cut it down without changing the schema? If not, what is
> the best practice when designing complex schemas?
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Why-is-shuffle-write-size-so-large-
> when-joining-Dataset-with-nested-structure-tp28136.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
---
Takeshi Yamamuro


Re: spark streaming with kinesis

2016-11-20 Thread Takeshi Yamamuro
"1 userid data" is ambiguous though (user-input data? stream? shard?),
since a Kinesis worker fetches data from the shards it has ownership of;
IIUC, user-input data in a shard are transferred to the assigned worker
as long as there is no failure.

// maropu
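
On the repartitioning question in the quoted mail: a keyed DStream can be
repartitioned with an explicit partitioner inside transform(). A rough sketch
(`stream` and `extractUserId` are assumptions, not from the thread):

import org.apache.spark.HashPartitioner

val keyed = stream.map(rec => (extractUserId(rec), rec))
// Within each batch, all records of one userid land in the same partition.
val byUser = keyed.transform(rdd => rdd.partitionBy(new HashPartitioner(16)))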

On Mon, Nov 21, 2016 at 1:59 PM, Shushant Arora <shushantaror...@gmail.com>
wrote:

> Hi
>
> Thanks.
> Have a doubt on spark streaming kinesis consumer. Say I have a batch time
> of 500 ms and kiensis stream is partitioned on userid(uniformly
> distributed).But since IdleTimeBetweenReadsInMillis is set to 1000ms so
> Spark receiver nodes will fetch the data at interval of 1 second and store
> in InputDstream.
>
> 1. When worker executors will fetch the data from receiver at after every
> 500 ms does its gurantee that 1 userid data will go to one partition and
> that to one worker only always ?
> 2.If not - can I repartition stream data before processing? If yes how-
> since JavaDStream has only one method repartition which takes number of
> partitions and not the partitioner function ?So it will randomly
> repartition the Dstream data.
>
> Thanks
>
>
>
>
>
>
>
>
> On Tue, Nov 15, 2016 at 8:23 AM, Takeshi Yamamuro <linguin@gmail.com>
> wrote:
>
>> Seems it it not a good design to frequently restart workers in a minute
>> because
>> their initialization and shutdown take much time as you said
>> (e.g., interconnection overheads with dynamodb and graceful shutdown).
>>
>> Anyway, since this is a kind of questions about the aws kinesis library,
>> so
>> you'd better to ask aws guys in their forum or something.
>>
>> // maropu
>>
>>
>> On Mon, Nov 14, 2016 at 11:20 PM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> 1.No, I want to implement low level consumer on kinesis stream.
>>> so need to stop the worker once it read the latest sequence number sent
>>> by driver.
>>>
>>> 2.What is the cost of frequent register and deregister of worker node.
>>> Is that when worker's shutdown is called it will terminate run method but
>>> leasecoordinator will wait for 2seconds before releasing the lease. So I
>>> cannot deregister a worker in less than 2 seconds ?
>>>
>>> Thanks!
>>>
>>>
>>>
>>> On Mon, Nov 14, 2016 at 7:36 PM, Takeshi Yamamuro <linguin@gmail.com
>>> > wrote:
>>>
>>>> Is "aws kinesis get-shard-iterator --shard-iterator-type LATEST" not
>>>> enough for your usecase?
>>>>
>>>> On Mon, Nov 14, 2016 at 10:23 PM, Shushant Arora <
>>>> shushantaror...@gmail.com> wrote:
>>>>
>>>>> Thanks!
>>>>> Is there a way to get the latest sequence number of all shards of a
>>>>> kinesis stream?
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Nov 14, 2016 at 5:43 PM, Takeshi Yamamuro <
>>>>> linguin@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> The time interval can be controlled by `IdleTimeBetweenReadsInMillis`
>>>>>> in KinesisClientLibConfiguration though,
>>>>>> it is not configurable in the current implementation.
>>>>>>
>>>>>> The detail can be found in;
>>>>>> https://github.com/apache/spark/blob/master/external/kinesis
>>>>>> -asl/src/main/scala/org/apache/spark/streaming/kinesis/Kines
>>>>>> isReceiver.scala#L152
>>>>>>
>>>>>> // maropu
>>>>>>
>>>>>>
>>>>>> On Sun, Nov 13, 2016 at 12:08 AM, Shushant Arora <
>>>>>> shushantaror...@gmail.com> wrote:
>>>>>>
>>>>>>> *Hi *
>>>>>>>
>>>>>>> *is **spark.streaming.blockInterval* for kinesis input stream is
>>>>>>> hardcoded to 1 sec or is it configurable ? Time interval at which 
>>>>>>> receiver
>>>>>>> fetched data from kinesis .
>>>>>>>
>>>>>>> Means stream batch interval cannot be less than 
>>>>>>> *spark.streaming.blockInterval
>>>>>>> and this should be configrable , Also is there any minimum value for
>>>>>>> streaming batch interval ?*
>>>>>>>
>>>>>>> *Thanks*
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> ---
>>>>>> Takeshi Yamamuro
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> ---
>>>> Takeshi Yamamuro
>>>>
>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>


-- 
---
Takeshi Yamamuro


Re: [SQL/Catalyst] Janino Generated Code Debugging

2016-11-17 Thread Takeshi Yamamuro
Hi,

Not sure what you'd like to do though; is this not enough?
import org.apache.spark.sql.execution.debug._
sql("SELECT 1").debugCodegen()

// maropu


On Thu, Nov 17, 2016 at 3:59 AM, Aleksander Eskilson <
aleksander...@gmail.com> wrote:

> Hi there,
>
> I have some jobs generating Java code (via Janino) that I would like to
> inspect more directly during runtime. The Janino page seems to indicate an
> environmental variable can be set to support debugging the generated code,
> allowing one to step into it directly and inspect variables and set
> breakpoints. I'm using Intellij and setting both
>
> -Dorg.codehaus.janino.source_debugging.enable=true
> -Dorg.codehaus.janino.source_debugging.dir=/Users/username/
> path/to/project/src
>
> but when I begin debug, I can't seem to view the generated code, even if I
> set a breakpoint to the location that calls it and attempt to step into the
> code, or reference a line of the stacktrace that should take me into the
> code. Any idea how to properly set Janino to debug the Catalyst-generated
> code more directly?
>
> Best,
> Alek
>



-- 
---
Takeshi Yamamuro


Re: AVRO File size when caching in-memory

2016-11-16 Thread Takeshi Yamamuro
Hi,

What's the schema interpreted by Spark?
The compression logic of Spark's in-memory caching depends on the column types.

// maropu
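
One way to check is sketched below (the path is illustrative and assumes the
spark-avro package is on the classpath); deeply nested structs and arrays
generally compress much worse in the in-memory columnar cache than flat
primitive columns:

val df = spark.read.format("com.databricks.spark.avro").load("/path/to/data.avro")
df.printSchema()    // look for deeply nested struct/array/map columns
df.cache().count()  // then compare the size on the web UI's Storage tab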


On Wed, Nov 16, 2016 at 5:26 PM, Prithish <prith...@gmail.com> wrote:

> Thanks for your response.
>
> I did some more tests and I am seeing that when I have a flatter structure
> for my AVRO, the cache memory use is close to the CSV. But, when I use few
> levels of nesting, the cache memory usage blows up. This is really critical
> for planning the cluster we will be using. To avoid using a larger cluster,
> looks like, we will have to consider keeping the structure flat as much as
> possible.
>
> On Wed, Nov 16, 2016 at 1:18 PM, Shreya Agarwal <shrey...@microsoft.com>
> wrote:
>
>> (Adding user@spark back to the discussion)
>>
>>
>>
>> Well, the CSV vs AVRO might be simpler to explain. CSV has a lot of scope
>> for compression. On the other hand avro and parquet are already compressed
>> and just store extra schema info, afaik. Avro and parquet are both going to
>> make your data smaller, parquet through compressed columnar storage, and
>> avro through its binary data format.
>>
>>
>>
>> Next, talking about the 62kb becoming 1224kb. I actually do not see such
>> a massive blow up. The avro you shared is 28kb on my system and becomes
>> 53.7kb when cached in memory deserialized and 52.9kb when cached In memory
>> serialized. Exact same numbers with parquet as well. This is expected
>> behavior, if I am not wrong.
>>
>>
>>
>> In fact, now that I think about it, even larger blow ups might be valid,
>> since your data must have been deserialized from the compressed avro
>> format, making it bigger. The order of magnitude of difference in size
>> would depend on the type of data you have and how well it was compressable.
>>
>>
>>
>> The purpose of these formats is to store data to persistent storage in a
>> way that's faster to read from, not to reduce cache-memory usage.
>>
>>
>>
>> Maybe others here have more info to share.
>>
>>
>>
>> Regards,
>>
>> Shreya
>>
>>
>>
>> Sent from my Windows 10 phone
>>
>>
>>
>> *From: *Prithish <prith...@gmail.com>
>> *Sent: *Tuesday, November 15, 2016 11:04 PM
>> *To: *Shreya Agarwal <shrey...@microsoft.com>
>> *Subject: *Re: AVRO File size when caching in-memory
>>
>>
>> I did another test and noting my observations here. These were done with
>> the same data in avro and csv formats.
>>
>> In AVRO, the file size on disk was 62kb and after caching, the in-memory
>> size is 1224kb
>> In CSV, the file size on disk was 690kb and after caching, the in-memory
>> size is 290kb
>>
>> I'm guessing that the spark caching is not able to compress when the
>> source is avro. Not sure if this is just my immature conclusion. Waiting to
>> hear your observation.
>>
>> On Wed, Nov 16, 2016 at 12:14 PM, Prithish <prith...@gmail.com> wrote:
>>
>>> Thanks for your response.
>>>
>>> I have attached the code (that I ran using the Spark-shell) as well as a
>>> sample avro file. After you run this code, the data is cached in memory and
>>> you can go to the "storage" tab on the Spark-ui (localhost:4040) and see
>>> the size it uses. In this example the size is small, but in my actual
>>> scenario, the source file size is 30GB and the in-memory size comes to
>>> around 800GB. I am trying to understand if this is expected when using avro
>>> or not.
>>>
>>> On Wed, Nov 16, 2016 at 10:37 AM, Shreya Agarwal <shrey...@microsoft.com
>>> > wrote:
>>>
>>>> I haven’t used Avro ever. But if you can send over a quick sample code,
>>>> I can run and see if I repro it and maybe debug.
>>>>
>>>>
>>>>
>>>> *From:* Prithish [mailto:prith...@gmail.com]
>>>> *Sent:* Tuesday, November 15, 2016 8:44 PM
>>>> *To:* Jörn Franke <jornfra...@gmail.com>
>>>> *Cc:* User <user@spark.apache.org>
>>>> *Subject:* Re: AVRO File size when caching in-memory
>>>>
>>>>
>>>>
>>>> Anyone?
>>>>
>>>>
>>>>
>>>> On Tue, Nov 15, 2016 at 10:45 AM, Prithish <prith...@gmail.com> wrote:
>>>>
>>>> I am using 2.0.1 and databricks avro library 3.0.1. I am running this
>>>> on the latest AWS EMR release.
>>>>
>>>>
>>>>
>>>> On Mon, Nov 14, 2016 at 3:06 PM, Jörn Franke <jornfra...@gmail.com>
>>>> wrote:
>>>>
>>>> spark version? Are you using tungsten?
>>>>
>>>>
>>>> > On 14 Nov 2016, at 10:05, Prithish <prith...@gmail.com> wrote:
>>>> >
>>>> > Can someone please explain why this happens?
>>>> >
>>>> > When I read a 600kb AVRO file and cache this in memory (using
>>>> cacheTable), it shows up as 11mb (storage tab in Spark UI). I have tried
>>>> this with different file sizes, and the size in-memory is always
>>>> proportionate. I thought Spark compresses when using cacheTable.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>
>>>
>>
>


-- 
---
Takeshi Yamamuro


Re: Spark Streaming: question on sticky session across batches ?

2016-11-15 Thread Takeshi Yamamuro
- dev

Hi,

AFAIK, if you use RDDs only, you can control the partition mapping to some
extent by using keyed RDDs, i.e. RDD[(key, data)].
A user-defined partitioner then distributes data into partitions depending on
the key.
As a good example of controlling partitions, you can look at the GraphX code:
https://github.com/apache/spark/blob/master/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala#L291

GraphX holds a `PartitionId` in its edge RDDs to control the partition where
edge data reside.

// maropu
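
A rough sketch of the RDD[(key, data)] approach with a user-defined
partitioner; this pins keys to partitions, while partition-to-executor
placement is still up to the scheduler (`records` and `userId` are
assumptions):

import org.apache.spark.Partitioner

class StickyPartitioner(parts: Int) extends Partitioner {
  override def numPartitions: Int = parts
  override def getPartition(key: Any): Int = {
    val mod = key.hashCode % parts
    if (mod < 0) mod + parts else mod  // keep the partition id non-negative
  }
}

val keyed = records.map(r => (r.userId, r))
val partitioned = keyed.partitionBy(new StickyPartitioner(32))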


On Tue, Nov 15, 2016 at 5:19 AM, Manish Malhotra <
manish.malhotra.w...@gmail.com> wrote:

> sending again.
> any help is appreciated !
>
> thanks in advance.
>
> On Thu, Nov 10, 2016 at 8:42 AM, Manish Malhotra <
> manish.malhotra.w...@gmail.com> wrote:
>
>> Hello Spark Devs/Users,
>>
>> Im trying to solve the use case with Spark Streaming 1.6.2 where for
>> every batch ( say 2 mins) data needs to go to the same reducer node after
>> grouping by key.
>> The underlying storage is Cassandra and not HDFS.
>>
>> This is a map-reduce job, where also trying to use the partitions of the
>> Cassandra table to batch the data for the same partition.
>>
>> The requirement of sticky session/partition across batches is because the
>> operations which we need to do, needs to read data for every key and then
>> merge this with the current batch aggregate values. So, currently when
>> there is no stickyness across batches, we have to read for every key, merge
>> and then write back. and reads are very expensive. So, if we have sticky
>> session, we can avoid read in every batch and have a cache of till last
>> batch aggregates across batches.
>>
>> So, there are few options, can think of:
>>
>> 1. to change the TaskSchedulerImpl, as its using Random to identify the
>> node for mapper/reducer before starting the batch/phase.
>> Not sure if there is a custom scheduler way of achieving it?
>>
>> 2. Can custom RDD can help to find the node for the key-->node.
>> there is a getPreferredLocation() method.
>> But not sure, whether this will be persistent or can vary for some edge
>> cases?
>>
>> Thanks in advance for you help and time !
>>
>> Regards,
>> Manish
>>
>
>


-- 
---
Takeshi Yamamuro


Re: Spark SQL UDF - passing map as a UDF parameter

2016-11-15 Thread Takeshi Yamamuro
Hi,

Literal cannot handle Tuple2.
Anyway, how about this?

val rdd = sc.makeRDD(1 to 3).map(i => (i, 0))
map(rdd.collect.flatMap(x => x._1 :: x._2 :: Nil).map(lit _): _*)

// maropu

On Tue, Nov 15, 2016 at 9:33 AM, Nirav Patel <npa...@xactlycorp.com> wrote:

> I am trying to use following API from Functions to convert a map into
> column so I can pass it to UDF.
>
> map(cols: Column
> <http://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Column.html>
> *): Column
> <http://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/Column.html>
>
> "Creates a new map column. The input columns must be grouped as key-value
> pairs, e.g. (key1, value1, key2, value2, ...). The key columns must all
> have the same data type, and can't be null. The value columns must all have
> the same data type."
>
>
> final val idxMap = idxMapRdd.collectAsMap
> val colmap =  map(idxMapA.map(lit _): _*)
>
> But getting following error:
>
> :139: error: type mismatch;
>  found   : Iterable[org.apache.spark.sql.Column]
>  required: Seq[org.apache.spark.sql.Column]
>val colmap =  map(idxMapArr.map(lit _): _*)
>
>
> If I try:
> val colmap =  map(idxMapArr.map(lit _).toSeq: _*)
>
> It says:
>
> java.lang.RuntimeException: Unsupported literal type class scala.Tuple2
> (17.0,MBO)
> at org.apache.spark.sql.catalyst.expressions.Literal$.apply(
> literals.scala:57)
> at org.apache.spark.sql.functions$.lit(functions.scala:101)
> at $anonfun$1.apply(:153)
>
>
>
> What is the correct usage of a `map` api to convert hashmap into column?
>
>
>
>
>
>
>
> [image: What's New with Xactly] <http://www.xactlycorp.com/email-click/>
>
> <https://www.nyse.com/quote/XNYS:XTLY>  [image: LinkedIn]
> <https://www.linkedin.com/company/xactly-corporation>  [image: Twitter]
> <https://twitter.com/Xactly>  [image: Facebook]
> <https://www.facebook.com/XactlyCorp>  [image: YouTube]
> <http://www.youtube.com/xactlycorporation>




-- 
---
Takeshi Yamamuro


Re: spark streaming with kinesis

2016-11-14 Thread Takeshi Yamamuro
It seems it is not a good design to restart workers every minute or so, because
their initialization and shutdown take much time, as you said
(e.g., interconnection overheads with DynamoDB and graceful shutdown).

Anyway, since this is a kind of question about the AWS Kinesis client library,
you'd better ask the AWS folks in their forum or something.

// maropu


On Mon, Nov 14, 2016 at 11:20 PM, Shushant Arora <shushantaror...@gmail.com>
wrote:

> 1.No, I want to implement low level consumer on kinesis stream.
> so need to stop the worker once it read the latest sequence number sent by
> driver.
>
> 2.What is the cost of frequent register and deregister of worker node. Is
> that when worker's shutdown is called it will terminate run method but
> leasecoordinator will wait for 2seconds before releasing the lease. So I
> cannot deregister a worker in less than 2 seconds ?
>
> Thanks!
>
>
>
> On Mon, Nov 14, 2016 at 7:36 PM, Takeshi Yamamuro <linguin@gmail.com>
> wrote:
>
>> Is "aws kinesis get-shard-iterator --shard-iterator-type LATEST" not
>> enough for your usecase?
>>
>> On Mon, Nov 14, 2016 at 10:23 PM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> Thanks!
>>> Is there a way to get the latest sequence number of all shards of a
>>> kinesis stream?
>>>
>>>
>>>
>>> On Mon, Nov 14, 2016 at 5:43 PM, Takeshi Yamamuro <linguin@gmail.com
>>> > wrote:
>>>
>>>> Hi,
>>>>
>>>> The time interval can be controlled by `IdleTimeBetweenReadsInMillis`
>>>> in KinesisClientLibConfiguration though,
>>>> it is not configurable in the current implementation.
>>>>
>>>> The detail can be found in;
>>>> https://github.com/apache/spark/blob/master/external/kinesis
>>>> -asl/src/main/scala/org/apache/spark/streaming/kinesis/Kines
>>>> isReceiver.scala#L152
>>>>
>>>> // maropu
>>>>
>>>>
>>>> On Sun, Nov 13, 2016 at 12:08 AM, Shushant Arora <
>>>> shushantaror...@gmail.com> wrote:
>>>>
>>>>> *Hi *
>>>>>
>>>>> *is **spark.streaming.blockInterval* for kinesis input stream is
>>>>> hardcoded to 1 sec or is it configurable ? Time interval at which receiver
>>>>> fetched data from kinesis .
>>>>>
>>>>> Means stream batch interval cannot be less than 
>>>>> *spark.streaming.blockInterval
>>>>> and this should be configrable , Also is there any minimum value for
>>>>> streaming batch interval ?*
>>>>>
>>>>> *Thanks*
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> ---
>>>> Takeshi Yamamuro
>>>>
>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>


-- 
---
Takeshi Yamamuro


Re: spark streaming with kinesis

2016-11-14 Thread Takeshi Yamamuro
Is "aws kinesis get-shard-iterator --shard-iterator-type LATEST" not enough
for your usecase?

On Mon, Nov 14, 2016 at 10:23 PM, Shushant Arora <shushantaror...@gmail.com>
wrote:

> Thanks!
> Is there a way to get the latest sequence number of all shards of a
> kinesis stream?
>
>
>
> On Mon, Nov 14, 2016 at 5:43 PM, Takeshi Yamamuro <linguin@gmail.com>
> wrote:
>
>> Hi,
>>
>> The time interval can be controlled by `IdleTimeBetweenReadsInMillis`
>> in KinesisClientLibConfiguration though,
>> it is not configurable in the current implementation.
>>
>> The detail can be found in;
>> https://github.com/apache/spark/blob/master/external/kinesis
>> -asl/src/main/scala/org/apache/spark/streaming/kinesis
>> /KinesisReceiver.scala#L152
>>
>> // maropu
>>
>>
>> On Sun, Nov 13, 2016 at 12:08 AM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> *Hi *
>>>
>>> *is **spark.streaming.blockInterval* for kinesis input stream is
>>> hardcoded to 1 sec or is it configurable ? Time interval at which receiver
>>> fetched data from kinesis .
>>>
>>> Means stream batch interval cannot be less than 
>>> *spark.streaming.blockInterval
>>> and this should be configrable , Also is there any minimum value for
>>> streaming batch interval ?*
>>>
>>> *Thanks*
>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>


-- 
---
Takeshi Yamamuro


Re: spark streaming with kinesis

2016-11-14 Thread Takeshi Yamamuro
Hi,

The time interval is controlled by `IdleTimeBetweenReadsInMillis`
in KinesisClientLibConfiguration; however,
it is not configurable in the current Spark implementation.

The detail can be found in;
https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala#L152

// maropu


On Sun, Nov 13, 2016 at 12:08 AM, Shushant Arora <shushantaror...@gmail.com>
wrote:

> *Hi *
>
> *is **spark.streaming.blockInterval* for kinesis input stream is
> hardcoded to 1 sec or is it configurable ? Time interval at which receiver
> fetched data from kinesis .
>
> Means stream batch interval cannot be less than *spark.streaming.blockInterval
> and this should be configrable , Also is there any minimum value for
> streaming batch interval ?*
>
> *Thanks*
>
>


-- 
---
Takeshi Yamamuro


Re: Convert SparseVector column to Densevector column

2016-11-13 Thread Takeshi Yamamuro
Hi,

How about this?

import org.apache.spark.ml.linalg._
val toSV = udf((v: Vector) => v.toDense)
val df = Seq((0.1, Vectors.sparse(16, Array(0, 3), Array(0.1, 0.3))), (0.2,
  Vectors.sparse(16, Array(0, 3), Array(0.1, 0.3)))).toDF("a", "b")
df.select(toSV($"b"))

// maropu


On Mon, Nov 14, 2016 at 1:20 PM, janardhan shetty <janardhan...@gmail.com>
wrote:

> Hi,
>
> Is there any easy way of converting a dataframe column from SparseVector
> to DenseVector  using
>
> import org.apache.spark.ml.linalg.DenseVector API ?
>
> Spark ML 2.0
>



-- 
---
Takeshi Yamamuro


Re: spark streaming with kinesis

2016-11-07 Thread Takeshi Yamamuro
I'm not familiar with the Kafka implementation, but a Kinesis receiver
runs in a thread on the executors.
You can set any value for the interval, but frequent checkpoints cause
excess load on DynamoDB.
See:
http://spark.apache.org/docs/latest/streaming-kinesis-integration.html#kinesis-checkpointing

// maropu
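
For reference, a sketch of where that interval is supplied (the stream, region
and app names are illustrative):

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kinesis.KinesisUtils

val stream = KinesisUtils.createStream(
  ssc, "myKinesisApp", "myStream",
  "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
  InitialPositionInStream.LATEST,
  Seconds(10),                      // how often the KCL checkpoints to DynamoDB
  StorageLevel.MEMORY_AND_DISK_2)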

On Mon, Nov 7, 2016 at 1:36 PM, Shushant Arora <shushantaror...@gmail.com>
wrote:

> Hi
>
> By receicer I meant spark streaming receiver architecture- means worker
> nodes are different than receiver nodes. There is no direct consumer/low
> level consumer like of  Kafka in kinesis spark streaming?
>
> Is there any limitation on interval checkpoint - minimum of 1second in
> spark streaming with kinesis. But as such there is no limit on checkpoint
> interval in KCL side ?
>
> Thanks
>
> On Tue, Oct 25, 2016 at 8:36 AM, Takeshi Yamamuro <linguin@gmail.com>
> wrote:
>
>> I'm not exactly sure about the receiver you pointed though,
>> if you point the "KinesisReceiver" implementation, yes.
>>
>> Also, we currently cannot disable the interval checkpoints.
>>
>> On Tue, Oct 25, 2016 at 11:53 AM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> Thanks!
>>>
>>> Is kinesis streams are receiver based only? Is there non receiver based
>>> consumer for Kinesis ?
>>>
>>> And Instead of having fixed checkpoint interval,Can I disable auto
>>> checkpoint and say  when my worker has processed the data after last record
>>> of mapPartition now checkpoint the sequence no using some api.
>>>
>>>
>>>
>>> On Tue, Oct 25, 2016 at 7:07 AM, Takeshi Yamamuro <linguin@gmail.com
>>> > wrote:
>>>
>>>> Hi,
>>>>
>>>> The only thing you can do for Kinesis checkpoints is tune the interval
>>>> of them.
>>>> https://github.com/apache/spark/blob/master/external/kinesis
>>>> -asl/src/main/scala/org/apache/spark/streaming/kinesis/Kines
>>>> isUtils.scala#L68
>>>>
>>>> Whether the dataloss occurs or not depends on the storage level you set;
>>>> if you set StorageLevel.MEMORY_AND_DISK_2, Spark may continue
>>>> processing
>>>> in case of the dataloss because the stream data Spark receives are
>>>> replicated across executors.
>>>> However,  all the executors that have the replicated data crash,
>>>> IIUC the dataloss occurs.
>>>>
>>>> // maropu
>>>>
>>>> On Mon, Oct 24, 2016 at 4:43 PM, Shushant Arora <
>>>> shushantaror...@gmail.com> wrote:
>>>>
>>>>> Does spark streaming consumer for kinesis uses Kinesis Client Library
>>>>>  and mandates to checkpoint the sequence number of shards in dynamo db.
>>>>>
>>>>> Will it lead to dataloss if consumed datarecords are not yet processed
>>>>> and kinesis checkpointed the consumed sequenece numbers in dynamo db and
>>>>> spark worker crashes - then spark launched the worker on another node but
>>>>> start consuming from dynamo db's checkpointed sequence number which is
>>>>> ahead of processed sequenece number .
>>>>>
>>>>> is there a way to checkpoint the sequenece numbers ourselves in
>>>>> Kinesis as it is in Kafka low level consumer ?
>>>>>
>>>>> Thanks
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> ---
>>>> Takeshi Yamamuro
>>>>
>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>


-- 
---
Takeshi Yamamuro


Re: spark streaming with kinesis

2016-10-24 Thread Takeshi Yamamuro
I'm not exactly sure which receiver you are referring to, but
if you mean the "KinesisReceiver" implementation, yes.

Also, we currently cannot disable the interval checkpoints.

On Tue, Oct 25, 2016 at 11:53 AM, Shushant Arora <shushantaror...@gmail.com>
wrote:

> Thanks!
>
> Is kinesis streams are receiver based only? Is there non receiver based
> consumer for Kinesis ?
>
> And Instead of having fixed checkpoint interval,Can I disable auto
> checkpoint and say  when my worker has processed the data after last record
> of mapPartition now checkpoint the sequence no using some api.
>
>
>
> On Tue, Oct 25, 2016 at 7:07 AM, Takeshi Yamamuro <linguin@gmail.com>
> wrote:
>
>> Hi,
>>
>> The only thing you can do for Kinesis checkpoints is tune the interval of
>> them.
>> https://github.com/apache/spark/blob/master/external/kinesis
>> -asl/src/main/scala/org/apache/spark/streaming/kinesis/
>> KinesisUtils.scala#L68
>>
>> Whether the dataloss occurs or not depends on the storage level you set;
>> if you set StorageLevel.MEMORY_AND_DISK_2, Spark may continue processing
>> in case of the dataloss because the stream data Spark receives are
>> replicated across executors.
>> However,  all the executors that have the replicated data crash,
>> IIUC the dataloss occurs.
>>
>> // maropu
>>
>> On Mon, Oct 24, 2016 at 4:43 PM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> Does spark streaming consumer for kinesis uses Kinesis Client Library
>>>  and mandates to checkpoint the sequence number of shards in dynamo db.
>>>
>>> Will it lead to dataloss if consumed datarecords are not yet processed
>>> and kinesis checkpointed the consumed sequenece numbers in dynamo db and
>>> spark worker crashes - then spark launched the worker on another node but
>>> start consuming from dynamo db's checkpointed sequence number which is
>>> ahead of processed sequenece number .
>>>
>>> is there a way to checkpoint the sequenece numbers ourselves in Kinesis
>>> as it is in Kafka low level consumer ?
>>>
>>> Thanks
>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>


-- 
---
Takeshi Yamamuro


Re: Get size of intermediate results

2016-10-24 Thread Takeshi Yamamuro
-dev +user

Hi,

Have you tried this?
scala> val df = Seq((1, 0), (2, 0), (3, 0), (4, 0)).toDF.cache
scala> df.queryExecution.executedPlan(0).execute().foreach(x => Unit)
scala> df.rdd.toDebugString

res4: String =
(4) MapPartitionsRDD[13] at rdd at <console>:26 []
 |  MapPartitionsRDD[12] at rdd at <console>:26 []
 |  MapPartitionsRDD[11] at rdd at <console>:26 []
 |  LocalTableScan [_1#41, _2#42] MapPartitionsRDD[9] at cache at <console>:23 []
 |  CachedPartitions: 4; MemorySize: 1104.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
 |  MapPartitionsRDD[8] at cache at <console>:23 []
 |  ParallelCollectionRDD[7] at cache at <console>:23 []

// maropu

On Fri, Oct 21, 2016 at 10:18 AM, Egor Pahomov <pahomov.e...@gmail.com>
wrote:

> I needed the same for debugging and I just added "count" action in debug
> mode for every step I was interested in. It's very time-consuming, but I
> debug not very often.
>
> 2016-10-20 2:17 GMT-07:00 Andreas Hechenberger <inter...@hechenberger.me>:
>
>> Hey awesome Spark-Dev's :)
>>
>> i am new to spark and i read a lot but now i am stuck :( so please be
>> kind, if i ask silly questions.
>>
>> I want to analyze some algorithms and strategies in spark and for one
>> experiment i want to know the size of the intermediate results between
>> iterations/jobs. Some of them are written to disk and some are in the
>> cache, i guess. I am not afraid of looking into the code (i already did)
>> but its complex and have no clue where to start :( It would be nice if
>> someone can point me in the right direction or where i can find more
>> information about the structure of spark core devel :)
>>
>> I already setup the devel environment and i can compile spark. It was
>> really awesome how smoothly the setup was :) Thx for that.
>>
>> Servus
>> Andy
>>
>> ---------
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>>
>
>
> --
>
>
> *Sincerely yoursEgor Pakhomov*
>



-- 
---
Takeshi Yamamuro


Re: spark streaming with kinesis

2016-10-24 Thread Takeshi Yamamuro
Hi,

The only thing you can do for Kinesis checkpoints is to tune their interval.
https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala#L68

Whether data loss occurs or not depends on the storage level you set;
if you set StorageLevel.MEMORY_AND_DISK_2, Spark may continue processing
in case of a failure because the stream data Spark receives is
replicated across executors.
However, if all the executors that hold the replicated data crash,
IIUC data loss occurs.
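
For example, a rough sketch of how the two knobs are passed (the app name,
stream name, endpoint, and region below are just placeholders):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream

val ssc = new StreamingContext(sc, Seconds(10))
val stream = KinesisUtils.createStream(
  ssc, "myApp", "myStream",
  "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
  InitialPositionInStream.LATEST,
  Seconds(60),                      // Kinesis checkpoint interval (DynamoDB writes)
  StorageLevel.MEMORY_AND_DISK_2)   // replicated, so a single executor loss is tolerated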

// maropu

On Mon, Oct 24, 2016 at 4:43 PM, Shushant Arora <shushantaror...@gmail.com>
wrote:

> Does spark streaming consumer for kinesis uses Kinesis Client Library
>  and mandates to checkpoint the sequence number of shards in dynamo db.
>
> Will it lead to dataloss if consumed datarecords are not yet processed and
> kinesis checkpointed the consumed sequenece numbers in dynamo db and spark
> worker crashes - then spark launched the worker on another node but start
> consuming from dynamo db's checkpointed sequence number which is ahead of
> processed sequenece number .
>
> is there a way to checkpoint the sequenece numbers ourselves in Kinesis as
> it is in Kafka low level consumer ?
>
> Thanks
>
>


-- 
---
Takeshi Yamamuro


Re: S3 DirectParquetOutputCommitter + PartitionBy + SaveMode.Append

2016-10-01 Thread Takeshi Yamamuro
I got this info from a Hadoop JIRA ticket:
https://issues.apache.org/jira/browse/MAPREDUCE-5485

// maropu

On Sat, Oct 1, 2016 at 7:14 PM, Igor Berman <igor.ber...@gmail.com> wrote:

> Takeshi, why are you saying this, how have you checked it's only used from
> 2.7.3?
> We use spark 2.0 which is shipped with hadoop dependency of 2.7.2 and we
> use this setting.
> We've sort of "verified" it's used by configuring log of file output
> commiter
>
> On 30 September 2016 at 03:12, Takeshi Yamamuro <linguin@gmail.com>
> wrote:
>
>> Hi,
>>
>> FYI: Seems 
>> `sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version","2”)`
>> is only available at hadoop-2.7.3+.
>>
>> // maropu
>>
>>
>> On Thu, Sep 29, 2016 at 9:28 PM, joffe.tal <joffe@gmail.com> wrote:
>>
>>> You can use partition explicitly by adding "/=>> value>" to
>>> the end of the path you are writing to and then use overwrite.
>>>
>>> BTW in Spark 2.0 you just need to use:
>>>
>>> sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.al
>>> gorithm.version","2”)
>>> and use s3a://
>>>
>>> and you can work with regular output committer (actually
>>> DirectParquetOutputCommitter is no longer available in Spark 2.0)
>>>
>>> so if you are planning on upgrading this could be another motivation
>>>
>>>
>>>
>>> --
>>> View this message in context: http://apache-spark-user-list.
>>> 1001560.n3.nabble.com/S3-DirectParquetOutputCommitter-Partit
>>> ionBy-SaveMode-Append-tp26398p27810.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>


-- 
---
Takeshi Yamamuro


Re: S3 DirectParquetOutputCommitter + PartitionBy + SaveMode.Append

2016-09-29 Thread Takeshi Yamamuro
Hi,

FYI: It seems
`sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")`
is only available in Hadoop 2.7.3+.

// maropu


On Thu, Sep 29, 2016 at 9:28 PM, joffe.tal <joffe@gmail.com> wrote:

> You can use partition explicitly by adding "/="
> to
> the end of the path you are writing to and then use overwrite.
>
> BTW in Spark 2.0 you just need to use:
>
> sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.
> algorithm.version","2”)
> and use s3a://
>
> and you can work with regular output committer (actually
> DirectParquetOutputCommitter is no longer available in Spark 2.0)
>
> so if you are planning on upgrading this could be another motivation
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/S3-DirectParquetOutputCommitter-
> PartitionBy-SaveMode-Append-tp26398p27810.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -----
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
---
Takeshi Yamamuro


Re: Broadcast big dataset

2016-09-28 Thread Takeshi Yamamuro
Hi,

# I dropped dev and added user because this is more suitable for the
user mailing list.

I think you need to describe your environment in more detail,
e.g., Spark version, executor memory, and so on.

// maropu


On Wed, Sep 28, 2016 at 11:03 PM, WangJianfei <
wangjianfe...@otcaix.iscas.ac.cn> wrote:

> Hi Devs
>  In my application, i just broadcast a dataset(about 500M) to  the
> ececutors(100+), I got a java heap error
> Jmartad-7219.hadoop.jd.local:53591 (size: 4.0 MB, free: 3.3 GB)
> 16/09/28 15:56:48 INFO BlockManagerInfo: Added broadcast_9_piece19 in
> memory
> on BJHC-Jmartad-9012.hadoop.jd.local:53197 (size: 4.0 MB, free: 3.3 GB)
> 16/09/28 15:56:49 INFO BlockManagerInfo: Added broadcast_9_piece8 in memory
> on BJHC-Jmartad-84101.hadoop.jd.local:52044 (size: 4.0 MB, free: 3.3 GB)
> 16/09/28 15:56:58 INFO BlockManagerInfo: Removed broadcast_8_piece0 on
> 172.22.176.114:37438 in memory (size: 2.7 KB, free: 3.1 GB)
> 16/09/28 15:56:58 WARN TaskSetManager: Lost task 125.0 in stage 7.0 (TID
> 130, BJHC-Jmartad-9376.hadoop.jd.local): java.lang.OutOfMemoryError: Java
> heap space
> at java.io.ObjectInputStream$HandleTable.grow(
> ObjectInputStream.java:3465)
> at
> java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3271)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1789)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.
> java:1350)
> at java.io.ObjectInputStream.defaultReadFields(
> ObjectInputStream.java:1990)
> at java.io.ObjectInputStream.readSerialData(
> ObjectInputStream.java:1915)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.
> java:1350)
> at java.io.ObjectInputStream.defaultReadFields(
> ObjectInputStream.java:1990)
> at java.io.ObjectInputStream.readSerialData(
> ObjectInputStream.java:1915)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.
> java:1350)
> at java.io.ObjectInputStream.readArray(ObjectInputStream.
> java:1706)
>
> My configuration is 4G memory in driver.  Any advice is appreciated.
> Thank you!
>
>
>
> --
> View this message in context: http://apache-spark-
> developers-list.1001551.n3.nabble.com/Broadcast-big-dataset-tp19127.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


-- 
---
Takeshi Yamamuro


Re: Tuning Spark memory

2016-09-23 Thread Takeshi Yamamuro
Hi,

Currently, the memory fractions for shuffle and storage are automatically
tuned by a unified memory manager, so you do not need to care about the
parameter in most cases.
See
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala#L24

// maropu


On Fri, Sep 23, 2016 at 9:06 PM, tan shai <tan.shai...@gmail.com> wrote:

> Hi,
>
> I am working with Spark 2.0, the job starts by sorting the input data and
> storing the output on HDFS.
>
> I am getting Out of memory errors, the solution was to increase the value
> of spark.shuffle.memoryFraction from 0.2 to 0.8 and this solves the
> problem. But in the documentation I have found that this is a deprecated
> parameter.
>
> As I have understand, It was replaced by spark.memory.fraction. How to
> modify this parameter while taking into account the sort and storage on
> HDFS?
>
> Thanks.
>



-- 
---
Takeshi Yamamuro


Re: Spark output data to S3 is very slow

2016-09-16 Thread Takeshi Yamamuro
Hi,

Have you seen the previous thread?
https://www.mail-archive.com/user@spark.apache.org/msg56791.html

// maropu


On Sat, Sep 17, 2016 at 11:34 AM, Qiang Li <q...@appannie.com> wrote:

> Hi,
>
>
> I ran some jobs with Spark 2.0 on Yarn, I found all tasks finished very
> quickly, but the last step, spark spend lots of time to rename or move data
> from s3 temporary directory to real directory, then I try to set
>
> spark.hadoop.spark.sql.parquet.output.committer.
> class=org.apache.spark.sql.execution.datasources.parquet.
> DirectParquetOutputCommitter
> or
> spark.sql.parquet.output.committer.class=org.apache.spark.sql.parquet.
> DirectParquetOutputCommitter
>
> But both doesn't work, looks like spark 2.0 removed these configs, how can
> I let spark output directly without temporary directory ?
>
>
>
> *This email may contain or reference confidential information and is
> intended only for the individual to whom it is addressed.  Please refrain
> from distributing, disclosing or copying this email and the information
> contained within unless you are the intended recipient.  If you received
> this email in error, please notify us at le...@appannie.com
> <le...@appannie.com>** immediately and remove it from your system.*




-- 
---
Takeshi Yamamuro


Re: JDBC Very Slow

2016-09-16 Thread Takeshi Yamamuro
Hi,

It'd be better to pass `predicates` in the jdbc arguments so the table is
loaded in parallel.
See:
https://github.com/apache/spark/blob/branch-1.6/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala#L200
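
For example, a rough sketch with the url from your snippet (the id column and
the ranges below are made up; pick non-overlapping predicates that cover the
whole table):

val url = "jdbc:postgresql://dbserver:port/database"
val props = new java.util.Properties()
props.setProperty("user", "user")
props.setProperty("password", "password")

// each predicate becomes one partition, so the ranges are read in parallel
val predicates = Array(
  "id >= 0 AND id < 1000000",
  "id >= 1000000 AND id < 2000000",
  "id >= 2000000 AND id < 3000000",
  "id >= 3000000")

val jdbcDF = sqlContext.read.jdbc(url, "schema.table", predicates, props)
jdbcDF.show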

// maropu

On Sat, Sep 17, 2016 at 7:46 AM, Benjamin Kim <bbuil...@gmail.com> wrote:

> I am testing this in spark-shell. I am following the Spark documentation
> by simply adding the PostgreSQL driver to the Spark Classpath.
>
> SPARK_CLASSPATH=/path/to/postgresql/driver spark-shell
>
>
> Then, I run the code below to connect to the PostgreSQL database to query.
> This is when I have problems.
>
> Thanks,
> Ben
>
>
> On Sep 16, 2016, at 3:29 PM, Nikolay Zhebet <phpap...@gmail.com> wrote:
>
> Hi! Can you split init code with current comand? I thing it is main
> problem in your code.
> 16 сент. 2016 г. 8:26 PM пользователь "Benjamin Kim" <bbuil...@gmail.com>
> написал:
>
>> Has anyone using Spark 1.6.2 encountered very slow responses from pulling
>> data from PostgreSQL using JDBC? I can get to the table and see the schema,
>> but when I do a show, it takes very long or keeps timing out.
>>
>> The code is simple.
>>
>> val jdbcDF = sqlContext.read.format("jdbc").options(
>> Map("url" -> "jdbc:postgresql://dbserver:po
>> rt/database?user=user=password",
>>"dbtable" -> “schema.table")).load()
>>
>> jdbcDF.show
>>
>>
>> If anyone can help, please let me know.
>>
>> Thanks,
>> Ben
>>
>>
>


-- 
---
Takeshi Yamamuro


Re: Spark SQL Thriftserver

2016-09-13 Thread Takeshi Yamamuro
> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 13 September 2016 at 23:28, Benjamin Kim <bbuil...@gmail.com> wrote:
>>
>>> Mich,
>>>
>>> It sounds like that there would be no harm in changing then. Are you
>>> saying that using STS would still use MapReduce to run the SQL statements?
>>> What our users are doing in our CDH 5.7.2 installation is changing the
>>> execution engine to Spark when connected to HiveServer2 to get faster
>>> results. Would they still have to do this using STS? Lastly, we are seeing
>>> zombie YARN jobs left behind even after a user disconnects. Are you seeing
>>> this happen with STS? If not, then this would be even better.
>>>
>>> Thanks for your fast reply.
>>>
>>> Cheers,
>>> Ben
>>>
>>> On Sep 13, 2016, at 3:15 PM, Mich Talebzadeh <mich.talebza...@gmail.com>
>>> wrote:
>>>
>>> Hi,
>>>
>>> Spark Thrift server (STS) still uses hive thrift server. If you look at
>>> $SPARK_HOME/sbin/start-thriftserver.sh you will see (mine is Spark 2)
>>>
>>> function usage {
>>>   echo "Usage: ./sbin/start-thriftserver [options] [thrift server
>>> options]"
>>>   pattern="usage"
>>>   *pattern+="\|Spark assembly has been built with Hive"*
>>>   pattern+="\|NOTE: SPARK_PREPEND_CLASSES is set"
>>>   pattern+="\|Spark Command: "
>>>   pattern+="\|==="
>>>   pattern+="\|--help"
>>>
>>>
>>> Indeed when you start STS, you pass hiveconf parameter to it
>>>
>>> ${SPARK_HOME}/sbin/start-thriftserver.sh \
>>> --master  \
>>> --hiveconf hive.server2.thrift.port=10055 \
>>>
>>> and STS bypasses Spark optimiser and uses Hive optimizer and execution
>>> engine. You will see this in hive.log file
>>>
>>> So I don't think it is going to give you much difference. Unless they
>>> have recently changed the design of STS.
>>>
>>> HTH
>>>
>>>
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>> On 13 September 2016 at 22:32, Benjamin Kim <bbuil...@gmail.com> wrote:
>>>
>>>> Does anyone have any thoughts about using Spark SQL Thriftserver in
>>>> Spark 1.6.2 instead of HiveServer2? We are considering abandoning
>>>> HiveServer2 for it. Some advice and gotcha’s would be nice to know.
>>>>
>>>> Thanks,
>>>> Ben
>>>> -
>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>
>>>>
>>>
>>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>



-- 
---
Takeshi Yamamuro


Re: Debugging a spark application in a none lazy mode

2016-09-12 Thread Takeshi Yamamuro
ISTM the only thing you can do is to inject `collect` calls map-by-map, like:

`df.map(x => do something...).collect`  // check the intermediate results of
each map

This only works for small datasets, though.
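
A slightly fuller, self-contained sketch of the same idea (the column names
below are made up), runnable in spark-shell:

val df = sqlContext.createDataFrame(Seq((1, "a"), (-2, "b"), (3, "c"))).toDF("value", "key")

val filtered = df.filter(df("value") > 0)
filtered.collect().foreach(println)   // inspect the rows right after the filter

val mapped = filtered.map(r => r.getInt(0) * 2)
mapped.collect().foreach(println)     // inspect the values right after the map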

// maropu

On Tue, Sep 13, 2016 at 1:38 AM, Attias, Hagai <hatt...@akamai.com> wrote:

> Hi,
>
> Not sure what you mean, can you give an example?
>
>
>
> Hagai.
>
>
>
> *From: *Takeshi Yamamuro <linguin@gmail.com>
> *Date: *Monday, September 12, 2016 at 7:24 PM
> *To: *Hagai Attias <hatt...@akamai.com>
> *Cc: *"user@spark.apache.org" <user@spark.apache.org>
> *Subject: *Re: Debugging a spark application in a none lazy mode
>
>
>
> Hi,
>
>
>
> Spark does not have such mode.
>
> How about getting local arrays by `collect` methods for debugging?
>
>
>
> // maropu
>
>
>
> On Tue, Sep 13, 2016 at 12:44 AM, Hagai <hatt...@akamai.com> wrote:
>
> Hi guys,
> Lately I was looking for a way to debug my spark application locally.
>
> However, since all transformations are actually being executed when the
> action is encountered, I have no way to look at the data after each
> transformation. Does spark support a non-lazy mode which enables to execute
> the transformations locally after each statement?
>
> Thanks,
> Hagai.
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Debugging-a-spark-application-
> in-a-none-lazy-mode-tp27695.html
> <https://urldefense.proofpoint.com/v2/url?u=http-3A__apache-2Dspark-2Duser-2Dlist.1001560.n3.nabble.com_Debugging-2Da-2Dspark-2Dapplication-2Din-2Da-2Dnone-2Dlazy-2Dmode-2Dtp27695.html=DQMFaQ=96ZbZZcaMF4w0F4jpN6LZg=2rse0GbSQg7RYcSn-6rlDlqzz-dO7WIUHmH78VamBHk=R2km6aj_sbZAZeikZA4yYw9K04cSrc6PmYXoq-Hvanc=NUDoEmxnN6I8u1TtEZb0xdTvseGduRGZANbgUO47FPE=>
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
>
>
>
> --
>
> ---
> Takeshi Yamamuro
>



-- 
---
Takeshi Yamamuro


Re: Debugging a spark application in a none lazy mode

2016-09-12 Thread Takeshi Yamamuro
Hi,

Spark does not have such a mode.
How about getting local arrays with `collect` for debugging?

// maropu

On Tue, Sep 13, 2016 at 12:44 AM, Hagai <hatt...@akamai.com> wrote:

> Hi guys,
> Lately I was looking for a way to debug my spark application locally.
>
> However, since all transformations are actually being executed when the
> action is encountered, I have no way to look at the data after each
> transformation. Does spark support a non-lazy mode which enables to execute
> the transformations locally after each statement?
>
> Thanks,
> Hagai.
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Debugging-a-spark-application-
> in-a-none-lazy-mode-tp27695.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
---
Takeshi Yamamuro


Re: Spark_JDBC_Partitions

2016-09-10 Thread Takeshi Yamamuro
Hi,

Yea, Spark does not have the same functionality as Sqoop.
I think one simple solution is to assign unique ids to the Oracle table
yourself.
Thoughts?
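
If altering the table is not an option, one possible workaround (assuming
Oracle's ORA_HASH is usable in your environment; the connection details and
the table name below are placeholders) is to derive an evenly distributed
bucket id on the fly and pass it as JDBC predicates, one bucket per partition:

val numBuckets = 104
// each predicate becomes one partition; hashing ROWID should spread rows evenly
val predicates = (0 until numBuckets)
  .map(b => s"MOD(ORA_HASH(ROWID), $numBuckets) = $b")
  .toArray

val props = new java.util.Properties()
props.setProperty("user", "user")
props.setProperty("password", "password")

val df = sqlContext.read.jdbc(
  "jdbc:oracle:thin:@//dbhost:1521/service", "SCHEMA.BIG_TABLE", predicates, props)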

// maropu


On Sun, Sep 11, 2016 at 12:37 AM, Mich Talebzadeh <mich.talebza...@gmail.com
> wrote:

> Strange that Oracle table of 200Million plus rows has not been partitioned.
>
> What matters here is to have parallel connections from JDBC to Oracle,
> each reading a sub-set of table. Any parallel fetch is going to be better
> than reading with one connection from Oracle.
>
> Surely among 404 columns there must be one with high cardinality to
> satisfy this work.
>
> May be you should just create table  as select * from Oracle_table
> where rownum <= 100; and use that for test.
>
> Other alternative is to use Oracle SQL Connecter for HDFS
> <https://docs.oracle.com/cd/E37231_01/doc.20/e36961/sqlch.htm#BDCUG125>that
> can do it for you. With 404 columns it is difficult to suggest any
> alternative. Is this a FACT table?
>
> HTH
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 10 September 2016 at 16:20, Ajay Chander <itsche...@gmail.com> wrote:
>
>> Hello Everyone,
>>
>> My goal is to use Spark Sql to load huge amount of data from Oracle to
>> HDFS.
>>
>> *Table in Oracle:*
>> 1) no primary key.
>> 2) Has 404 columns.
>> 3) Has 200,800,000 rows.
>>
>> *Spark SQL:*
>> In my Spark SQL I want to read the data into n number of partitions in
>> parallel, for which I need to provide 'partition column','lowerBound',
>> 'upperbound', 'numPartitions' from the table Oracle. My table in Oracle has
>> no such column to satisfy this need(Highly Skewed), because of it, if the
>> numPartitions is set to 104, 102 tasks are finished in a minute, 1 task
>> finishes in 20 mins and the last one takes forever.
>>
>> Is there anything I could do to distribute the data evenly into
>> partitions? Can we set any fake query to orchestrate this pull process, as
>> we do in SQOOP like this '--boundary-query "SELECT CAST(0 AS NUMBER) AS
>> MIN_MOD_VAL, CAST(12 AS NUMBER) AS MAX_MOD_VAL FROM DUAL"' ?
>>
>> Any pointers are appreciated.
>>
>> Thanks for your time.
>>
>> ~ Ajay
>>
>
>


-- 
---
Takeshi Yamamuro


Re: java.io.IOException: FAILED_TO_UNCOMPRESS(5)

2016-09-10 Thread Takeshi Yamamuro
Hi,

Seems like a known issue, see https://issues.apache.org/jira/browse/SPARK-4105

// maropu

On Sat, Sep 10, 2016 at 11:08 PM, 齐忠 <cente...@gmail.com> wrote:

> Hi all
>
> when use default compression snappy,I get error when spark doing shuffle
>
> 16/09/09 08:33:15 ERROR executor.Executor: Managed memory leak detected;
> size = 89817648 bytes, TID = 20912
> 16/09/09 08:33:15 ERROR executor.Executor: Exception in task 63.2 in stage
> 1.0 (TID 20912)
> java.io.IOException: FAILED_TO_UNCOMPRESS(5)
> at org.xerial.snappy.SnappyNative.throw_error(
> SnappyNative.java:98)
> at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
> at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:465)
> at org.xerial.snappy.Snappy.uncompress(Snappy.java:504)
> at org.xerial.snappy.SnappyInputStream.readFully(
> SnappyInputStream.java:147)
> at org.xerial.snappy.SnappyInputStream.readHeader(
> SnappyInputStream.java:99)
> at org.xerial.snappy.SnappyInputStream.(
> SnappyInputStream.java:59)
> at org.apache.spark.io.SnappyCompressionCodec.
> compressedInputStream(CompressionCodec.scala:159)
> at org.apache.spark.storage.BlockManager.wrapForCompression(
> BlockManager.scala:1186)
> at org.apache.spark.shuffle.BlockStoreShuffleReader$$
> anonfun$2.apply(BlockStoreShuffleReader.scala:53)
> at org.apache.spark.shuffle.BlockStoreShuffleReader$$
> anonfun$2.apply(BlockStoreShuffleReader.scala:52)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at org.apache.spark.util.CompletionIterator.hasNext(
> CompletionIterator.scala:32)
> at org.apache.spark.InterruptibleIterator.hasNext(
> InterruptibleIterator.scala:39)
> at org.apache.spark.util.collection.ExternalAppendOnlyMap.
> insertAll(ExternalAppendOnlyMap.scala:152)
> at org.apache.spark.Aggregator.combineCombinersByKey(
> Aggregator.scala:58)
> at org.apache.spark.shuffle.BlockStoreShuffleReader.read(
> BlockStoreShuffleReader.scala:83)
> at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:73)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(
> ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(
> Executor.scala:214)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> env info
>
> spark on yarn(cluster)scalaVersion := "2.10.6"
> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0" % 
> "provided"libraryDependencies += "org.apache.spark" %% "spark-mllib" % 
> "1.6.0" % "provided"
>
>
> ​THANKS​
>
>
> --
> cente...@gmail.com
>



-- 
---
Takeshi Yamamuro


Re: Any estimate for a Spark 2.0.1 release date?

2016-09-06 Thread Takeshi Yamamuro
Oh, sorry. I forgot to attach the URL:
https://www.mail-archive.com/user@spark.apache.org/msg55723.html

// maropu

On Tue, Sep 6, 2016 at 2:41 PM, Morten Hornbech <mor...@datasolvr.com>
wrote:

> Sorry. Seen what? I think you forgot a link.
>
> Morten
>
> Den 6. sep. 2016 kl. 04.51 skrev Takeshi Yamamuro <linguin@gmail.com>:
>
> Hi,
>
> Have you seen this?
>
> // maropu
>
> On Tue, Sep 6, 2016 at 7:42 AM, mhornbech <mor...@datasolvr.com> wrote:
>
>> I can't find any JIRA issues with the tag that are unresolved. Apologies
>> if
>> this is a rookie mistake and the information is available elsewhere.
>>
>> Morten
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/Any-estimate-for-a-Spark-2-0-1-release
>> -date-tp27659.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com
>> <http://nabble.com>.
>>
>> -------------
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>
>


-- 
---
Takeshi Yamamuro


Re: Any estimate for a Spark 2.0.1 release date?

2016-09-05 Thread Takeshi Yamamuro
Hi,

Have you seen this?

// maropu

On Tue, Sep 6, 2016 at 7:42 AM, mhornbech <mor...@datasolvr.com> wrote:

> I can't find any JIRA issues with the tag that are unresolved. Apologies if
> this is a rookie mistake and the information is available elsewhere.
>
> Morten
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Any-estimate-for-a-Spark-2-0-1-
> release-date-tp27659.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
---
Takeshi Yamamuro


Re: broadcast fails on join

2016-08-30 Thread Takeshi Yamamuro
Hi,

How about increasing the value of `spark.sql.broadcastTimeout`?
The default value is 300 (seconds).
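
For example, before running the join (the 1200 below is just an illustration;
the same key can also be passed as --conf spark.sql.broadcastTimeout=1200 at
submit time):

spark.conf.set("spark.sql.broadcastTimeout", "1200")   // seconds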

// maropu


On Tue, Aug 30, 2016 at 9:09 PM, AssafMendelson <assaf.mendel...@rsa.com>
wrote:

> Hi,
>
> I am seeing a broadcast failure when doing a join as follows:
>
> Assume I have a dataframe df with ~80 million records
>
> I do:
>
> df2 = df.filter(cond) # reduces to ~50 million records
>
> grouped = broadcast(df.groupby(df2.colA).count())
>
> total = df2.join(grouped, df2.colA == grouped.colA, “inner”)
>
> total.filter(total[“count”] > 10).show()
>
>
>
> This fails with an exception:
>
> org.apache.spark.SparkException: Exception thrown in awaitResult:
>
> at org.apache.spark.util.ThreadUtils$.awaitResult(
> ThreadUtils.scala:194)
>
> at org.apache.spark.sql.execution.exchange.
> BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:120)
>
> at org.apache.spark.sql.execution.InputAdapter.
> doExecuteBroadcast(WholeStageCodegenExec.scala:229)
>
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> executeBroadcast$1.apply(SparkPlan.scala:125)
>
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> executeBroadcast$1.apply(SparkPlan.scala:125)
>
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$
> executeQuery$1.apply(SparkPlan.scala:136)
>
> at org.apache.spark.rdd.RDDOperationScope$.withScope(
> RDDOperationScope.scala:151)
>
> at org.apache.spark.sql.execution.SparkPlan.
> executeQuery(SparkPlan.scala:133)
>
> at org.apache.spark.sql.execution.SparkPlan.
> executeBroadcast(SparkPlan.scala:124)
>
> at org.apache.spark.sql.execution.joins.
> BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98)
>
> at org.apache.spark.sql.execution.joins.
> BroadcastHashJoinExec.codegenInner(BroadcastHashJoinExec.scala:197)
>
> at org.apache.spark.sql.execution.joins.
> BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:82)
>
> at org.apache.spark.sql.execution.CodegenSupport$
> class.consume(WholeStageCodegenExec.scala:153)
>
> at org.apache.spark.sql.execution.ProjectExec.consume(
> basicPhysicalOperators.scala:30)
>
> at org.apache.spark.sql.execution.ProjectExec.doConsume(
> basicPhysicalOperators.scala:62)
>
> at org.apache.spark.sql.execution.CodegenSupport$
> class.consume(WholeStag eCodegenExec.scala:153)
>
> at org.apache.spark.sql.execution.FilterExec.consume(
> basicPhysicalOperators.scala:79)
>
>
>
> However, if I do:
>
> grouped.cache()
>
> grouped.count()
>
>
>
> before the join everything is fine (btw the grouped dataframe is 1.5MB
> when cached in memory and I have more than 4GB per executor with 8
> executors, the full dataframe is ~8GB)
>
>
>
> Thanks,
>
> Assaf.
>
>
>
> --
> View this message in context: broadcast fails on join
> <http://apache-spark-user-list.1001560.n3.nabble.com/broadcast-fails-on-join-tp27623.html>
> Sent from the Apache Spark User List mailing list archive
> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.
>



-- 
---
Takeshi Yamamuro


Re: Caching broadcasted DataFrames?

2016-08-25 Thread Takeshi Yamamuro
Hi,

You need to cache d1 to prevent re-computation (including disk reads),
because Spark re-broadcasts the data on every SQL execution.
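
A rough sketch of the pattern in spark-shell (d2 and d3 below just stand in
for the two DataFrames you join against):

import org.apache.spark.sql.functions.broadcast

val d1 = Seq((1, "a"), (2, "b")).toDF("key", "v1").cache()
d1.count()                                    // materialize the cache once

val d2 = Seq((1, 10), (2, 20)).toDF("key", "v2")
val d3 = Seq((1, 30), (2, 40)).toDF("key", "v3")

val joined1 = d2.join(broadcast(d1), "key")   // both joins reuse the cached d1
val joined2 = d3.join(broadcast(d1), "key")   // instead of re-reading it from disk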

// maropu

On Fri, Aug 26, 2016 at 2:07 AM, Jestin Ma <jestinwith.a...@gmail.com>
wrote:

> I have a DataFrame d1 that I would like to join with two separate
> DataFrames.
> Since d1 is small enough, I broadcast it.
>
> What I understand about cache vs broadcast is that cache leads to each
> executor storing the partitions its assigned in memory (cluster-wide
> in-memory). Broadcast leads to each node (with multiple executors) storing
> a copy of the dataset (all partitions) inside its own memory.
>
> Since the dataset for d1 is used in two separate joins, should I also
> persist it to prevent reading it from disk again? Or would broadcasting the
> data already take care of that?
>
>
> Thank you,
> Jestin
>



-- 
---
Takeshi Yamamuro


Re: spark 2.0.0 - when saving a model to S3 spark creates temporary files. Why?

2016-08-25 Thread Takeshi Yamamuro
afaik no.

// maropu

On Thu, Aug 25, 2016 at 9:16 PM, Tal Grynbaum <tal.grynb...@gmail.com>
wrote:

> Is/was there an option similar to DirectParquetOutputCommitter to write
> json files to S3 ?
>
> On Thu, Aug 25, 2016 at 2:56 PM, Takeshi Yamamuro <linguin@gmail.com>
> wrote:
>
>> Hi,
>>
>> Seems this just prevents writers from leaving partial data in a
>> destination dir when jobs fail.
>> In the previous versions of Spark, there was a way to directly write data
>> in a destination though,
>> Spark v2.0+ has no way to do that because of the critial issue on S3
>> (See: SPARK-10063).
>>
>> // maropu
>>
>>
>> On Thu, Aug 25, 2016 at 2:40 PM, Tal Grynbaum <tal.grynb...@gmail.com>
>> wrote:
>>
>>> I read somewhere that its because s3 has to know the size of the file
>>> upfront
>>> I dont really understand this,  as to why is it ok  not to know it for
>>> the temp files and not ok for the final files.
>>> The delete permission is the minor disadvantage from my side,  the worst
>>> thing is that i have a cluster of 100 machines sitting idle for 15 minutes
>>> waiting for copy to end.
>>>
>>> Any suggestions how to avoid that?
>>>
>>> On Thu, Aug 25, 2016, 08:21 Aseem Bansal <asmbans...@gmail.com> wrote:
>>>
>>>> Hi
>>>>
>>>> When Spark saves anything to S3 it creates temporary files. Why? Asking
>>>> this as this requires the the access credentails to be given
>>>> delete permissions along with write permissions.
>>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>
>
> --
> *Tal Grynbaum* / *CTO & co-founder*
>
> m# +972-54-7875797
>
> mobile retention done right
>



-- 
---
Takeshi Yamamuro


Re: spark 2.0.0 - when saving a model to S3 spark creates temporary files. Why?

2016-08-25 Thread Takeshi Yamamuro
Hi,

Seems this just prevents writers from leaving partial data in the destination
dir when jobs fail.
In previous versions of Spark, there was a way to write data directly to the
destination; however, Spark v2.0+ has no way to do that because of a critical
issue on S3 (see SPARK-10063).

// maropu


On Thu, Aug 25, 2016 at 2:40 PM, Tal Grynbaum <tal.grynb...@gmail.com>
wrote:

> I read somewhere that its because s3 has to know the size of the file
> upfront
> I dont really understand this,  as to why is it ok  not to know it for the
> temp files and not ok for the final files.
> The delete permission is the minor disadvantage from my side,  the worst
> thing is that i have a cluster of 100 machines sitting idle for 15 minutes
> waiting for copy to end.
>
> Any suggestions how to avoid that?
>
> On Thu, Aug 25, 2016, 08:21 Aseem Bansal <asmbans...@gmail.com> wrote:
>
>> Hi
>>
>> When Spark saves anything to S3 it creates temporary files. Why? Asking
>> this as this requires the the access credentails to be given
>> delete permissions along with write permissions.
>>
>


-- 
---
Takeshi Yamamuro


Re: Spark SQL and number of task

2016-08-04 Thread Takeshi Yamamuro
Seems the performance difference comes from `CassandraSourceRelation`.
I'm not familiar with the implementation, but I guess the `IN` filter is
pushed down into the datasource and the other one is not.

You'd be better off checking the performance metrics in the webUI.

// maropu

On Thu, Aug 4, 2016 at 8:41 PM, Marco Colombo <ing.marco.colo...@gmail.com>
wrote:

> Ok, thanx.
> The 2 plan are very similar
>
> with in condition
>
> +--+--+
> |
> plan   |
>
> +--+--+
> | == Physical Plan ==
>  |
> | TungstenAggregate(key=[id#0L],
> functions=[(avg(avg#2),mode=Final,isDistinct=false)],
> output=[id#0L,_c1#81])  |
> | +- TungstenExchange hashpartitioning(id#0L,10), None
> |
> |+- TungstenAggregate(key=[id#0L],
> functions=[(avg(avg#2),mode=Partial,isDistinct=false)],
> output=[id#0L,sum#85,count#86L])|
> |   +- Scan
> org.apache.spark.sql.cassandra.CassandraSourceRelation@49243f65[id#0L,avg#2]
> PushedFilters: [In(id,[Ljava.lang.Object;@6f8b2e9d)]  |
>
> +--+--+
>
> with the or condition
>
> +--+--+
> |
> plan   |
>
> +--+--+
> | == Physical Plan ==
>  |
> | TungstenAggregate(key=[id#0L],
> functions=[(avg(avg#2),mode=Final,isDistinct=false)],
> output=[id#0L,_c1#88])  |
> | +- TungstenExchange hashpartitioning(id#0L,10), None
> |
> |+- TungstenAggregate(key=[id#0L],
> functions=[(avg(avg#2),mode=Partial,isDistinct=false)],
> output=[id#0L,sum#92,count#93L])|
> |   +- Filter ((id#0L = 94) || (id#0L = 2))
>  |
> |  +- Scan
> org.apache.spark.sql.cassandra.CassandraSourceRelation@49243f65[id#0L,avg#2]
> PushedFilters: [Or(EqualTo(id,94),EqualTo(id,2))]  |
>
> +--+--+
>
>
> Filters are pushed down, so I cannot realize why it is performing a so big
> data extraction in case of or. It's like a full table scan.
>
> Any advice?
>
> Thanks!
>
>
> 2016-08-04 13:25 GMT+02:00 Takeshi Yamamuro <linguin@gmail.com>:
>
>> Hi,
>>
>> Please type `sqlCtx.sql("select *  ").explain` to show execution
>> plans.
>> Also, you can kill jobs from webUI.
>>
>> // maropu
>>
>>
>> On Thu, Aug 4, 2016 at 4:58 PM, Marco Colombo <
>> ing.marco.colo...@gmail.com> wrote:
>>
>>> Hi all, I've a question on how hive+spark are handling data.
>>>
>>> I've started a new HiveContext and I'm extracting data from cassandra.
>>> I've configured spark.sql.shuffle.partitions=10.
>>> Now, I've following query:
>>>
>>> select d.id, avg(d.avg) from v_points d where id=90 group by id;
>>>
>>> I see that 10 task are submitted and execution is fast. Every id on that
>>> table has 2000 samples.
>>>
>>> But if I just add a new id, as:
>>>
>>> select d.id, avg(d.avg) from v_points d where id=90 or id=2 group by id;
>>>
>>> it adds 663 task and query does not end.
>>>
>>> If I write query with in () like
>>>
>>> select d.id, avg(d.avg) from v_points d where id in (90,2) group by id;
>>>
>>> query is again fast.
>>>
>>> How can I get the 'execution plan' of the query?
>>>
>>> And also, how can I kill the long running submitted tasks?
>>>
>>> Thanks all!
>>>
>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>
>
> --
> Ing. Marco Colombo
>



-- 
---
Takeshi Yamamuro


Re: Spark SQL and number of task

2016-08-04 Thread Takeshi Yamamuro
Hi,

Please type `sqlCtx.sql("select * ...").explain` to show the execution plan.
Also, you can kill jobs from the webUI.

// maropu


On Thu, Aug 4, 2016 at 4:58 PM, Marco Colombo <ing.marco.colo...@gmail.com>
wrote:

> Hi all, I've a question on how hive+spark are handling data.
>
> I've started a new HiveContext and I'm extracting data from cassandra.
> I've configured spark.sql.shuffle.partitions=10.
> Now, I've following query:
>
> select d.id, avg(d.avg) from v_points d where id=90 group by id;
>
> I see that 10 task are submitted and execution is fast. Every id on that
> table has 2000 samples.
>
> But if I just add a new id, as:
>
> select d.id, avg(d.avg) from v_points d where id=90 or id=2 group by id;
>
> it adds 663 task and query does not end.
>
> If I write query with in () like
>
> select d.id, avg(d.avg) from v_points d where id in (90,2) group by id;
>
> query is again fast.
>
> How can I get the 'execution plan' of the query?
>
> And also, how can I kill the long running submitted tasks?
>
> Thanks all!
>



-- 
---
Takeshi Yamamuro


Re: SparkSession for RDBMS

2016-08-03 Thread Takeshi Yamamuro
Hi,

If these boundaries are not given, Spark tries to read all the data as a
single partition.
See:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala#L56
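
For example, a rough sketch (Spark 2.0; the connection details and the id
column below are placeholders): pick a roughly uniform numeric column, grab
its MIN/MAX with a quick query, and pass them as the bounds:

val props = new java.util.Properties()
props.setProperty("user", "user")
props.setProperty("password", "password")

val df = spark.read.jdbc(
  "jdbc:postgresql://dbhost:5432/db", "schema.table",
  "id",         // numeric partition column
  1L,           // lowerBound, roughly MIN(id)
  10000000L,    // upperBound, roughly MAX(id)
  10,           // numPartitions
  props)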

// maropu


On Wed, Aug 3, 2016 at 11:19 PM, Selvam Raman <sel...@gmail.com> wrote:

> Hi All,
>
> I would like to read the data from RDBMS to spark (2.0) using
> sparksession. How can i decide upper boundary, lower boundary and
> partitions.
> is there any specific approach available.
>
> How Sqoop2 decides number of partitions, upper and lower boundary if we
> are not specifying anything.
>
> --
> Selvam Raman
> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>



-- 
---
Takeshi Yamamuro


Re: Sqoop On Spark

2016-08-01 Thread Takeshi Yamamuro
Hi,

Have you seen this previous thread?
https://www.mail-archive.com/user@spark.apache.org/msg49025.html
I'm not sure this is what you want though.

// maropu


On Tue, Aug 2, 2016 at 1:52 PM, Selvam Raman <sel...@gmail.com> wrote:

>  Hi Team,
>
> how can i use spark as execution engine in sqoop2. i see the patch(S
> QOOP-1532 <https://issues.apache.org/jira/browse/SQOOP-1532>) but it
> shows in progess.
>
> so can not we use sqoop on spark.
>
> Please help me if you have an any idea.
>
> --
> Selvam Raman
> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>



-- 
---
Takeshi Yamamuro


Re: Possible to push sub-queries down into the DataSource impl?

2016-07-28 Thread Takeshi Yamamuro
Hi,

Have you seen this ticket?
https://issues.apache.org/jira/browse/SPARK-12449

// maropu

On Thu, Jul 28, 2016 at 2:13 AM, Timothy Potter <thelabd...@gmail.com>
wrote:

> I'm not looking for a one-off solution for a specific query that can
> be solved on the client side as you suggest, but rather a generic
> solution that can be implemented within the DataSource impl itself
> when it knows a sub-query can be pushed down into the engine. In other
> words, I'd like to intercept the query planning process to be able to
> push-down computation into the engine when it makes sense.
>
> On Wed, Jul 27, 2016 at 8:04 AM, Marco Colombo
> <ing.marco.colo...@gmail.com> wrote:
> > Why don't you create a dataframe filtered, map it as temporary table and
> > then use it in your query? You can also cache it, of multiple queries on
> the
> > same inner queries are requested.
> >
> >
> > Il mercoledì 27 luglio 2016, Timothy Potter <thelabd...@gmail.com> ha
> > scritto:
> >>
> >> Take this simple join:
> >>
> >> SELECT m.title as title, solr.aggCount as aggCount FROM movies m INNER
> >> JOIN (SELECT movie_id, COUNT(*) as aggCount FROM ratings WHERE rating
> >> >= 4 GROUP BY movie_id ORDER BY aggCount desc LIMIT 10) as solr ON
> >> solr.movie_id = m.movie_id ORDER BY aggCount DESC
> >>
> >> I would like the ability to push the inner sub-query aliased as "solr"
> >> down into the data source engine, in this case Solr as it will
> >> greatlly reduce the amount of data that has to be transferred from
> >> Solr into Spark. I would imagine this issue comes up frequently if the
> >> underlying engine is a JDBC data source as well ...
> >>
> >> Is this possible? Of course, my example is a bit cherry-picked so
> >> determining if a sub-query can be pushed down into the data source
> >> engine is probably not a trivial task, but I'm wondering if Spark has
> >> the hooks to allow me to try ;-)
> >>
> >> Cheers,
> >> Tim
> >>
> >> -----
> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >>
> >
> >
> > --
> > Ing. Marco Colombo
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
---
Takeshi Yamamuro


Re: Setting spark.sql.shuffle.partitions Dynamically

2016-07-27 Thread Takeshi Yamamuro
Hi,

How about trying adaptive execution in Spark?
https://issues.apache.org/jira/browse/SPARK-9850
This feature is turned off by default because it is still experimental.
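
If you want to try it anyway, the configs below should turn it on (the 64MB
target is just an illustration):

spark.conf.set("spark.sql.adaptive.enabled", "true")
// target bytes per post-shuffle partition, used to derive the partition count
spark.conf.set("spark.sql.adaptive.shuffle.targetPostShuffleInputSize",
  (64 * 1024 * 1024).toString)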

// maropu



On Wed, Jul 27, 2016 at 3:26 PM, Brandon White <bwwintheho...@gmail.com>
wrote:

> Hello,
>
> My platform runs hundreds of Spark jobs every day each with its own
> datasize from 20mb to 20TB. This means that we need to set resources
> dynamically. One major pain point for doing this is
> spark.sql.shuffle.partitions, the number of partitions to use when
> shuffling data for joins or aggregations. It is to be arbitrarily hard
> coded to 200. The only way to set this config is in the spark submit
> command or in the SparkConf before the executor is created.
>
> This creates a lot of problems when I want to set this config dynamically
> based on the in memory size of a dataframe. I only know the in memory size
> of the dataframe halfway through the spark job. So I would need to stop the
> context and recreate it in order to set this config.
>
> Is there any better way to set this? How
> does  spark.sql.shuffle.partitions work differently than .repartition?
>
> Brandon
>



-- 
---
Takeshi Yamamuro


Re: read parquetfile in spark-sql error

2016-07-25 Thread Takeshi Yamamuro
il.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
>> at
>> org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:34)
>> at org.apache.spark.sql.hive.HiveQl$.parseSql(HiveQl.scala:295)
>> at
>> org.apache.spark.sql.hive.HiveQLDialect$$anonfun$parse$1.apply(HiveContext.scala:66)
>> at
>> org.apache.spark.sql.hive.HiveQLDialect$$anonfun$parse$1.apply(HiveContext.scala:66)
>> at
>> org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$withHiveState$1.apply(ClientWrapper.scala:290)
>> at
>> org.apache.spark.sql.hive.client.ClientWrapper.liftedTree1$1(ClientWrapper.scala:237)
>> at
>> org.apache.spark.sql.hive.client.ClientWrapper.retryLocked(ClientWrapper.scala:236)
>> at
>> org.apache.spark.sql.hive.client.ClientWrapper.withHiveState(ClientWrapper.scala:279)
>> at
>> org.apache.spark.sql.hive.HiveQLDialect.parse(HiveContext.scala:65)
>> at
>> org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:211)
>> at
>> org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:211)
>> at
>> org.apache.spark.sql.execution.SparkSQLParser$$anonfun$org$apache$spark$sql$execution$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:114)
>> at
>> org.apache.spark.sql.execution.SparkSQLParser$$anonfun$org$apache$spark$sql$execution$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:113)
>> at
>> scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
>> at
>> scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
>> at
>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
>> at
>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
>> at
>> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
>> at
>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
>> at
>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
>> at
>> scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
>> at
>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
>> at
>> scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
>> at
>> scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
>> at
>> scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
>> at
>> scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>> at
>> scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
>> at
>> scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
>> at
>> org.apache.spark.sql.catalyst.AbstractSparkSQLParser.parse(AbstractSparkSQLParser.scala:34)
>> at
>> org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:208)
>> at
>> org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:208)
>> at
>> org.apache.spark.sql.execution.datasources.DDLParser.parse(DDLParser.scala:43)
>> at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:231)
>> at
>> org.apache.spark.sql.hive.HiveContext.parseSql(HiveContext.scala:331)
>> at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:817)
>> at
>> org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:63)
>> at
>> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:311)
>> at
>> org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376)
>> at
>> org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:311)
>> at
>> org.apache.hadoop.hive.cli.CliDriver.processReader(CliDriver.java:409)
>> at
>> org.apache.hadoop.hive.cli.CliDriver.processFile(CliDriver.java:425)
>> at
>> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:166)
>> at
>> org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at
>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
>> at
>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
>> at
>> org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
>> at
>> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>> Error in query: cannot recognize input near 'parquetTable' 'USING' 'org'
>> in table name; line 2 pos 0
>>
>>
>> am I use it in the wrong way?
>>
>>
>>
>>
>>
>> thanks
>>
>


-- 
---
Takeshi Yamamuro


Re: Bzip2 to Parquet format

2016-07-25 Thread Takeshi Yamamuro
Hi,

This is the expected behaviour.
The default compression codec for Parquet is `snappy`.
See:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L215
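
If you want a different codec, it can be overridden per write or globally,
e.g. (using the inputDF from your snippet; valid values include none, snappy,
and gzip):

inputDF.write.option("compression", "gzip").parquet("result_gzip.parquet")
// or globally: spark.conf.set("spark.sql.parquet.compression.codec", "gzip")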

// maropu

On Tue, Jul 26, 2016 at 6:33 AM, janardhan shetty <janardhan...@gmail.com>
wrote:

> Andrew,
>
> 2.0
>
> I tried
> val inputR = sc.textFile(file)
> val inputS = inputR.map(x => x.split("`"))
> val inputDF = inputS.toDF()
>
> inputDF.write.format("parquet").save(result.parquet)
>
> Result part files end with *.snappy.parquet *is that expected ?
>
> On Sun, Jul 24, 2016 at 8:00 PM, Andrew Ehrlich <and...@aehrlich.com>
> wrote:
>
>> You can load the text with sc.textFile() to an RDD[String], then use
>> .map() to convert it into an RDD[Row]. At this point you are ready to
>> apply a schema. Use sqlContext.createDataFrame(rddOfRow, structType)
>>
>> Here is an example on how to define the StructType (schema) that you
>> will combine with the RDD[Row] to create a DataFrame.
>>
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.types.StructType
>>
>> Once you have the DataFrame, save it to parquet with
>> dataframe.save(“/path”) to create a parquet file.
>>
>> Reference for SQLContext / createDataFrame:
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.SQLContext
>>
>>
>>
>> On Jul 24, 2016, at 5:34 PM, janardhan shetty <janardhan...@gmail.com>
>> wrote:
>>
>> We have data in Bz2 compression format. Any links in Spark to convert
>> into Parquet and also performance benchmarks and uses study materials ?
>>
>>
>>
>


-- 
---
Takeshi Yamamuro


Re: Tools for Balancing Partitions by Size

2016-07-12 Thread Takeshi Yamamuro
Hi,

There is no simple way to access the size on the driver side.
Since partitions of primitive-typed data (e.g., int) are compressed by
`DataFrame#cache`, the cached size can be a little different from the size of
the partitions while they are being processed.

// maropu

On Wed, Jul 13, 2016 at 4:53 AM, Pedro Rodriguez <ski.rodrig...@gmail.com>
wrote:

> Hi,
>
> Are there any tools for partitioning RDD/DataFrames by size at runtime?
> The idea would be to specify that I would like for each partition to be
> roughly X number of megabytes then write that through to S3. I haven't
> found anything off the shelf, and looking through stack overflow posts
> doesn't seem to yield anything concrete.
>
> Is there a way to programmatically get the size or a size estimate for an
> RDD/DataFrame at runtime (eg size of one record would be sufficient)? I
> gave SizeEstimator a try, but it seems like the results varied quite a bit
> (tried on whole RDD and a sample). It would also be useful to get
> programmatic access to the size of the RDD in memory if it is cached.
>
> Thanks,
> --
> Pedro Rodriguez
> PhD Student in Distributed Machine Learning | CU Boulder
> UC Berkeley AMPLab Alumni
>
> ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
> Github: github.com/EntilZha | LinkedIn:
> https://www.linkedin.com/in/pedrorodriguezscience
>
>


-- 
---
Takeshi Yamamuro


Re: Spark cluster tuning recommendation

2016-07-12 Thread Takeshi Yamamuro
Hi,

Have you seen this slide deck from Spark Summit 2016?
https://spark-summit.org/2016/events/top-5-mistakes-when-writing-spark-applications/
It is a good reference for your capacity planning.

// maropu

On Tue, Jul 12, 2016 at 2:31 PM, Yash Sharma <yash...@gmail.com> wrote:

> I would say use the dynamic allocation rather than number of executors.
> Provide some executor memory which you would like.
> Deciding the values requires couple of test runs and checking what works
> best for you.
>
> You could try something like -
>
> --driver-memory 1G \
> --executor-memory 2G \
> --executor-cores 2 \
> --conf spark.dynamicAllocation.enabled=true \
> --conf spark.dynamicAllocation.initialExecutors=8 \
>
>
>
> On Tue, Jul 12, 2016 at 1:27 PM, Anuj Kumar <anujs...@gmail.com> wrote:
>
>> That configuration looks bad: only two cores are in use and the app is
>> using just 1 GB. A few points:
>>
>> 1. Oversubscribe those CPUs to at least twice the number of physical cores
>> to start with, and then tune down if it freezes
>> 2. Allocate all of the CPU cores and memory to your running app (I assume
>> it is your test environment)
>> 3. Assuming each machine is a quad-core, if you define 8 cores per worker
>> you will get 56 cores (CPU threads) across the cluster
>> 4. Also, it depends on the source you are reading the data from. If
>> you are reading from HDFS, what is your block size and part count?
>> 5. You may also have to tune the timeouts and frame size based on the
>> dataset and the errors that you are facing
>>
>> We have run terasort with a couple of high-end worker machines, reading and
>> writing from HDFS with 5-10 mount points allocated for HDFS and Spark local
>> dirs. We have used multiple configurations, like
>> 10 workers / 10 CPUs / 10 GB and 25 workers / 6 CPUs / 6 GB running on each
>> of the two machines, with 512 MB HDFS blocks and 1000-2000 parts. With all
>> of these talking over 10 GbE, it worked well.
>>
>> On Tue, Jul 12, 2016 at 3:39 AM, Kartik Mathur <kar...@bluedata.com>
>> wrote:
>>
>>> I am trying to run terasort in Spark on a 7-node cluster with only 10 GB
>>> of data, and executors get lost with a "GC overhead limit exceeded" error.
>>>
>>> This is what my cluster looks like -
>>>
>>>
>>>- *Alive Workers:* 7
>>>- *Cores in use:* 28 Total, 2 Used
>>>- *Memory in use:* 56.0 GB Total, 1024.0 MB Used
>>>- *Applications:* 1 Running, 6 Completed
>>>- *Drivers:* 0 Running, 0 Completed
>>>- *Status:* ALIVE
>>>
>>> Each worker has 8 cores and 4GB memory.
>>>
>>> My question is, how do people running in production decide these
>>> properties:
>>>
>>> 1) --num-executors
>>> 2) --executor-cores
>>> 3) --executor-memory
>>> 4) num of partitions
>>> 5) spark.default.parallelism
>>>
>>> Thanks,
>>> Kartik
>>>
>>>
>>>
>>
>


-- 
---
Takeshi Yamamuro


Re: Spark crashes with two parquet files

2016-07-10 Thread Takeshi Yamamuro
The log explicitly says "java.lang.OutOfMemoryError: Java heap space", so
you probably need to allocate more JVM memory to Spark.

// maropu
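A minimal sketch of doing that; the 4g figures and the script name are placeholders, and note that in local mode only the driver memory matters (the executors run inside the driver JVM) and it has to be passed at launch rather than set from inside the application:

pyspark --driver-memory 4g
# or, when submitting an application:
spark-submit --driver-memory 4g --executor-memory 4g your_script.py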

On Mon, Jul 11, 2016 at 11:59 AM, Javier Rey <jre...@gmail.com> wrote:

> The problem also appears when I use the unionAll clause.
>
> 2016-07-10 21:58 GMT-05:00 Javier Rey <jre...@gmail.com>:
>
>> This is part of the trace log.
>>
>>  WARN TaskSetManager: Lost task 4.0 in stage 2.0 (TID 13, localhost):
>> java.lang.OutOfMemoryError: Java heap space
>> at
>> org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:755)
>> at
>> org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:494)
>> at
>> org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader.checkEndOfRowGroup(UnsafeRowParquetRecord
>>
>> 2016-07-10 21:47 GMT-05:00 Takeshi Yamamuro <linguin@gmail.com>:
>>
>>> Hi,
>>>
>>> What's the schema in the parquets?
>>> Also, could you show us the stack trace when the error happens?
>>>
>>> // maropu
>>>
>>> On Mon, Jul 11, 2016 at 11:42 AM, Javier Rey <jre...@gmail.com> wrote:
>>>
>>>> Hi everybody,
>>>>
>>>> I installed Spark 1.6.1 and I have two parquet files, but when I try to show
>>>> records using unionAll, Spark crashes and I don't understand what happens.
>>>>
>>>> When I use show() on only one parquet file, it works correctly.
>>>>
>>>> code with fault:
>>>>
>>>> path = '/data/train_parquet/'
>>>> train_df = sqlContext.read.parquet(path)
>>>> train_df.take(1)
>>>>
>>>> code works:
>>>>
>>>> path = '/data/train_parquet/0_0_0.parquet'
>>>> train0_df = sqlContext.read.load(path)
>>>> train0_df.take(1)
>>>>
>>>> Thanks in advance.
>>>>
>>>> Samir
>>>>
>>>
>>>
>>>
>>> --
>>> ---
>>> Takeshi Yamamuro
>>>
>>
>>
>


-- 
---
Takeshi Yamamuro


Re: How to run Zeppelin and Spark Thrift Server Together

2016-07-10 Thread Takeshi Yamamuro
Hi,

ISTM multiple SparkContexts are not recommended in Spark.
See: https://issues.apache.org/jira/browse/SPARK-2243

// maropu


On Mon, Jul 11, 2016 at 12:01 PM, ayan guha <guha.a...@gmail.com> wrote:

> Hi
>
> Can you try using the JDBC interpreter with STS? We have been using
> Zeppelin+STS on YARN for a few months now without much issue.
>
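For reference, a sketch of what that looks like with Zeppelin's generic JDBC interpreter pointed at STS; the property names follow the JDBC interpreter, while the host, port, user, and hive-jdbc version are placeholders for your setup:

# JDBC interpreter properties (Zeppelin interpreter settings UI)
default.driver   = org.apache.hive.jdbc.HiveDriver
default.url      = jdbc:hive2://sts-host:10000
default.user     = your_user
default.password =
# interpreter dependency (artifact), version is an assumption:
# org.apache.hive:hive-jdbc:1.2.1

Notebook paragraphs then start with %jdbc and run against STS, so Zeppelin does not need to create a second SparkContext of its own.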
> On Mon, Jul 11, 2016 at 12:48 PM, Chanh Le <giaosu...@gmail.com> wrote:
>
>> Hi everybody,
>> We are using Spark to query big data and currently we’re using Zeppelin
>> to provide a UI for technical users.
>> Now we also need to provide a UI for business users so we use Oracle BI
>> tools and set up a Spark Thrift Server (STS) for it.
>>
>> When I run both Zeppelin and STS, the following error is thrown:
>>
>> INFO [2016-07-11 09:40:21,905] ({pool-2-thread-4}
>> SchedulerFactory.java[jobStarted]:131) - Job
>> remoteInterpretJob_1468204821905 started by scheduler
>> org.apache.zeppelin.spark.SparkInterpreter835015739
>>  INFO [2016-07-11 09:40:21,911] ({pool-2-thread-4}
>> Logging.scala[logInfo]:58) - Changing view acls to: giaosudau
>>  INFO [2016-07-11 09:40:21,912] ({pool-2-thread-4}
>> Logging.scala[logInfo]:58) - Changing modify acls to: giaosudau
>>  INFO [2016-07-11 09:40:21,912] ({pool-2-thread-4}
>> Logging.scala[logInfo]:58) - SecurityManager: authentication disabled; ui
>> acls disabled; users with view permissions: Set(giaosudau); users with
>> modify permissions: Set(giaosudau)
>>  INFO [2016-07-11 09:40:21,918] ({pool-2-thread-4}
>> Logging.scala[logInfo]:58) - Starting HTTP Server
>>  INFO [2016-07-11 09:40:21,919] ({pool-2-thread-4}
>> Server.java[doStart]:272) - jetty-8.y.z-SNAPSHOT
>>  INFO [2016-07-11 09:40:21,920] ({pool-2-thread-4}
>> AbstractConnector.java[doStart]:338) - Started
>> SocketConnector@0.0.0.0:54818
>>  INFO [2016-07-11 09:40:21,922] ({pool-2-thread-4}
>> Logging.scala[logInfo]:58) - Successfully started service 'HTTP class
>> server' on port 54818.
>>  INFO [2016-07-11 09:40:22,408] ({pool-2-thread-4}
>> SparkInterpreter.java[createSparkContext]:233) - -- Create new
>> SparkContext local[*] ---
>>  WARN [2016-07-11 09:40:22,411] ({pool-2-thread-4}
>> Logging.scala[logWarning]:70) - Another SparkContext is being constructed
>> (or threw an exception in its constructor).  This may indicate an error,
>> since only one SparkContext may be running in this JVM (see SPARK-2243).
>> The other SparkContext was created at:
>>
>> Does that mean I need to set it up to allow multiple contexts? This is
>> only a test in local mode; if I deploy on a Mesos cluster, what would
>> happen?
>>
>> Could you suggest some solutions for this? Thanks.
>>
>> Chanh
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>



-- 
---
Takeshi Yamamuro


Re: Spark crashes with two parquet files

2016-07-10 Thread Takeshi Yamamuro
Hi,

What's the schema in the parquets?
Also, could you show us the stack trace when the error happens?

// maropu

On Mon, Jul 11, 2016 at 11:42 AM, Javier Rey <jre...@gmail.com> wrote:

> Hi everybody,
>
> I installed Spark 1.6.1 and I have two parquet files, but when I try to show
> records using unionAll, Spark crashes and I don't understand what happens.
>
> When I use show() on only one parquet file, it works correctly.
>
> code with fault:
>
> path = '/data/train_parquet/'
> train_df = sqlContext.read.parquet(path)
> train_df.take(1)
>
> code works:
>
> path = '/data/train_parquet/0_0_0.parquet'
> train0_df = sqlContext.read.load(path)
> train0_df.take(1)
>
> Thanks in advance.
>
> Samir
>



-- 
---
Takeshi Yamamuro


Re: IS NOT NULL is not working in programmatic SQL in spark

2016-07-10 Thread Takeshi Yamamuro
Hi,

One solution is to use `spark-csv` (see:
https://github.com/databricks/spark-csv#features).
To load NULLs, you can use the `nullValue` option there.

// maropu
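A minimal sketch of that route for the two-column file quoted below, assuming the empty field really is an empty string rather than whitespace; with spark-csv's nullValue option the field is loaded as a real NULL, so IS NOT NULL behaves as expected, and on a column that was already loaded as a plain string an explicit empty-string check works too:

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = true),
  StructField("code", StringType, nullable = true)))

// Load with spark-csv, mapping empty fields to real NULLs.
val cntdf = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("nullValue", "")
  .schema(schema)
  .load("/user/poc_hortonworks/radha/gsd/sample.txt")
cntdf.registerTempTable("cntids")

sqlContext.sql("SELECT MAX(id), code FROM cntids WHERE code IS NOT NULL GROUP BY code").show()

// If the data stays as it was loaded in the quoted mail (empty string, not NULL):
// sqlContext.sql("SELECT MAX(id), code FROM cntids WHERE code <> '' GROUP BY code").show()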


On Mon, Jul 11, 2016 at 1:14 AM, Radha krishna <grkmc...@gmail.com> wrote:

> I want to apply a null comparison to a column in sqlContext.sql. Is there
> any way to achieve this?
> On Jul 10, 2016 8:55 PM, "Radha krishna" <grkmc...@gmail.com> wrote:
>
>> OK, thank you. How can I achieve the requirement?
>>
>> On Sun, Jul 10, 2016 at 8:44 PM, Sean Owen <so...@cloudera.com> wrote:
>>
>>> It doesn't look like you have a NULL field; you have a string-valued
>>> field containing an empty string.
>>>
>>> On Sun, Jul 10, 2016 at 3:19 PM, Radha krishna <grkmc...@gmail.com>
>>> wrote:
>>> > Hi All,IS NOT NULL is not working in programmatic sql. check below for
>>> input
>>> > output and code.
>>> >
>>> > Input
>>> > 
>>> > 10,IN
>>> > 11,PK
>>> > 12,US
>>> > 13,UK
>>> > 14,US
>>> > 15,IN
>>> > 16,
>>> > 17,AS
>>> > 18,AS
>>> > 19,IR
>>> > 20,As
>>> >
>>> > val cntdat = sc.textFile("/user/poc_hortonworks/radha/gsd/sample.txt");
>>> > case class CNT (id:Int , code : String)
>>> > val cntdf = cntdat.map((f) => { val ff=f.split(",");new
>>> > CNT(ff(0).toInt,ff(1))}).toDF
>>> > cntdf.registerTempTable("cntids");
>>> > sqlContext.sql("SELECT MAX(id),code FROM cntids WHERE code is not null
>>> GROUP
>>> > BY code").show()
>>> >
>>> > Output
>>> > =
>>> > +---++
>>> > |_c0|code|
>>> > +---++
>>> > | 18|  AS|
>>> > | 16|  |
>>> > | 13|  UK|
>>> > | 14|  US|
>>> > | 20|  As|
>>> > | 15|  IN|
>>> > | 19|  IR|
>>> > | 11|  PK|
>>> > +---++
>>> >
>>> > i am expecting the below one any idea, how to apply IS NOT NULL ?
>>> >
>>> > +---++
>>> > |_c0|code|
>>> > +---++
>>> > | 18|  AS|
>>> > | 13|  UK|
>>> > | 14|  US|
>>> > | 20|  As|
>>> > | 15|  IN|
>>> > | 19|  IR|
>>> > | 11|  PK|
>>> > +---++
>>> >
>>> >
>>> >
>>> > Thanks & Regards
>>> >Radha krishna
>>> >
>>> >
>>>
>>
>>
>>
>> --
>>
>>
>>
>>
>>
>>
>>
>>
>> Thanks & Regards
>>Radha krishna
>>
>>
>>


-- 
---
Takeshi Yamamuro


Re: Enforcing shuffle hash join

2016-07-04 Thread Takeshi Yamamuro
What's the query?

On Tue, Jul 5, 2016 at 2:28 PM, Lalitha MV <lalitham...@gmail.com> wrote:

> It picks sort merge join when spark.sql.autoBroadcastJoinThreshold is
> set to -1, or when the size of the small table is more than
> spark.sql.autoBroadcastJoinThreshold.
>
> On Mon, Jul 4, 2016 at 10:17 PM, Takeshi Yamamuro <linguin@gmail.com>
> wrote:
>
>> The join selection is described in
>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala#L92
>> If you have join keys, you can set `spark.sql.autoBroadcastJoinThreshold`
>> to -1 to disable broadcast joins; then hash joins are used in queries.
>>
>> // maropu
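For what it's worth, a hedged sketch for Spark 2.0: besides the broadcast threshold, the planner also consults an internal flag, spark.sql.join.preferSortMergeJoin, and it still requires the build side to look much smaller than the other side and small enough for a per-partition hash map (roughly, estimated size below autoBroadcastJoinThreshold times spark.sql.shuffle.partitions), which is why a threshold of -1 ends up with sort merge join as observed above. The values and DataFrame names below are assumptions:

// Keep the threshold positive but below the small table's estimated size,
// so broadcast is skipped while the per-partition hash-map check can still pass.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "2097152")  // ~2 MB, an assumption
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")      // internal option

val joined = largeDF.join(smallDF, Seq("key"))  // placeholder DataFrames and key
joined.explain()  // look for ShuffledHashJoin in the physical plan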
>>
>> On Tue, Jul 5, 2016 at 4:23 AM, Lalitha MV <lalitham...@gmail.com> wrote:
>>
>>> Hi maropu,
>>>
>>> Thanks for your reply.
>>>
>>> Would it be possible to write a rule for this, to make it always pick
>>> shuffle hash join over other join implementations (i.e., sort merge and
>>> broadcast)?
>>>
>>> Is there any documentation demonstrating rule based transformation for
>>> physical plan trees?
>>>
>>> Thanks,
>>> Lalitha
>>>
>>> On Sat, Jul 2, 2016 at 12:58 AM, Takeshi Yamamuro <linguin@gmail.com
>>> > wrote:
>>>
>>>> Hi,
>>>>
>>>> No, spark has no hint for the hash join.
>>>>
>>>> // maropu
>>>>
>>>> On Fri, Jul 1, 2016 at 4:56 PM, Lalitha MV <lalitham...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> In order to force broadcast hash join, we can set
>>>>> the spark.sql.autoBroadcastJoinThreshold config. Is there a way to enforce
>>>>> shuffle hash join in spark sql?
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Lalitha
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> ---
>>>> Takeshi Yamamuro
>>>>
>>>
>>>
>>>
>>> --
>>> Regards,
>>> Lalitha
>>>
>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>
>
> --
> Regards,
> Lalitha
>



-- 
---
Takeshi Yamamuro


  1   2   3   >