[ANNOUNCE] Announcing Apache Spark 4.0.0-preview1

2024-06-03 Thread Wenchen Fan
Hi all,

To enable wide-scale community testing of the upcoming Spark 4.0 release,
the Apache Spark community has posted a preview release of Spark 4.0. This
preview is not a stable release in terms of either API or functionality,
but it is meant to give the community early access to try the code that
will become Spark 4.0. If you would like to test the release, please
download it, and send feedback using either the mailing lists or JIRA.

Spark 4.0 adds many exciting new features, including ANSI mode by default,
the Python data source API, polymorphic Python UDTFs, string collation
support, the new VARIANT data type, the streaming state store data source,
structured logging, Java 17 by default, and many more.

We'd like to thank our contributors and users for their contributions and
early feedback to this release. This release would not have been possible
without you.

To download Spark 4.0.0-preview1, head over to the download page:
https://archive.apache.org/dist/spark/spark-4.0.0-preview1 . It's also
available on PyPI under the version name "4.0.0.dev1".

Thanks,

Wenchen


Re: [Feature Request] create *permanent* Spark View from DataFrame via PySpark

2023-06-09 Thread Wenchen Fan
A DataFrame view stores the logical plan, while a SQL view stores the SQL
text. I don't think we can support this feature until we have a reliable way
to materialize logical plans.
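
To illustrate the difference in what gets stored, here is a minimal sketch
(the table and column names are made up for the example; the calls read the
same in Scala and PySpark):

// Temporary view: registers the DataFrame's logical plan; it lives only in
// the current session and is not written to the metastore.
df.createOrReplaceTempView("tmp_orders")

// Permanent view: stores SQL text in the metastore, so it must be defined
// over catalog tables rather than over an in-memory DataFrame's plan.
spark.sql("CREATE OR REPLACE VIEW orders_2023 AS SELECT id, amount FROM orders WHERE year = 2023")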

On Sun, Jun 4, 2023 at 10:31 PM Mich Talebzadeh 
wrote:

> Try sending it to d...@spark.apache.org (and join that group)
>
> You need to raise a JIRA for this request, plus the related documentation.
>
>
> Example JIRA
>
> https://issues.apache.org/jira/browse/SPARK-42485
>
> and the related *Spark Project Improvement Proposal (SPIP)* to be filled in
>
> https://spark.apache.org/improvement-proposals.html
>
>
> HTH
>
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
> London
> United Kingdom
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Sun, 4 Jun 2023 at 12:38, keen  wrote:
>
>> Do Spark **devs** read this mailing list?
>> Is there another/a better way to make feature requests?
>> I tried in the past to write a mail to the dev mailing list but it did
>> not show up at all.
>>
>> Cheers
>>
>> keen  schrieb am Do., 1. Juni 2023, 07:11:
>>
>>> Hi all,
>>> currently only *temporary* Spark Views can be created from a DataFrame
>>> (df.createOrReplaceTempView or df.createOrReplaceGlobalTempView).
>>>
>>> When I want a *permanent* Spark View I need to specify it via Spark SQL
>>> (CREATE VIEW AS SELECT ...).
>>>
>>> Sometimes it is easier to specify the desired logic of the View through
>>> the Spark/PySpark DataFrame API.
>>> Therefore, I'd like to suggest implementing a new PySpark method that
>>> allows creating a *permanent* Spark View from a DataFrame
>>> (df.createOrReplaceView).
>>>
>>> see also:
>>>
>>> https://community.databricks.com/s/question/0D53f1PANVgCAP/is-there-a-way-to-create-a-nontemporary-spark-view-with-pyspark
>>>
>>> Regards
>>> Martin
>>>
>>


Re: [ANNOUNCE] Apache Spark 3.2.3 released

2022-11-30 Thread Wenchen Fan
Thanks, Chao!

On Wed, Nov 30, 2022 at 1:33 AM Chao Sun  wrote:

> We are happy to announce the availability of Apache Spark 3.2.3!
>
> Spark 3.2.3 is a maintenance release containing stability fixes. This
> release is based on the branch-3.2 maintenance branch of Spark. We strongly
> recommend all 3.2 users to upgrade to this stable release.
>
> To download Spark 3.2.3, head over to the download page:
> https://spark.apache.org/downloads.html
>
> To view the release notes:
> https://spark.apache.org/releases/spark-release-3-2-3.html
>
> We would like to acknowledge all community members for contributing to this
> release. This release would not have been possible without you.
>
> Chao
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


Re: Difference in behavior for Spark 3.0 vs Spark 3.1 "create database "

2022-01-11 Thread Wenchen Fan
Hopefully, this StackOverflow answer can solve your problem:
https://stackoverflow.com/questions/47523037/how-do-i-configure-pyspark-to-write-to-hdfs-by-default

Spark doesn't control how unqualified paths are resolved; that is determined
by the Hadoop/Spark configuration (for example, the default filesystem and
warehouse settings in the relevant config files).
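
A sketch of two ways to pin the behavior down instead of relying on how the
default filesystem happens to be resolved (the namenode host/port and paths
below are illustrative):

// Qualify the location explicitly in the DDL:
spark.sql("CREATE DATABASE test LOCATION 'hdfs://namenode:8020/user/hive/test.db'")

// Or set the warehouse location up front when building the session, so that
// unqualified database locations resolve on HDFS:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
  .config("spark.sql.warehouse.dir", "hdfs://namenode:8020/user/hive/warehouse")
  .enableHiveSupport()
  .getOrCreate()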

On Tue, Jan 11, 2022 at 3:03 AM Pablo Langa Blanco  wrote:

> Hi Pralabh,
>
> If it helps, it is probably related to this change
> https://github.com/apache/spark/pull/28527
>
> Regards
>
> On Mon, Jan 10, 2022 at 10:42 AM Pralabh Kumar 
> wrote:
>
>> Hi Spark Team
>>
>> When creating a database via Spark 3.0 on Hive
>>
>> 1) spark.sql("create database test location '/user/hive'"). It creates
>> the database location on HDFS, as expected.
>>
>> 2) When running the same command on 3.1 the database is created on the
>> local file system by default. I have to prefix with hdfs to create db on
>> hdfs.
>>
>> Why is there a difference in the behavior? Can you please point me to the
>> JIRA that causes this change?
>>
>> Note: spark.sql.warehouse.dir and hive.metastore.warehouse.dir both have
>> their default values (not explicitly set).
>>
>> Regards
>> Pralabh Kumar
>>
>


Re: [ANNOUNCE] Apache Spark 3.2.0

2021-10-19 Thread Wenchen Fan
Yeah, the file naming is a bit confusing; we can fix it in the next release.
3.2 actually means 3.2 or higher, so it's not a big deal, I think.

Congrats and thanks!

On Wed, Oct 20, 2021 at 3:44 AM Jungtaek Lim 
wrote:

> Thanks to Gengliang for driving this huge release!
>
> On Wed, Oct 20, 2021 at 1:50 AM Dongjoon Hyun 
> wrote:
>
>> Thank you so much, Gengliang and all!
>>
>> Dongjoon.
>>
>> On Tue, Oct 19, 2021 at 8:48 AM Xiao Li  wrote:
>>
>>> Thank you, Gengliang!
>>>
>>> Congrats to our community and all the contributors!
>>>
>>> Xiao
>>>
>>> On Tue, Oct 19, 2021 at 8:26 AM Henrik Peng  wrote:
>>>
 Congrats and thanks!


 On Tue, Oct 19, 2021 at 10:16 PM Gengliang Wang wrote:

> Hi all,
>
> Apache Spark 3.2.0 is the third release of the 3.x line. With
> tremendous contribution from the open-source community, this release
> managed to resolve in excess of 1,700 Jira tickets.
>
> We'd like to thank our contributors and users for their contributions
> and early feedback to this release. This release would not have been
> possible without you.
>
> To download Spark 3.2.0, head over to the download page:
> https://spark.apache.org/downloads.html
>
> To view the release notes:
> https://spark.apache.org/releases/spark-release-3-2-0.html
>



Re: About Spark executes sqlscript

2021-05-24 Thread Wenchen Fan
It's not possible to load everything into memory. We should use a BigQuery
connector (one should already exist) and register tables B and C as temp
views in Spark.
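
A rough sketch of that approach, assuming the spark-bigquery connector is on
the classpath (the data source name "bigquery", the "table" option, and the
dataset/table names are assumptions to verify against the connector you use):

val tableB = spark.read.format("bigquery").option("table", "my_dataset.tableB").load()
val tableC = spark.read.format("bigquery").option("table", "my_dataset.tableC").load()
tableB.createOrReplaceTempView("tableB")
tableC.createOrReplaceTempView("tableC")

// The SELECT from the parsed script can then run as-is; Spark plans the scan
// lazily instead of loading whole tables into driver memory.
val result = spark.sql("SELECT b.columnB1, c.columnC2 FROM tableB b, tableC c")

// Writing back may require extra connector options (e.g. a temporary bucket).
result.write.mode("overwrite").format("bigquery").option("table", "my_dataset.tableA").save()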

On Fri, May 14, 2021 at 8:50 AM bo zhao  wrote:

> Hi Team,
>
> I've followed the Spark community for several years. This is my first time
> asking for help. I hope you can share some of your experience.
>
> I want to develop a Spark application that processes a SQL script file.
> The data is on BigQuery.
> For example, the sqlscript is:
>
> delete from tableA;
> insert into tableA select b.columnB1, c.columnC2 from tableB b, tableC c;
>
>
> I can parse this file. In my opinion, after parsing the file, the steps
> should be as follows:
>
> #step1: read tableB, tableC into memory(Spark)
> #step2. register views for tableB's dataframe and tableC's dataframe
> #step3. use spark.sql("select b.columnB1, c.columnC2 from tableB b, tableC
> c") to get a new dataframe
> #step4. new dataframe.write().() to tableA using mode of "OVERWRITE"
>
> My questions:
> #1 If there are 10 or more tables, do I need to read each table into
> memory, even though Spark is based on in-memory computation?
> #2 Is there an easier way to handle my scenario, where, for example, I just
> define the data source (BigQuery) and parse the SQL script file, and the
> rest is run by Spark?
>
> Please share your experience or idea.
>


Re: [Spark Catalog API] Support for metadata Backup/Restore

2021-05-11 Thread Wenchen Fan
That's my expectation as well. Spark needs a reliable catalog; backup/restore
is just an implementation detail of how you make your catalog reliable, and it
should be transparent to Spark.

On Sat, May 8, 2021 at 6:54 AM ayan guha  wrote:

> Just a consideration:
>
> Is there value in backing up/restoring metadata within Spark? I would
> strongly argue that if the metadata is valuable enough and persistent
> enough, why not just use an external metastore? It is a fairly
> straightforward process. Also, regardless of whether you are in the cloud
> or not, database backup is a routine and established pattern in most
> organizations.
> You can also enhance HA and DR by having replicas across zones, regions,
> etc.
>
> Thoughts?
>
>
>
>
> On Sat, 8 May 2021 at 7:02 am, Tianchen Zhang 
> wrote:
>
>> For now we are thinking about adding two methods in Catalog API, not SQL
>> commands:
>> 1. spark.catalog.backup, which backs up the current catalog.
>> 2. spark.catalog.restore(file), which reads the DFS file and recreates
>> the entities described in that file.
>>
>> Can you please give an example of exposing client APIs to the end users
>> in this approach? The users can only call backup or restore, right?
>>
>> Thanks,
>> Tianchen
>>
>> On Fri, May 7, 2021 at 12:27 PM Wenchen Fan  wrote:
>>
>>> If a catalog implements backup/restore, it can easily expose some client
>>> APIs to the end-users (e.g. REST API), I don't see a strong reason to
>>> expose the APIs to Spark. Do you plan to add new SQL commands in Spark to
>>> backup/restore a catalog?
>>>
>>> On Tue, May 4, 2021 at 2:39 AM Tianchen Zhang 
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> Currently the user-facing Catalog API doesn't support backup/restore
>>>> metadata. Our customers are asking for such functionalities. Here is a
>>>> usage example:
>>>> 1. Read all metadata of one Spark cluster
>>>> 2. Save them into a Parquet file on DFS
>>>> 3. Read the Parquet file and restore all metadata in another Spark
>>>> cluster
>>>>
>>>> From the current implementation, Catalog API has the list methods
>>>> (listDatabases, listFunctions, etc.) but they don't return enough
>>>> information in order to restore an entity (for example, listDatabases loses
>>>> "properties" of the database and we need "describe database extended" to
>>>> get them). And it only supports createTable (not any other entity
>>>> creations). The only way we can backup/restore an entity is using Spark 
>>>> SQL.
>>>>
>>>> We want to introduce the backup and restore from an API level. We are
>>>> thinking of doing this simply by adding backup() and restore() in
>>>> CatalogImpl, as ExternalCatalog already includes all the methods we need to
>>>> retrieve and recreate entities. We are wondering if there is any concern or
>>>> drawback of this approach. Please advise.
>>>>
>>>> Thank you in advance,
>>>> Tianchen
>>>>
>>> --
> Best Regards,
> Ayan Guha
>


Re: [Spark Catalog API] Support for metadata Backup/Restore

2021-05-07 Thread Wenchen Fan
If a catalog implements backup/restore, it can easily expose some client
APIs to the end-users (e.g. REST API), I don't see a strong reason to
expose the APIs to Spark. Do you plan to add new SQL commands in Spark to
backup/restore a catalog?

On Tue, May 4, 2021 at 2:39 AM Tianchen Zhang 
wrote:

> Hi all,
>
> Currently the user-facing Catalog API doesn't support backup/restore
> metadata. Our customers are asking for such functionalities. Here is a
> usage example:
> 1. Read all metadata of one Spark cluster
> 2. Save them into a Parquet file on DFS
> 3. Read the Parquet file and restore all metadata in another Spark cluster
>
> From the current implementation, Catalog API has the list methods
> (listDatabases, listFunctions, etc.) but they don't return enough
> information in order to restore an entity (for example, listDatabases loses
> "properties" of the database and we need "describe database extended" to
> get them). And it only supports createTable (not any other entity
> creations). The only way we can backup/restore an entity is using Spark SQL.
>
> We want to introduce the backup and restore from an API level. We are
> thinking of doing this simply by adding backup() and restore() in
> CatalogImpl, as ExternalCatalog already includes all the methods we need to
> retrieve and recreate entities. We are wondering if there is any concern or
> drawback of this approach. Please advise.
>
> Thank you in advance,
> Tianchen
>


Re: Big Broadcast Hash Join with Dynamic Partition Pruning gives wrong results

2021-04-07 Thread Wenchen Fan
Hi Tomas, thanks for reporting this bug!

Is it possible to share your dataset so that other people can reproduce and
debug it?

On Thu, Apr 8, 2021 at 7:52 AM Tomas Bartalos 
wrote:

> When I try to do a Broadcast Hash Join on a bigger table (6 Mil rows), I
> get an incorrect result of 0 rows.
>
> val rightDF = spark.read.format("parquet").load("table-a")
> val leftDF =  spark.read.format("parquet").load("table-b")
>   //needed to activate dynamic pruning subquery
>   .where('part_ts === 20210304000L)
>
> // leftDF has 7 Mil rows ~ 120 MB
> val join = broadcast(leftDF).join(rightDF,
>   $"match_part_id" === $"part_id" && $"match_id" === $"id"
> )
> join.count
>
> res1: Long = 0
>
> I think it's connected with Dynamic Partition Pruning of the rightDF,
> which is happening according to the plan:
>
> PartitionFilters: [isnotnull(part_id#477L), 
> dynamicpruningexpression(part_id#477L IN dynamicpruning#534)]
>
> = Subqueries =
>
> Subquery:1 Hosting operator id = 6 Hosting Expression = part_id#477L IN 
> dynamicpruning#534
> ReusedExchange (11)
>
>
> (11) ReusedExchange [Reuses operator id: 5]
> Output [4]: [match_part_id#487L, match_id#488L, UK#489, part_ts#490L]
>
> *Removing the broadcast hint OR shrinking the broadcasted table corrects
> the result*:
>
> val rightDF = spark.read.format("parquet").load("table-a")
> val leftDF =  spark.read.format("parquet").load("table-b")
>   //needed to activate dynamic pruning subquery
>   .where('part_ts === 20210304000L)
>  // shrinks the broadcasted table to 18K rows
>  .where('match_id === 33358792)
>
> // leftDF has 18K rows
> val join = broadcast(leftDF).join(rightDF,
>   $"match_part_id" === $"part_id" && $"match_id" === $"id"
> )
> join.count
>
> res2: Long = 379701
>
> I would expect the broadcast to fail, but would never expect to get
> incorrect results without an exception. What do you think ?
>
>
> BR,
>
> Tomas
>


Re: [ANNOUNCE] Announcing Apache Spark 3.1.1

2021-03-03 Thread Wenchen Fan
Great work and congrats!

On Wed, Mar 3, 2021 at 3:51 PM Kent Yao  wrote:

> Congrats, all!
>
> Bests,
> *Kent Yao*
> @ Data Science Center, Hangzhou Research Institute, NetEase Corp.
> *a spark enthusiast*
> *kyuubi is a unified multi-tenant JDBC
> interface for large-scale data processing and analytics, built on top
> of Apache Spark.*
> *spark-authorizer A Spark
> SQL extension which provides SQL Standard Authorization for Apache
> Spark.*
> *spark-postgres A library for
> reading data from and transferring data to Postgres / Greenplum with Spark
> SQL and DataFrames, 10~100x faster.*
> *spark-func-extras A
> library that brings excellent and useful functions from various modern
> database management systems to Apache Spark.*
>
>
>
> On 03/3/2021 15:11,Takeshi Yamamuro
>  wrote:
>
> Great work and Congrats, all!
>
> Bests,
> Takeshi
>
> On Wed, Mar 3, 2021 at 2:18 PM Mridul Muralidharan 
> wrote:
>
>>
>> Thanks Hyukjin and congratulations everyone on the release !
>>
>> Regards,
>> Mridul
>>
>> On Tue, Mar 2, 2021 at 8:54 PM Yuming Wang  wrote:
>>
>>> Great work, Hyukjin!
>>>
>>> On Wed, Mar 3, 2021 at 9:50 AM Hyukjin Kwon  wrote:
>>>
 We are excited to announce Spark 3.1.1 today.

 Apache Spark 3.1.1 is the second release of the 3.x line. This release
 adds
 Python type annotations and Python dependency management support as
 part of Project Zen.
 Other major updates include improved ANSI SQL compliance support,
 history server support
 in structured streaming, the general availability (GA) of Kubernetes
 and node decommissioning
 in Kubernetes and Standalone. In addition, this release continues to
 focus on usability, stability,
 and polish while resolving around 1500 tickets.

 We'd like to thank our contributors and users for their contributions
 and early feedback to
 this release. This release would not have been possible without you.

 To download Spark 3.1.1, head over to the download page:
 http://spark.apache.org/downloads.html

 To view the release notes:
 https://spark.apache.org/releases/spark-release-3-1-1.html


>
> --
> ---
> Takeshi Yamamuro
>
>


Re: [ANNOUNCE] Announcing Apache Spark 3.0.1

2020-09-11 Thread Wenchen Fan
Great work, thanks, Ruifeng!

On Fri, Sep 11, 2020 at 11:09 PM Gengliang Wang <
gengliang.w...@databricks.com> wrote:

> Congrats!
> Thanks for the work, Ruifeng!
>
>
> On Fri, Sep 11, 2020 at 9:51 PM Takeshi Yamamuro 
> wrote:
>
>> Congrats and thanks, Ruifeng!
>>
>>
>> On Fri, Sep 11, 2020 at 9:50 PM Dongjoon Hyun 
>> wrote:
>>
>>> It's great. Thank you, Ruifeng!
>>>
>>> Bests,
>>> Dongjoon.
>>>
>>> On Fri, Sep 11, 2020 at 1:54 AM 郑瑞峰  wrote:
>>>
 Hi all,

 We are happy to announce the availability of Spark 3.0.1!
 Spark 3.0.1 is a maintenance release containing stability fixes. This
 release is based on the branch-3.0 maintenance branch of Spark. We strongly
 recommend all 3.0 users to upgrade to this stable release.

 To download Spark 3.0.1, head over to the download page:
 http://spark.apache.org/downloads.html

 Note that you might need to clear your browser cache or to use
 `Private`/`Incognito` mode according to your browsers.

 To view the release notes:
 https://spark.apache.org/releases/spark-release-3-0-1.html

 We would like to acknowledge all community members for contributing to
 this release. This release would not have been possible without you.


 Thanks,
 Ruifeng Zheng


>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>


Re: Error while reading hive tables with tmp/hidden files inside partitions

2020-04-23 Thread Wenchen Fan
Yeah, please report the bug against a supported Spark version like 2.4.

On Thu, Apr 23, 2020 at 3:40 PM Dhrubajyoti Hati 
wrote:

> FYI, we are using Spark 2.2.0. Should the change be present in this Spark
> version? I wanted to check before opening a JIRA ticket.
>
>
>
>
> *Regards,Dhrubajyoti Hati.*
>
>
> On Thu, Apr 23, 2020 at 10:12 AM Wenchen Fan  wrote:
>
>> This looks like a bug: the path filter doesn't work for Hive table
>> reading. Can you open a JIRA ticket?
>>
>> On Thu, Apr 23, 2020 at 3:15 AM Dhrubajyoti Hati 
>> wrote:
>>
>>> Just wondering if any one could help me out on this.
>>>
>>> Thank you!
>>>
>>>
>>>
>>>
>>> *Regards,Dhrubajyoti Hati.*
>>>
>>>
>>> On Wed, Apr 22, 2020 at 7:15 PM Dhrubajyoti Hati 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> Is there any way to discard files starting with dot(.) or ending with
>>>> .tmp in the hive partition while reading from Hive table using
>>>> spark.read.table method.
>>>>
>>>> I tried using PathFilters but they didn't work. I am using spark-submit
>>>> and passing my python file(pyspark) containing the source code.
>>>>
>>>> spark.sparkContext._jsc.hadoopConfiguration().set("mapreduce.input.pathFilter.class",
>>>> "com.abc.hadoop.utility.TmpFileFilter")
>>>>
>>>> class TmpFileFilter extends PathFilter {
>>>>   override def accept(path : Path): Boolean = 
>>>> !path.getName.endsWith(".tmp")
>>>> }
>>>>
>>>> Still, in the detailed logs I can see that .tmp files are getting
>>>> considered:
>>>> 20/04/22 12:58:44 DEBUG MapRFileSystem: getMapRFileStatus
>>>> maprfs:///a/hour=05/host=abc/FlumeData.1587559137715
>>>> 20/04/22 12:58:44 DEBUG MapRFileSystem: getMapRFileStatus
>>>> maprfs:///a/hour=05/host=abc/FlumeData.1587556815621
>>>> 20/04/22 12:58:44 DEBUG MapRFileSystem: getMapRFileStatus
>>>> maprfs:///a/hour=05/host=abc/.FlumeData.1587560277337.tmp
>>>>
>>>>
>>>> Is there any way to discard the tmp(.tmp) or the hidden files(filename
>>>> starting with dot or underscore) in hive partitions while reading from
>>>> spark?
>>>>
>>>>
>>>>
>>>>
>>>> *Regards,Dhrubajyoti Hati.*
>>>>
>>>


Re: Error while reading hive tables with tmp/hidden files inside partitions

2020-04-22 Thread Wenchen Fan
This looks like a bug: the path filter doesn't work for Hive table reading.
Can you open a JIRA ticket?

On Thu, Apr 23, 2020 at 3:15 AM Dhrubajyoti Hati 
wrote:

> Just wondering if any one could help me out on this.
>
> Thank you!
>
>
>
>
> *Regards,Dhrubajyoti Hati.*
>
>
> On Wed, Apr 22, 2020 at 7:15 PM Dhrubajyoti Hati 
> wrote:
>
>> Hi,
>>
>> Is there any way to discard files starting with dot(.) or ending with
>> .tmp in the hive partition while reading from Hive table using
>> spark.read.table method.
>>
>> I tried using PathFilters but they didn't work. I am using spark-submit
>> and passing my python file(pyspark) containing the source code.
>>
>> spark.sparkContext._jsc.hadoopConfiguration().set("mapreduce.input.pathFilter.class",
>> "com.abc.hadoop.utility.TmpFileFilter")
>>
>> class TmpFileFilter extends PathFilter {
>>   override def accept(path : Path): Boolean = !path.getName.endsWith(".tmp")
>> }
>>
>> Still, in the detailed logs I can see that .tmp files are getting
>> considered:
>> 20/04/22 12:58:44 DEBUG MapRFileSystem: getMapRFileStatus
>> maprfs:///a/hour=05/host=abc/FlumeData.1587559137715
>> 20/04/22 12:58:44 DEBUG MapRFileSystem: getMapRFileStatus
>> maprfs:///a/hour=05/host=abc/FlumeData.1587556815621
>> 20/04/22 12:58:44 DEBUG MapRFileSystem: getMapRFileStatus
>> maprfs:///a/hour=05/host=abc/.FlumeData.1587560277337.tmp
>>
>>
>> Is there any way to discard the tmp(.tmp) or the hidden files(filename
>> starting with dot or underscore) in hive partitions while reading from
>> spark?
>>
>>
>>
>>
>> *Regards,Dhrubajyoti Hati.*
>>
>


Re: BUG: take with SparkSession.master[url]

2020-03-27 Thread Wenchen Fan
How is your Spark cluster, spark://192.168.0.38:7077, deployed if you just
include the Spark dependency in IntelliJ?

On Fri, Mar 27, 2020 at 1:54 PM Zahid Rahman  wrote:

> I have configured  in IntelliJ as external jars
> spark-3.0.0-preview2-bin-hadoop2.7/jar
>
> not pulling anything from maven.
>
> Backbutton.co.uk
> ¯\_(ツ)_/¯
> ♡۶Java♡۶RMI ♡۶
> Make Use Method {MUM}
> makeuse.org
> <http://www.backbutton.co.uk>
>
>
> On Fri, 27 Mar 2020 at 05:45, Wenchen Fan  wrote:
>
>> Which Spark/Scala version do you use?
>>
>> On Fri, Mar 27, 2020 at 1:24 PM Zahid Rahman 
>> wrote:
>>
>>>
>>> with the following sparksession configuration
>>>
>>> val spark = SparkSession.builder().master("local[*]").appName("Spark 
>>> Session take").getOrCreate();
>>>
>>> this line works
>>>
>>> flights.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
>>> "Canada").map(flight_row => flight_row).take(5)
>>>
>>>
>>> However, if I change the master URL like so, with the IP address, then the
>>> following error is produced by the position of .take(5):
>>>
>>> val spark = 
>>> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
>>> Session take").getOrCreate();
>>>
>>>
>>> 20/03/27 05:15:20 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID
>>> 1, 192.168.0.38, executor 0): java.lang.ClassCastException: cannot assign
>>> instance of java.lang.invoke.SerializedLambda to field
>>> org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance
>>> of org.apache.spark.rdd.MapPartitionsRDD
>>>
>>> BUT if I remove take(5), change the position of take(5), or insert an
>>> extra take(5) as illustrated in the code, then it works. I don't see why
>>> the position of take(5) should cause such an error, or why it should be
>>> affected by changing the master URL.
>>>
>>> flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
>>> "Canada").map(flight_row => flight_row).take(5)
>>>
>>>   flights.take(5)
>>>
>>>   flights
>>>   .take(5)
>>>   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
>>>   .map(fr => flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME,fr.count + 
>>> 5))
>>>flights.show(5)
>>>
>>>
>>> complete code if you wish to replicate it.
>>>
>>> import org.apache.spark.sql.SparkSession
>>>
>>> object sessiontest {
>>>
>>>   // define specific  data type class then manipulate it using the filter 
>>> and map functions
>>>   // this is also known as an Encoder
>>>   case class flight (DEST_COUNTRY_NAME: String,
>>>  ORIGIN_COUNTRY_NAME:String,
>>>  count: BigInt)
>>>
>>>
>>>   def main(args:Array[String]): Unit ={
>>>
>>> val spark = 
>>> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
>>> Session take").getOrCreate();
>>>
>>> import spark.implicits._
>>> val flightDf = 
>>> spark.read.parquet("/data/flight-data/parquet/2010-summary.parquet/")
>>> val flights = flightDf.as[flight]
>>>
>>> flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
>>> "Canada").map(flight_row => flight_row).take(5)
>>>
>>>   flights.take(5)
>>>
>>>   flights
>>>   .take(5)
>>>   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
>>>   .map(fr => flight(fr.DEST_COUNTRY_NAME, 
>>> fr.ORIGIN_COUNTRY_NAME,fr.count + 5))
>>>flights.show(5)
>>>
>>>   } // main
>>> }
>>>
>>>
>>>
>>>
>>>
>>> Backbutton.co.uk
>>> ¯\_(ツ)_/¯
>>> ♡۶Java♡۶RMI ♡۶
>>> Make Use Method {MUM}
>>> makeuse.org
>>> <http://www.backbutton.co.uk>
>>>
>>


Re: BUG: take with SparkSession.master[url]

2020-03-26 Thread Wenchen Fan
Which Spark/Scala version do you use?

On Fri, Mar 27, 2020 at 1:24 PM Zahid Rahman  wrote:

>
> with the following sparksession configuration
>
> val spark = SparkSession.builder().master("local[*]").appName("Spark Session 
> take").getOrCreate();
>
> this line works
>
> flights.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
> "Canada").map(flight_row => flight_row).take(5)
>
>
> However, if I change the master URL like so, with the IP address, then the
> following error is produced by the position of .take(5):
>
> val spark = 
> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
> Session take").getOrCreate();
>
>
> 20/03/27 05:15:20 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1,
> 192.168.0.38, executor 0): java.lang.ClassCastException: cannot assign
> instance of java.lang.invoke.SerializedLambda to field
> org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance
> of org.apache.spark.rdd.MapPartitionsRDD
>
> BUT if I remove take(5), change the position of take(5), or insert an extra
> take(5) as illustrated in the code, then it works. I don't see why the
> position of take(5) should cause such an error, or why it should be affected
> by changing the master URL.
>
> flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
> "Canada").map(flight_row => flight_row).take(5)
>
>   flights.take(5)
>
>   flights
>   .take(5)
>   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
>   .map(fr => flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME,fr.count + 
> 5))
>flights.show(5)
>
>
> complete code if you wish to replicate it.
>
> import org.apache.spark.sql.SparkSession
>
> object sessiontest {
>
>   // define specific  data type class then manipulate it using the filter and 
> map functions
>   // this is also known as an Encoder
>   case class flight (DEST_COUNTRY_NAME: String,
>  ORIGIN_COUNTRY_NAME:String,
>  count: BigInt)
>
>
>   def main(args:Array[String]): Unit ={
>
> val spark = 
> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
> Session take").getOrCreate();
>
> import spark.implicits._
> val flightDf = 
> spark.read.parquet("/data/flight-data/parquet/2010-summary.parquet/")
> val flights = flightDf.as[flight]
>
> flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
> "Canada").map(flight_row => flight_row).take(5)
>
>   flights.take(5)
>
>   flights
>   .take(5)
>   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
>   .map(fr => flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME,fr.count 
> + 5))
>flights.show(5)
>
>   } // main
> }
>
>
>
>
>
> Backbutton.co.uk
> ¯\_(ツ)_/¯
> ♡۶Java♡۶RMI ♡۶
> Make Use Method {MUM}
> makeuse.org
> 
>


Re: [ANNOUNCE] Announcing Apache Spark 2.4.5

2020-02-10 Thread Wenchen Fan
Great Job, Dongjoon!

On Mon, Feb 10, 2020 at 4:18 PM Hyukjin Kwon  wrote:

> Thanks Dongjoon!
>
> On Sun, Feb 9, 2020 at 10:49 AM Takeshi Yamamuro  wrote:
>
>> Happy to hear the release news!
>>
>> Bests,
>> Takeshi
>>
>> On Sun, Feb 9, 2020 at 10:28 AM Dongjoon Hyun 
>> wrote:
>>
>>> There was a typo in one URL. The correct release note URL is here.
>>>
>>> https://spark.apache.org/releases/spark-release-2-4-5.html
>>>
>>>
>>>
>>> On Sat, Feb 8, 2020 at 5:22 PM Dongjoon Hyun 
>>> wrote:
>>>
 We are happy to announce the availability of Spark 2.4.5!

 Spark 2.4.5 is a maintenance release containing stability fixes. This
 release is based on the branch-2.4 maintenance branch of Spark. We
 strongly
 recommend all 2.4 users to upgrade to this stable release.

 To download Spark 2.4.5, head over to the download page:
 http://spark.apache.org/downloads.html

 Note that you might need to clear your browser cache or
 to use `Private`/`Incognito` mode according to your browsers.

 To view the release notes:
 https://spark.apache.org/releases/spark-release-2.4.5.html

 We would like to acknowledge all community members for contributing to
 this
 release. This release would not have been possible without you.

 Dongjoon Hyun

>>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>


Re: A question about rdd bytes size

2019-12-01 Thread Wenchen Fan
When we talk about byte size, we need to specify how the data is stored. For
example, if we cache the DataFrame, then the byte size is the number of bytes
of the binary format of the table cache. If we write to Hive tables, then the
byte size is the total size of the table's data files.
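
A sketch of inspecting both numbers (the table name is illustrative, the
plan statistics are optimizer estimates rather than exact byte counts, and
the API shapes assume a recent 2.x/3.x Spark):

// Catalog statistics for a Hive table's data files (what totalSize reflects):
spark.sql("ANALYZE TABLE db.tbl COMPUTE STATISTICS")
spark.sql("DESCRIBE EXTENDED db.tbl").show(100, truncate = false)  // look for the Statistics row

// Estimated size of the plan backing a DataFrame; after caching, this reflects
// the in-memory (binary columnar) representation rather than the files on disk:
spark.table("db.tbl").cache().count()
val cachedSize = spark.table("db.tbl").queryExecution.optimizedPlan.stats.sizeInBytes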

On Mon, Dec 2, 2019 at 1:06 PM zhangliyun  wrote:

> Hi:
>
>  I want to get the total bytes of a DataFrame with the following function,
> but when I insert the DataFrame into Hive, I find that the value of the
> function is different from spark.sql.statistics.totalSize. The
> spark.sql.statistics.totalSize is less than the result of the following
> function, getRDDBytes.
>
>def getRDDBytes(df:DataFrame):Long={
>
>
>   df.rdd.getNumPartitions match {
> case 0 =>
>   0
> case numPartitions =>
>   val rddOfDataframe = 
> df.rdd.map(_.toString().getBytes("UTF-8").length.toLong)
>   val size = if (rddOfDataframe.isEmpty()) {
> 0
>   } else {
> rddOfDataframe.reduce(_ + _)
>   }
>
>   size
>   }
> }
> Appreciate if you can provide your suggestion.
>
> Best Regards
> Kelly Zhang
>
>
>
>
>


Re: [DISCUSS] Remove sorting of fields in PySpark SQL Row construction

2019-11-06 Thread Wenchen Fan
Sounds reasonable to me. We should make the behavior consistent within
Spark.

On Tue, Nov 5, 2019 at 6:29 AM Bryan Cutler  wrote:

> Currently, when a PySpark Row is created with keyword arguments, the
> fields are sorted alphabetically. This has created a lot of confusion with
> users because it is not obvious (although it is stated in the pydocs) that
> they will be sorted alphabetically. Then later when applying a schema and
> the field order does not match, an error will occur. Here is a list of some
> of the JIRAs that I have been tracking all related to this issue:
> SPARK-24915, SPARK-22232, SPARK-27939, SPARK-27712, and relevant discussion
> of the issue [1].
>
> The original reason for sorting fields is because kwargs in python < 3.6
> are not guaranteed to be in the same order that they were entered [2].
> Sorting alphabetically ensures a consistent order. Matters are further
> complicated with the flag __from_dict__, which allows the Row fields to
> be referenced by name when made by kwargs, but this flag is not serialized
> with the Row and leads to inconsistent behavior. For instance:
>
> >>> spark.createDataFrame([Row(A="1", B="2")], "B string, A string").first()
> Row(B='2', A='1')>>> 
> spark.createDataFrame(spark.sparkContext.parallelize([Row(A="1", B="2")]), "B 
> string, A string").first()
> Row(B='1', A='2')
>
> I think the best way to fix this is to remove the sorting of fields when
> constructing a Row. For users with Python 3.6+, nothing would change
> because these versions of Python ensure that the kwargs stay in the
> order entered. For users with Python < 3.6, using kwargs would check a
> conf to either raise an error or fallback to a LegacyRow that sorts the
> fields as before. With Python < 3.6 being deprecated now, this LegacyRow
> can also be removed at the same time. There are also other ways to create
> Rows that will not be affected. I have opened a JIRA [3] to capture this,
> but I am wondering what others think about fixing this for Spark 3.0?
>
> [1] https://github.com/apache/spark/pull/20280
> [2] https://www.python.org/dev/peps/pep-0468/
> [3] https://issues.apache.org/jira/browse/SPARK-29748
>
>


Re: Re: A question about broadcast nest loop join

2019-10-23 Thread Wenchen Fan
Ah, sorry, I made a mistake. "Spark can only pick BroadcastNestedLoopJoin to
implement left/right join" should be "left/right non-equal join".

On Thu, Oct 24, 2019 at 6:32 AM zhangliyun  wrote:

>
> Hi Herman:
>I guess what you mentioned before
> ```
> if you are OK with slightly different NULL semantics then you could use NOT
> EXISTS(subquery). The latter should perform a lot better.
>
> ```
> is that the NULL key1 of the left table will be retained if a NULL key2 is
> not found in the right table (join condition: left.key1 = right.key2) under
> EXISTS semantics, while this will not happen under IN semantics. If my
> understanding is wrong, please tell me.
>
>
>
> Best Regards.
>
> Kelly Zhang
>
>
>
>
>
>
>
>
> 在 2019-10-23 19:16:34,"Herman van Hovell"  写道:
>
> In some cases BroadcastNestedLoopJoin is the only viable join method. In
> your example for instance you are using a non-equi join condition and BNLJ
> is the only method that works in that case. This is also the reason why you
> can't disable it using the spark.sql.autoBroadcastJoinThreshold
> configuration.
>
> Such a plan is generally generated by using a NOT IN (subquery), if you
> are OK with slightly different NULL semantics then you could use NOT
> EXISTS(subquery). The latter should perform a lot better.
>
> On Wed, Oct 23, 2019 at 12:02 PM zhangliyun  wrote:
>
>> Hi all:
>> I want to ask a question about broadcast nested loop join. From Google I
>> know that left outer/semi joins and right outer/semi joins will use
>> broadcast nested loop, and in some cases, when the input data is very
>> small, it is suitable to use. So how is "very small" input data defined?
>> What parameter decides the threshold? I just want to disable it (I found
>> that setting spark.sql.autoBroadcastJoinThreshold=-1 does not work for the
>> SQL: select a.key1 from testdata1 as a where a.key1 not in (select key3
>> from testdata3)).
>>
>>
>> ```
>>
>> explain cost select a.key1  from testdata1 as a where a.key1 not in
>> (select key3 from testdata3);
>>
>> == Physical Plan ==
>> *(1) Project [key1#90]
>> +- BroadcastNestedLoopJoin BuildRight, LeftAnti, ((key1#90 = key3#92) ||
>> isnull((key1#90 = key3#92)))
>>:- HiveTableScan [key1#90], HiveTableRelation `default`.`testdata1`,
>> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key1#90, value1#91]
>>+- BroadcastExchange IdentityBroadcastMode
>>   +- HiveTableScan [key3#92], HiveTableRelation
>> `default`.`testdata3`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe,
>> [key3#92, value3#93]
>>
>> ```
>>
>>   My questions are:
>>   1. Why, in a NOT IN subquery, is BroadcastNestedLoopJoin still used even
>> when I set spark.sql.autoBroadcastJoinThreshold=-1?
>>   2. Which Spark parameter decides whether BroadcastNestedLoopJoin is
>> enabled/disabled?
>>
>>
>>
>> Appreciate if you have suggestion
>>
>>
>> Best Regards
>>
>> Kelly Zhang
>>
>>
>>
>>
>
>
>
>


Re: A question about broadcast nest loop join

2019-10-23 Thread Wenchen Fan
I haven't looked into your query yet; I just want to let you know that Spark
can only pick BroadcastNestedLoopJoin to implement left/right join. If the
table is very big, then an OOM happens.

Maybe there is an algorithm to implement left/right join in a distributed
environment without broadcast, but currently Spark is only able to deal
with it using broadcast.
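
For the NOT IN query quoted below, the rewrite suggested elsewhere in this
thread is to use NOT EXISTS instead: it has slightly different NULL
semantics, but it turns the anti-join condition into an equality, so Spark
can plan a shuffle-based join instead of broadcasting. A hedged sketch:

spark.sql("""
  SELECT a.key1
  FROM testdata1 a
  WHERE NOT EXISTS (SELECT 1 FROM testdata3 t WHERE t.key3 = a.key1)
""").explain(true)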

On Wed, Oct 23, 2019 at 6:02 PM zhangliyun  wrote:

> Hi all:
> I want to ask a question about broadcast nested loop join. From Google I
> know that left outer/semi joins and right outer/semi joins will use
> broadcast nested loop, and in some cases, when the input data is very small,
> it is suitable to use. So how is "very small" input data defined? What
> parameter decides the threshold? I just want to disable it (I found that
> setting spark.sql.autoBroadcastJoinThreshold=-1 does not work for the SQL:
> select a.key1 from testdata1 as a where a.key1 not in (select key3 from
> testdata3)).
>
>
> ```
>
> explain cost select a.key1  from testdata1 as a where a.key1 not in
> (select key3 from testdata3);
>
> == Physical Plan ==
> *(1) Project [key1#90]
> +- BroadcastNestedLoopJoin BuildRight, LeftAnti, ((key1#90 = key3#92) ||
> isnull((key1#90 = key3#92)))
>:- HiveTableScan [key1#90], HiveTableRelation `default`.`testdata1`,
> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key1#90, value1#91]
>+- BroadcastExchange IdentityBroadcastMode
>   +- HiveTableScan [key3#92], HiveTableRelation `default`.`testdata3`,
> org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key3#92, value3#93]
>
> ```
>
>   My questions are:
>   1. Why, in a NOT IN subquery, is BroadcastNestedLoopJoin still used even
> when I set spark.sql.autoBroadcastJoinThreshold=-1?
>   2. Which Spark parameter decides whether BroadcastNestedLoopJoin is
> enabled/disabled?
>
>
>
> Appreciate if you have suggestion
>
>
> Best Regards
>
> Kelly Zhang
>
>
>
>


Re: [ANNOUNCE] Announcing Apache Spark 2.4.4

2019-09-01 Thread Wenchen Fan
Great! Thanks!

On Mon, Sep 2, 2019 at 5:55 AM Dongjoon Hyun 
wrote:

> We are happy to announce the availability of Spark 2.4.4!
>
> Spark 2.4.4 is a maintenance release containing stability fixes. This
> release is based on the branch-2.4 maintenance branch of Spark. We strongly
> recommend all 2.4 users to upgrade to this stable release.
>
> To download Spark 2.4.4, head over to the download page:
> http://spark.apache.org/downloads.html
>
> Note that you might need to clear your browser cache or
> to use `Private`/`Incognito` mode according to your browsers.
>
> To view the release notes:
> https://spark.apache.org/releases/spark-release-2-4-4.html
>
> We would like to acknowledge all community members for contributing to this
> release. This release would not have been possible without you.
>
> Dongjoon Hyun
>


Re: JDK11 Support in Apache Spark

2019-08-25 Thread Wenchen Fan
Great work!

On Sun, Aug 25, 2019 at 6:03 AM Xiao Li  wrote:

> Thank you for your contributions! This is a great feature for Spark
> 3.0! We finally achieve it!
>
> Xiao
>
> On Sat, Aug 24, 2019 at 12:18 PM Felix Cheung 
> wrote:
>
>> That’s great!
>>
>> --
>> *From:* ☼ R Nair 
>> *Sent:* Saturday, August 24, 2019 10:57:31 AM
>> *To:* Dongjoon Hyun 
>> *Cc:* d...@spark.apache.org ; user @spark/'user
>> @spark'/spark users/user@spark 
>> *Subject:* Re: JDK11 Support in Apache Spark
>>
>> Finally!!! Congrats
>>
>> On Sat, Aug 24, 2019, 11:11 AM Dongjoon Hyun 
>> wrote:
>>
>>> Hi, All.
>>>
>>> Thanks to your many many contributions,
>>> Apache Spark master branch starts to pass on JDK11 as of today.
>>> (with `hadoop-3.2` profile: Apache Hadoop 3.2 and Hive 2.3.6)
>>>
>>>
>>> https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-3.2-jdk-11/326/
>>> (JDK11 is used for building and testing.)
>>>
>>> We already verified all UTs (including PySpark/SparkR) before.
>>>
>>> Please feel free to use JDK11 in order to build/test/run `master` branch
>>> and
>>> share your experience including any issues. It will help Apache Spark
>>> 3.0.0 release.
>>>
>>> For the follow-ups, please follow
>>> https://issues.apache.org/jira/browse/SPARK-24417 .
>>> The next step is `how to support JDK8/JDK11 together in a single
>>> artifact`.
>>>
>>> Bests,
>>> Dongjoon.
>>>
>>
>
> --
> [image: Databricks Summit - Watch the talks]
> 
>


Re: Release Apache Spark 2.4.4

2019-08-13 Thread Wenchen Fan
+1

On Wed, Aug 14, 2019 at 12:52 PM Holden Karau  wrote:

> +1
> Does anyone have any critical fixes they’d like to see in 2.4.4?
>
> On Tue, Aug 13, 2019 at 5:22 PM Sean Owen  wrote:
>
>> Seems fine to me if there are enough valuable fixes to justify another
>> release. If there are any other important fixes imminent, it's fine to
>> wait for those.
>>
>>
>> On Tue, Aug 13, 2019 at 6:16 PM Dongjoon Hyun 
>> wrote:
>> >
>> > Hi, All.
>> >
>> > Spark 2.4.3 was released three months ago (8th May).
>> > As of today (13th August), there are 112 commits (75 JIRAs) in
>> `branch-24` since 2.4.3.
>> >
>> > It would be great if we can have Spark 2.4.4.
>> > Shall we start `2.4.4 RC1` next Monday (19th August)?
>> >
>> > Last time, there was a request for K8s issue and now I'm waiting for
>> SPARK-27900.
>> > Please let me know if there is another issue.
>> >
>> > Thanks,
>> > Dongjoon.
>>
>> -
>> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>>
>> --
> Twitter: https://twitter.com/holdenkarau
> Books (Learning Spark, High Performance Spark, etc.):
> https://amzn.to/2MaRAG9  
> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>


Re: Access to live data of cached dataFrame

2019-05-21 Thread Wenchen Fan
When you cache a DataFrame, you actually cache a logical plan. That's why
re-creating the DataFrame doesn't work: Spark sees that the logical plan is
already cached and picks up the cached data.

You need to uncache the dataframe, or go back to the SQL way:
df.createTempView("abc")
spark.table("abc").cache()
df.show // returns latest data.
spark.table("abc").show // returns cached data.
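
For completeness, the uncache route is roughly this (note it drops the cached
data rather than keeping it alongside a live view):

df.unpersist()   // drop the cached plan's data
df.show()        // recomputes from the source and returns the latest data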


On Mon, May 20, 2019 at 3:33 AM Tomas Bartalos 
wrote:

> I'm trying to re-read however I'm getting cached data (which is a bit
> confusing). For re-read I'm issuing:
> spark.read.format("delta").load("/data").groupBy(col("event_hour")).count
>
> The cache seems to be global influencing also new dataframes.
>
> So the question is how I should re-read without losing the cached data
> (without using unpersist)?
>
> As I mentioned, with SQL it's possible: I can create a cached view, so when
> I access the original table I get live data, and when I access the view I
> get cached data.
>
> BR,
> Tomas
>
> On Fri, 17 May 2019, 8:57 pm Sean Owen,  wrote:
>
>> A cached DataFrame isn't supposed to change, by definition.
>> You can re-read each time or consider setting up a streaming source on
>> the table which provides a result that updates as new data comes in.
>>
>> On Fri, May 17, 2019 at 1:44 PM Tomas Bartalos 
>> wrote:
>> >
>> > Hello,
>> >
>> > I have a cached dataframe:
>> >
>> >
>> spark.read.format("delta").load("/data").groupBy(col("event_hour")).count.cache
>> >
>> > I would like to access the "live" data for this data frame without
>> deleting the cache (using unpersist()). Whatever I do I always get the
>> cached data on subsequent queries. Even adding new column to the query
>> doesn't help:
>> >
>> >
>> spark.read.format("delta").load("/data").groupBy(col("event_hour")).count.withColumn("dummy",
>> lit("dummy"))
>> >
>> >
>> > I'm able to workaround this using cached sql view, but I couldn't find
>> a pure dataFrame solution.
>> >
>> > Thank you,
>> > Tomas
>>
>


Re: DataFrameWriter does not adjust spark.sql.session.timeZone offset while writing orc files

2019-04-24 Thread Wenchen Fan
Did you re-create your df when you updated the timezone conf?

On Wed, Apr 24, 2019 at 9:18 PM Shubham Chaurasia 
wrote:

> Writing:
> scala> df.write.orc("")
>
> For looking into contents, I used orc-tools-X.Y.Z-uber.jar (
> https://orc.apache.org/docs/java-tools.html)
>
> On Wed, Apr 24, 2019 at 6:24 PM Wenchen Fan  wrote:
>
>> How did you read/write the timestamp value from/to ORC file?
>>
>> On Wed, Apr 24, 2019 at 6:30 PM Shubham Chaurasia <
>> shubh.chaura...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> Consider the following(spark v2.4.0):
>>>
>>> Basically I change values of `spark.sql.session.timeZone` and perform an
>>> orc write. Here are 3 samples:-
>>>
>>> 1)
>>> scala> spark.conf.set("spark.sql.session.timeZone", "Asia/Kolkata")
>>>
>>> scala> val df = sc.parallelize(Seq("2019-04-23
>>> 09:15:04.0")).toDF("ts").withColumn("ts", col("ts").cast("timestamp"))
>>> df: org.apache.spark.sql.DataFrame = [ts: timestamp]
>>>
>>> df.show() Output  ORC File Contents
>>> -
>>> 2019-04-23 09:15:04   {"ts":"2019-04-23 09:15:04.0"}
>>>
>>> 2)
>>> scala> spark.conf.set("spark.sql.session.timeZone", "UTC")
>>>
>>> df.show() Output  ORC File Contents
>>> -
>>> 2019-04-23 03:45:04   {"ts":"2019-04-23 09:15:04.0"}
>>>
>>> 3)
>>> scala> spark.conf.set("spark.sql.session.timeZone",
>>> "America/Los_Angeles")
>>>
>>> df.show() Output  ORC File Contents
>>> -
>>> 2019-04-22 20:45:04   {"ts":"2019-04-23 09:15:04.0"}
>>>
>>> It can be seen that in all the three cases it stores {"ts":"2019-04-23
>>> 09:15:04.0"} in orc file. I understand that orc file also contains writer
>>> timezone with respect to which spark is able to convert back to actual time
>>> when it reads orc.(and that is equal to df.show())
>>>
>>> But it's problematic in the sense that it is not adjusting(plus/minus)
>>> timezone (spark.sql.session.timeZone) offsets for {"ts":"2019-04-23
>>> 09:15:04.0"} in ORC file. I mean loading data to any system other than
>>> spark would be a problem.
>>>
>>> Any ideas/suggestions on that?
>>>
>>> PS: For csv files, it stores exactly what we see as the output of
>>> df.show()
>>>
>>> Thanks,
>>> Shubham
>>>
>>>


Re: DataFrameWriter does not adjust spark.sql.session.timeZone offset while writing orc files

2019-04-24 Thread Wenchen Fan
How did you read/write the timestamp value from/to ORC file?

On Wed, Apr 24, 2019 at 6:30 PM Shubham Chaurasia 
wrote:

> Hi All,
>
> Consider the following(spark v2.4.0):
>
> Basically I change values of `spark.sql.session.timeZone` and perform an
> orc write. Here are 3 samples:-
>
> 1)
> scala> spark.conf.set("spark.sql.session.timeZone", "Asia/Kolkata")
>
> scala> val df = sc.parallelize(Seq("2019-04-23
> 09:15:04.0")).toDF("ts").withColumn("ts", col("ts").cast("timestamp"))
> df: org.apache.spark.sql.DataFrame = [ts: timestamp]
>
> df.show() Output  ORC File Contents
> -
> 2019-04-23 09:15:04   {"ts":"2019-04-23 09:15:04.0"}
>
> 2)
> scala> spark.conf.set("spark.sql.session.timeZone", "UTC")
>
> df.show() Output  ORC File Contents
> -
> 2019-04-23 03:45:04   {"ts":"2019-04-23 09:15:04.0"}
>
> 3)
> scala> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
>
> df.show() Output  ORC File Contents
> -
> 2019-04-22 20:45:04   {"ts":"2019-04-23 09:15:04.0"}
>
> It can be seen that in all the three cases it stores {"ts":"2019-04-23
> 09:15:04.0"} in orc file. I understand that orc file also contains writer
> timezone with respect to which spark is able to convert back to actual time
> when it reads orc.(and that is equal to df.show())
>
> But it's problematic in the sense that it is not adjusting(plus/minus)
> timezone (spark.sql.session.timeZone) offsets for {"ts":"2019-04-23
> 09:15:04.0"} in ORC file. I mean loading data to any system other than
> spark would be a problem.
>
> Any ideas/suggestions on that?
>
> PS: For csv files, it stores exactly what we see as the output of df.show()
>
> Thanks,
> Shubham
>
>


Re: Manually reading parquet files.

2019-03-22 Thread Wenchen Fan
Try `val enconder = RowEncoder(df.schema).resolveAndBind()` ?
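
In other words, a sketch of the decoding loop from the quoted code with the
encoder resolved and bound first (this assumes Spark 2.x, where
ExpressionEncoder.fromRow is available, and reuses readFile, pFile, and df
from the code below):

import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.vectorized.ColumnarBatch

// Resolve and bind the encoder before decoding InternalRows back to Rows.
val encoder = RowEncoder(df.schema).resolveAndBind()
val rows = readFile(pFile).flatMap {
  case r: InternalRow   => Seq(r)
  case b: ColumnarBatch => b.rowIterator().asScala
}.map(encoder.fromRow).toList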

On Thu, Mar 21, 2019 at 5:39 PM Long, Andrew 
wrote:

> Thanks a ton for the help!
>
>
>
> Is there a standardized way of converting the internal row to rows?
>
>
>
> I’ve tried this but im getting an exception
>
>
>
> *val *enconder = *RowEncoder*(df.schema)
> *val *rows = readFile(pFile).flatMap(_ *match *{
>   *case *r: InternalRow => *Seq*(r)
>   *case *b: ColumnarBatch => b.rowIterator().asScala
> })
>   .map(enconder.fromRow(_))
>   .toList
>
>
>
> java.lang.RuntimeException: Error while decoding:
> java.lang.UnsupportedOperationException: Cannot evaluate expression:
> getcolumnbyordinal(0, IntegerType)
>
> createexternalrow(getcolumnbyordinal(0, IntegerType),
> getcolumnbyordinal(1, IntegerType), getcolumnbyordinal(2,
> StringType).toString, StructField(pk,IntegerType,false),
> StructField(ordering,IntegerType,false), StructField(col_a,StringType,true))
>
>
>
> at
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:305)
>
> at
> com.amazon.horizon.azulene.ParquetReadTests$$anonfun$2.apply(ParquetReadTests.scala:100)
>
> at
> com.amazon.horizon.azulene.ParquetReadTests$$anonfun$2.apply(ParquetReadTests.scala:100)
>
>
>
> *From: *Ryan Blue 
> *Reply-To: *"rb...@netflix.com" 
> *Date: *Thursday, March 21, 2019 at 3:32 PM
> *To: *"Long, Andrew" 
> *Cc: *"d...@spark.apache.org" , "
> user@spark.apache.org" , "horizon-...@amazon.com" <
> horizon-...@amazon.com>
> *Subject: *Re: Manually reading parquet files.
>
>
>
> You're getting InternalRow instances. They probably have the data you
> want, but the toString representation doesn't match the data for
> InternalRow.
>
>
>
> On Thu, Mar 21, 2019 at 3:28 PM Long, Andrew 
> wrote:
>
> Hello Friends,
>
>
>
> I’m working on a performance improvement that reads additional parquet
> files in the middle of a lambda and I'm running into some issues. This is
> what I'd like to do:
>
>
>
> ds.mapPartitions(x=>{
>   //read parquet file in and perform an operation with x
> })
>
>
>
>
>
> Here’s my current POC code but I’m getting nonsense back from the row
> reader.
>
>
>
> *import *com.amazon.horizon.azulene.util.SparkFileUtils._
>
> *spark*.*conf*.set("spark.sql.parquet.enableVectorizedReader","false")
>
> *val *data = *List*(
>   *TestRow*(1,1,"asdf"),
>   *TestRow*(2,1,"asdf"),
>   *TestRow*(3,1,"asdf"),
>   *TestRow*(4,1,"asdf")
> )
>
> *val *df = *spark*.createDataFrame(data)
>
> *val *folder = Files.*createTempDirectory*("azulene-test")
>
> *val *folderPath = folder.toAbsolutePath.toString + "/"
> df.write.mode("overwrite").parquet(folderPath)
>
> *val *files = *spark*.fs.listStatus(folder.toUri)
>
> *val *file = files(1)//skip _success file
>
> *val *partitionSchema = *StructType*(*Seq*.empty)
> *val *dataSchema = df.schema
> *val *fileFormat = *new *ParquetFileFormat()
>
> *val *path = file.getPath
>
> *val *status = *spark*.fs.getFileStatus(path)
>
> *val *pFile = *new *PartitionedFile(
>   partitionValues = InternalRow.*empty*,//This should be empty for non
> partitioned values
>   filePath = path.toString,
>   start = 0,
>   length = status.getLen
> )
>
> *val *readFile: (PartitionedFile) => Iterator[Any] =
> //Iterator[InternalRow]
>   fileFormat.buildReaderWithPartitionValues(
> sparkSession = *spark*,
> dataSchema = dataSchema,
> partitionSchema = partitionSchema,//this should be empty for non
> partitioned feilds
> requiredSchema = dataSchema,
> filters = *Seq*.empty,
> options = *Map*.*empty*,
> hadoopConf = *spark*.sparkContext.hadoopConfiguration
> //relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
>   )
>
> *import *scala.collection.JavaConverters._
>
> *val *rows = readFile(pFile).flatMap(_ *match *{
>   *case *r: InternalRow => *Seq*(r)
>
>   // This doesn't work. vector mode is doing something screwy
>   *case *b: ColumnarBatch => b.rowIterator().asScala
> }).toList
>
> *println*(rows)
> //List([0,1,5b,24,66647361])
> //??this is wrong I think
>
>
>
> Has anyone attempted something similar?
>
>
>
> Cheers Andrew
>
>
>
>
>
>
> --
>
> Ryan Blue
>
> Software Engineer
>
> Netflix
>


Re: spark sql error

2019-03-22 Thread Wenchen Fan
Did you include the whole error message?

On Fri, Mar 22, 2019 at 12:45 AM 563280193 <563280...@qq.com> wrote:

> Hi,
> I ran a Spark SQL query like this:
>
> *select imei,tag, product_id,*
> *   sum(case when succ1>=1 then 1 else 0 end) as succ,*
> *   sum(case when fail1>=1 and succ1=0 then 1 else 0 end) as fail,*
> *   count(*) as cnt*
> *from t_tbl*
> *where sum(case when succ1>=1 then 1 else 0 end)=0 and sum(case when
> fail1>=1 and succ1=0 then 1 else 0 end)>0*
> *group by **tag, product_id, app_version*
>
> It causes the problem below:
>
> * execute, tree:*
> *Exchange hashpartitioning(imei#0, tag#1, product_id#2, 100)*
> *+- *(1) HashAggregate(keys=[imei#0, tag#1, product_id#2],
> functions=[partial_sum(cast(CASE WHEN (succ1#3L >= 1) THEN 1 ELSE 0 END as
> bigint)), partial_sum(cast(CASE WHEN ((fail1#4L >= 1) && (succ1#3L = 0))
> THEN 1 ELSE 0 END as bigint)), partial_count(1)], output=[imei#0, tag#1,
> product_id#2, sum#49L, sum#50L, count#51L])*
> *   +- *(1) Filter ((sum(cast(CASE WHEN (succ1#3L >= 1) THEN 1 ELSE 0 END
> as bigint)) = 0) && (sum(cast(CASE WHEN ((fail1#4L >= 1) && (succ1#3L = 0))
> THEN 1 ELSE 0 END as bigint)) > 0))*
> *  +- *(1) FileScan json [imei#0,tag#1,product_id#2,succ1#3L,fail1#4L]
> Batched: false, Format: JSON, Location: InMemoryFileIndex[hdfs://xx],
> PartitionFilters: [], PushedFilters: [], ReadSchema:
> struct*
>
>
> Could anyone help me solve this problem?
> My Spark version is 2.3.1.
> Thank you.
>


Re: [ANNOUNCE] Announcing Apache Spark 2.3.3

2019-02-18 Thread Wenchen Fan
Great job!

On Mon, Feb 18, 2019 at 4:24 PM Hyukjin Kwon  wrote:

> Yay! Good job Takeshi!
>
> On Mon, 18 Feb 2019, 14:47 Takeshi Yamamuro 
>> We are happy to announce the availability of Spark 2.3.3!
>>
>> Apache Spark 2.3.3 is a maintenance release, based on the branch-2.3
>> maintenance branch of Spark. We strongly recommend all 2.3.x users to
>> upgrade to this stable release.
>>
>> To download Spark 2.3.3, head over to the download page:
>> http://spark.apache.org/downloads.html
>>
>> To view the release notes:
>> https://spark.apache.org/releases/spark-release-2-3-3.html
>>
>> We would like to acknowledge all community members for contributing to
>> this release. This release would not have been possible without you.
>>
>> Best,
>> Takeshi
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>


Re: [ANNOUNCE] Announcing Apache Spark 2.4.0

2018-11-08 Thread Wenchen Fan
+ user list

On Fri, Nov 9, 2018 at 2:20 AM Wenchen Fan  wrote:

> resend
>
> On Thu, Nov 8, 2018 at 11:02 PM Wenchen Fan  wrote:
>
>>
>>
>> -- Forwarded message -
>> From: Wenchen Fan 
>> Date: Thu, Nov 8, 2018 at 10:55 PM
>> Subject: [ANNOUNCE] Announcing Apache Spark 2.4.0
>> To: Spark dev list 
>>
>>
>> Hi all,
>>
>> Apache Spark 2.4.0 is the fifth release in the 2.x line. This release
>> adds Barrier Execution Mode for better integration with deep learning
>> frameworks, introduces 30+ built-in and higher-order functions to deal with
>> complex data type easier, improves the K8s integration, along with
>> experimental Scala 2.12 support. Other major updates include the built-in
>> Avro data source, Image data source, flexible streaming sinks, elimination
>> of the 2GB block size limitation during transfer, Pandas UDF improvements.
>> In addition, this release continues to focus on usability, stability, and
>> polish while resolving around 1100 tickets.
>>
>> We'd like to thank our contributors and users for their contributions and
>> early feedback to this release. This release would not have been possible
>> without you.
>>
>> To download Spark 2.4.0, head over to the download page:
>> http://spark.apache.org/downloads.html
>>
>> To view the release notes:
>> https://spark.apache.org/releases/spark-release-2-4-0.html
>>
>> Thanks,
>> Wenchen
>>
>> PS: If you see any issues with the release notes, webpage or published
>> artifacts, please contact me directly off-list.
>>
>


Re: BroadcastJoin failed on partitioned parquet table

2018-10-01 Thread Wenchen Fan
I'm not sure if Spark 1.6 is still maintained. Can you try a 2.x Spark
version and see if the problem still exists?

On Sun, Sep 30, 2018 at 4:14 PM 白也诗无敌 <445484...@qq.com> wrote:

> Besides, I have tried the ANALYZE statement. It does not help, because I
> need the single-partition size but get the total table size from Hive
> parameters like 'totalSize' or 'rawSize'.
>
>
>
>
> Hi, guys:
>  I'm using Spark 1.6.2.
>  There are two tables, and the small one is a partitioned parquet table.
>  The total size of the small table is 1000M, but each partition is only 1M.
>  When I set spark.sql.autoBroadcastJoinThreshold to 50m and join the
> two tables on a single partition, I get the SortMergeJoin physical plan.
>  I have made some attempts, and it has something to do with the partition
> pruning:
>  1. check the physical plan, and all of the partitions of the small
> table are added in.
>   It seems like https://issues.apache.org/jira/browse/SPARK-16980
>
>  2. set spark.sql.hive.convertMetastoreParquet=false
> ​The pruning is success, but still get SortMergeJoin because the code
> HiveMetastoreCatalog.scala
>   @transient override lazy val statistics: Statistics = Statistics(
>
>   sizeInBytes = {
> val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
> val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
>
> This is the total size of the table, not of the single partition.
>
> How can I fix this without patching? Or is there a patch for Spark 1.6
> for SPARK-16980?
>
>
> best regards!
> Jerry
>


[SPARK-24771] Upgrade AVRO version from 1.7.7 to 1.8

2018-08-14 Thread Wenchen Fan
Hi all,

We've upgraded Avro from 1.7 to 1.8, to support date/timestamp/decimal
types in the newly added Avro data source in the coming Spark 2.4, and also
to make Avro work with Parquet.

Since Avro 1.8 is not binary compatible with Avro 1.7 (see
https://issues.apache.org/jira/browse/AVRO-1502), users may need to
re-compile their Spark applications if they use Spark with Avro.

I'm sending this email to collect feedbacks for this upgrade, so that we
can write the release notes more clearly about what can be broken.

Thanks,
Wenchen
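
For applications that also depend on Avro directly, the practical step is to align the
Avro dependency with the one Spark ships and rebuild. A minimal build.sbt sketch,
assuming sbt; the version numbers are illustrative and should match the Spark release
actually used:

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-sql"  % "2.4.0" % "provided",
      // The Avro data source is published as a separate module.
      "org.apache.spark" %% "spark-avro" % "2.4.0",
      // Align any direct Avro usage with the 1.8 line to stay binary compatible.
      "org.apache.avro"  %  "avro"       % "1.8.2"
    )

With the module on the classpath, Avro files can then be read through the data source
API, e.g. spark.read.format("avro").load(path).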


Re: AccumulatorV2 vs AccumulableParam (V1)

2018-05-03 Thread Wenchen Fan
Hi Sergey,

Thanks for your valuable feedback!

For 1: yea this is definitely a bug and I have sent a PR to fix it.
For 2: I have left my comments on the JIRA ticket.
For 3: I don't quite understand it, can you give some concrete examples?
For 4: yea this is a problem, but I think it's not a big deal, and we
couldn't find a better solution at that time.
For 5: I think this is a real problem. It looks to me that we can merge
`isZero`, `copyAndReset`, `copy`, `reset` into one API: `zero`, which is
basically just the `copyAndReset`. If there is a way to fix this without
breaking the existing API, I'm really happy to do it.
For 6: same as 4. It's a problem but not a big deal.

In general, I think accumulator v2 sacrifices some flexibility to simplify
the framework and improve the performance. Users can still use accumulator
v1 if flexibility is more important to them. We can keep improving
accumulator v2 without breaking backward compatibility.

Thanks,
Wenchen
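
For readers following the thread, here is a minimal sketch of what the current six-method
contract (isZero, copy, reset, add, merge, value) looks like in practice. The class name
and the use case, collecting distinct error codes on the executors, are made up for
illustration:

    import org.apache.spark.util.AccumulatorV2

    // Accumulates the distinct error codes seen while processing a job.
    class ErrorCodeAccumulator extends AccumulatorV2[String, Set[String]] {
      private var codes: Set[String] = Set.empty

      override def isZero: Boolean = codes.isEmpty

      override def copy(): ErrorCodeAccumulator = {
        val newAcc = new ErrorCodeAccumulator
        newAcc.codes = this.codes
        newAcc
      }

      override def reset(): Unit = { codes = Set.empty }

      override def add(v: String): Unit = { codes += v }

      // merge typically needs a runtime type check on the other accumulator.
      override def merge(other: AccumulatorV2[String, Set[String]]): Unit = other match {
        case o: ErrorCodeAccumulator => codes ++= o.codes
        case _ => throw new UnsupportedOperationException(
          s"Cannot merge ${getClass.getName} with ${other.getClass.getName}")
      }

      override def value: Set[String] = codes
    }

    // Driver side: register it once per SparkContext.
    // val acc = new ErrorCodeAccumulator
    // sc.register(acc, "errorCodes")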

On Thu, May 3, 2018 at 6:20 AM, Sergey Zhemzhitsky 
wrote:

> Hello guys,
>
> I've started to migrate my Spark jobs which use Accumulators V1 to
> AccumulatorV2 and faced with the following issues:
>
> 1. LegacyAccumulatorWrapper now requires the resulting type of
> AccumulableParam to implement equals. In other case the
> AccumulableParam, automatically wrapped into LegacyAccumulatorWrapper,
> will fail with AssertionError (SPARK-23697 [1]).
>
> 2. Existing AccumulatorV2 classes are hardly possible to extend
> easily and correctly (SPARK-24154 [2]) due to their "copy" method, which
> is called during serialization and usually loses type information of
> descendant classes which don't override "copy" (and it's easier to
> implement an accumulator from scratch than override it correctly)
>
> 3. The same instance of AccumulatorV2 cannot be used with the same
> SparkContext multiple times (unlike AccumulableParam) failing with
> "IllegalStateException: Cannot register an Accumulator twice" even
> after "reset" method called. So it's impossible to unregister already
> registered accumulator from user code.
>
> 4. AccumulableParam (V1) implementations are usually more or less
> stateless, while AccumulatorV2 implementations are almost always
> stateful, leading to (unnecessary?) type checks (unlike
> AccumulableParam). For example typical "merge" method of AccumulatorV2
> requires to check whether current accumulator is of an appropriate
> type, like here [3]
>
> 5. AccumulatorV2 is more difficult to implement correctly unlike
> AccumulableParam. For example, in case of AccumulableParam I have to
> implement just 3 methods (addAccumulator, addInPlace, zero), in case
> of AccumulatorParam - just 2 methods (addInPlace, zero), and in case of
> AccumulatorV2 - 6 methods (isZero, copy, reset, add, merge, value).
>
> 6. AccumulatorV2 classes are hardly possible to be anonymous classes,
> because of their "copy" and "merge" methods which typically require a
> concrete class to make a type check.
>
> I understand the motivation for AccumulatorV2 (SPARK-14654 [4]), but
> just wondering whether there is a way to simplify the API of
> AccumulatorV2 to meet the points described above and to be less error
> prone?
>
>
> [1] https://issues.apache.org/jira/browse/SPARK-23697
> [2] https://issues.apache.org/jira/browse/SPARK-24154
> [3] https://github.com/apache/spark/blob/4f5bad615b47d743b8932aea1071652293981604/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala#L348
> [4] https://issues.apache.org/jira/browse/SPARK-14654
>
>
>


Re: Broken SQL Visualization?

2018-01-15 Thread Wenchen Fan
Hi, thanks for reporting. Can you include the steps to reproduce this bug?

On Tue, Jan 16, 2018 at 7:07 AM, Ted Yu  wrote:

> Did you include any picture ?
>
> Looks like the picture didn't go thru.
>
> Please use third party site.
>
> Thanks
>
>  Original message 
> From: Tomasz Gawęda 
> Date: 1/15/18 2:07 PM (GMT-08:00)
> To: d...@spark.apache.org, user@spark.apache.org
> Subject: Broken SQL Visualization?
>
> Hi,
>
> Today I updated my test cluster to the current Spark master; after that,
> my SQL Visualization page started to crash with the following error in JS:
>
> The screenshot was cut for readability and to hide internal server names ;)
>
> It may be caused by the upgrade or by some code changes, but - to be honest -
> I did not use any new operators or any new Spark functions, so it should
> render correctly, like a few days ago. Some visualizations work fine, some
> crash, and I have no idea why they do not work. Can anyone help me?
> Probably it is a bug in Spark, but it is hard for me to say where.
>
> Thanks in advance!
>
> Pozdrawiam / Best regards,
>
> Tomek
>


Re: How to persistent database/table created in sparkSession

2017-12-05 Thread Wenchen Fan
Try with `SparkSession.builder().enableHiveSupport` ?
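
A minimal sketch of that suggestion, assuming a Hive metastore is reachable (via
hive-site.xml on the classpath, or falling back to the local embedded metastore).
Without enableHiveSupport() the catalog is in-memory only and is discarded when the
application ends:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("Create persistent table")
      .master("local")
      .enableHiveSupport()   // back the catalog with the Hive metastore
      .getOrCreate()

    // The database definition is written to the metastore and survives the session.
    spark.sql("CREATE DATABASE IF NOT EXISTS testdb LOCATION 'hdfs://node1:8020/testdb'")

    spark.stop()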

On Tue, Dec 5, 2017 at 3:22 PM, 163  wrote:

> Hi,
> How can I persistent database/table created in spark application?
>
> import org.apache.spark.sql.SparkSession
>
> object TestPersistentDB {
>   def main(args: Array[String]): Unit = {
>     val spark = SparkSession.builder()
>       .appName("Create persistent table")
>       .config("spark.master", "local")
>       .getOrCreate()
>     import spark.implicits._
>     // Creates the database in the session catalog only.
>     spark.sql("create database testdb location \"hdfs://node1:8020/testdb\"")
>   }
> }
>
>   When I use spark.sql("create database") in a SparkSession and then close that
> SparkSession, the created database is not persisted to the metastore, so I cannot
> find it in spark-sql with SHOW DATABASES.
>
>
>
> regards
> wendy
>


Re: Dataset API Question

2017-10-25 Thread Wenchen Fan
It's because of a different API design.

RDD.checkpoint returns void, which means it mutates the RDD's state, so you
need an RDD.isCheckpointed method to check whether this RDD is checkpointed.

Dataset.checkpoint returns a new Dataset, which means there is no
isCheckpointed state in Dataset, and thus we don't need a
Dataset.isCheckpointed method.
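
A small sketch of the difference, assuming an active SparkSession and a checkpoint
directory (the path is just a placeholder):

    spark.sparkContext.setCheckpointDir("/tmp/checkpoints")

    val df = spark.range(10).toDF("id")

    // Dataset.checkpoint returns a NEW Dataset backed by the checkpoint files
    // (eager by default); df itself is untouched, so there is no flag to query on it.
    val checkpointed = df.checkpoint()

    // RDD.checkpoint mutates the RDD in place, hence the isCheckpointed flag.
    val rdd = spark.sparkContext.parallelize(1 to 10)
    rdd.checkpoint()
    rdd.count()                     // materializes the checkpoint
    println(rdd.isCheckpointed)     // true

    // df.rdd is derived from the Dataset's logical plan and was never itself
    // checkpointed, which is why df.rdd.isCheckpointed reports false.

The last point matches the behavior observed later in the thread.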


On Wed, Oct 25, 2017 at 6:39 PM, Bernard Jesop 
wrote:

> Actually, I realized keeping the info would not be enough, as I need to
> find the checkpoint files again to delete them :/
>
> 2017-10-25 19:07 GMT+02:00 Bernard Jesop :
>
>> As far as I understand, Dataset.rdd is not the same as InternalRDD.
>> It is just another RDD representation of the same Dataset and is created
>> on demand (lazy val) when Dataset.rdd is called.
>> This totally explains the observed behavior.
>>
>> But how would it be possible to know that a Dataset has been
>> checkpointed?
>> Should I manually keep track of that info?
>>
>> 2017-10-25 15:51 GMT+02:00 Bernard Jesop :
>>
>>> Hello everyone,
>>>
>>> I have a question about checkpointing on dataset.
>>>
>>> It seems in 2.1.0 that there is a Dataset.checkpoint(), however unlike
>>> RDD there is no Dataset.isCheckpointed().
>>>
>>> I wonder if Dataset.checkpoint is syntactic sugar for
>>> Dataset.rdd.checkpoint.
>>> When I do :
>>>
>>> Dataset.checkpoint; Dataset.count
>>> Dataset.rdd.isCheckpointed // result: false
>>>
>>> However, when I explicitly do:
>>> Dataset.rdd.checkpoint; Dataset.rdd.count
>>> Dataset.rdd.isCheckpointed // result: true
>>>
>>> Could someone explain this behavior to me, or provide some references?
>>>
>>> Best regards,
>>> Bernard
>>>
>>
>>
>


Re: appendix

2017-06-20 Thread Wenchen Fan
You should make HBase a data source (it seems we already have an HBase connector?),
create a DataFrame from HBase, and do the join in Spark SQL.
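
A rough sketch of that approach. The format string and option names below are
placeholders, since they depend on which HBase connector is used (for example shc or
the hbase-spark module); the general shape of "load HBase as a DataFrame, then join"
stays the same:

    // Step 1, as in the original mail.
    val df = hiveContext.sql("sql")

    // Expose HBase through the data source API of whichever connector is chosen.
    // "your.hbase.connector.format" and the option names are placeholders.
    val hbaseDf = hiveContext.read
      .format("your.hbase.connector.format")
      .option("table", "your_table")
      .load()
      .select("id", "value")

    // Let Spark SQL do the filtering instead of mapPartitions:
    val result = df.join(hbaseDf, Seq("id"))
      .where(df("mvcc") === hbaseDf("value"))

If the HBase side is small, adding a broadcast() hint on it avoids the extra shuffle stage.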

> On 21 Jun 2017, at 10:17 AM, sunerhan1...@sina.com wrote:
> 
> Hello,
> My scenario is like this:
> 1. val df = hiveContext/carbonContext.sql("sql")
> 2. Iterate over the rows, extracting two columns, id and mvcc, and use id as the key
> to scan HBase and get the corresponding value;
> if mvcc == value, the row passes, else it is dropped.
> Is there a better way than dataframe.mapPartitions, because it causes an
> extra stage and takes more time?
> I put two DAGs in the appendix, please check!
> 
> Thanks!!
> sunerhan1...@sina.com 
>