Lineage between Datasets

2017-04-12 Thread Chang Chen
Hi All

I believe that there is no lineage between datasets. Consider this case:

    val people = spark.read.parquet("...").as[Person]
    val ageGreatThan30 = people.filter("age > 30")

Since the second DS can push down the condition, they are obviously different logical plans and hence are different
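A minimal sketch of how to see the two plans diverge (Person is assumed to be a simple case class; explain(true) prints the analyzed plan mentioned in the reply):

    import spark.implicits._
    case class Person(name: String, age: Int)

    val people = spark.read.parquet("...").as[Person]
    val ageGreatThan30 = people.filter("age > 30")

    people.explain(true)          // analyzed plan: just the parquet relation
    ageGreatThan30.explain(true)  // analyzed plan: a Filter(age > 30) on top of the relation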

Re: Lineage between Datasets

2017-04-12 Thread Chang Chen
age". You can get that by > calling explain(true) and look at the analyzed plan. > > > On Wed, Apr 12, 2017 at 3:03 AM Chang Chen <baibaic...@gmail.com> wrote: > >> Hi All >> >> I believe that there is no lineage between datasets. Consider this case: >

Re: [SQL] Syntax "case when" doesn't be supported in JOIN

2017-07-13 Thread Chang Chen
Hi Wenchen

Yes. We also found this error is caused by Rand. However, this is a classic way to solve data skew in Hive. Is there any equivalent way in Spark?

Thanks
Chang

On Thu, Jul 13, 2017 at 8:25 PM, Wenchen Fan wrote:
> It’s not about case when, but about rand().
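For context, the Hive pattern under discussion salts the skewed join key with rand() so skewed rows spread across tasks. A DataFrame sketch of the same idea (tbl1, tbl2, col2, col3 follow the SQL quoted later in this thread; the salt prefix is illustrative):

    import org.apache.spark.sql.functions._

    val tbl1 = spark.table("tbl1")
    val tbl2 = spark.table("tbl2")

    // replace the skewed (e.g. null) key with a random salt before joining
    val salted = tbl1.withColumn("col2",
      when(col("col2").isNull, concat(lit("salt_"), rand().cast("string")))
        .otherwise(col("col2")))
    val joined = salted.join(tbl2, salted("col2") === tbl2("col3"), "left_outer")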

Re: [SQL] Syntax "case when" doesn't be supported in JOIN

2017-07-17 Thread Chang Chen
…they do not care > > about the semantics difference. > > > > Thanks, > > > > Xiao > > > > 2017-07-16 20:07 GMT-07:00 Chang Chen <baibaichen@...>: > > > >> It is tedious since we have lots of Hive SQL being

Re: [SQL] Syntax "case when" doesn't be supported in JOIN

2017-07-17 Thread Chang Chen
…github.com/apache/spark/pull/15417#discussion_r85295977 > > 2017-07-17 15:44 GMT+08:00 Chang Chen <baibaic...@gmail.com>: > >> Hi All >> >> I don't understand the difference between the semantics, I found Spark >> does the same thing for GroupBy non-deterministic. Fro

Re: [SQL] Syntax "case when" doesn't be supported in JOIN

2017-07-17 Thread Chang Chen
different evaluation orders change > results. > > Chang Chen wrote > > I see the issue. I will try https://github.com/apache/spark/pull/18652, I think > > 1. For Join Operator, the left and right plan can't be non-deterministic. > > 2. If F

Re: [SQL] Syntax "case when" doesn't be supported in JOIN

2017-07-17 Thread Chang Chen
does matter. A non-deterministic expression can change its > output due to internal state which may depend on input order. > > MonotonicallyIncreasingID is an example for the stateful expression. Once > you change the row order, the evaluation results are different. > >

Re: [SQL] Syntax "case when" doesn't be supported in JOIN

2017-07-16 Thread Chang Chen
…ELSE col2 END AS col2
> FROM tbl1) a
> LEFT OUTER JOIN tbl2 b
> ON a.col2 = b.col3;
>
> Chang Chen wrote
> > Hi Wenchen
> >
> > Yes. We also found this error is caused by Rand. However, this is a classic
> > way to solve data skew in

Question on HashJoin trait

2017-07-26 Thread Chang Chen
Hi

I am reading the Spark SQL code. What are the streamedPlan and buildPlan of the HashJoin trait for?

    protected lazy val (buildPlan, streamedPlan) = buildSide match {
      case BuildLeft => (left, right)
      case BuildRight => (right, left)
    }
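For context, a hash join materializes one side (the build side) into a hash table and iterates the other (the streamed side), probing the table per row. A simplified sketch of the idea, not Spark's actual implementation:

    // toy equi-join: build a map from the (usually smaller) build side,
    // then stream the other side and probe the map row by row
    def hashJoin[K, A, B](build: Seq[(K, A)], streamed: Seq[(K, B)]): Seq[(K, A, B)] = {
      val table = build.groupBy(_._1)                    // build phase
      streamed.flatMap { case (k, b) =>                  // probe phase
        table.getOrElse(k, Seq.empty).map { case (_, a) => (k, a, b) }
      }
    }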

Is static volatile variable different with static variable in the closure?

2017-06-07 Thread Chang Chen
A static variable will be initialized in the worker-node JVM; it will not be serialized from the master. But what about a static volatile variable? Recently I read the Beam Spark runner code, and I found that they use a static volatile Broadcast variable. See
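A sketch of the pattern being asked about (the holder name and value type are hypothetical; the real code is in the Beam Spark runner). Either way the field lives per JVM; volatile only adds cross-thread visibility within that JVM, it does not change serialization:

    import org.apache.spark.broadcast.Broadcast

    object SideInputHolder {
      // @volatile ~ Java's static volatile: a write by one executor thread
      // becomes visible to the others; each executor JVM still initializes it itself
      @volatile var sideInput: Broadcast[Map[String, String]] = _
    }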

How to print plan of Structured Streaming DataFrame

2017-11-20 Thread Chang Chen
Hi Guys

I modified StructuredNetworkWordCount to see what the executed plan is. Here is my code:

    val wordCounts = words.groupBy("value").count()

    // Start running the query that prints the running counts to the console
    val query = wordCounts.writeStream
      .outputMode("complete")
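As suggested in the reply below, the started StreamingQuery is what exposes the executed plan — a sketch (query.explain() reports the plan of the last executed micro-batch, so at least one batch must have run):

    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.processAllAvailable()     // let at least one micro-batch finish
    query.explain(extended = true)  // prints the logical and physical plans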

Re: How to print plan of Structured Streaming DataFrame

2017-11-21 Thread Chang Chen
vii...@gmail.com> wrote: > > wordCounts.explain() -> query.explain()? > > > Chang Chen wrote > > Hi Guys > > > > I modified StructuredNetworkWordCount to see what the executed plan is. > > Here is my code: > > > > val wordCounts = words.groupBy("

Is RDD thread safe?

2019-11-11 Thread Chang Chen
Hi all

I have a case where I need to cache a source RDD, and then create different DataFrames from it in different threads to accelerate queries. I know that SparkSession is thread-safe (https://issues.apache.org/jira/browse/SPARK-15135), but I am not sure whether RDD is thread-safe or not.

Thanks
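A sketch of the usage in question (the schema and predicates are illustrative): one RDD cached once, with several threads deriving their own DataFrames from it.

    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global

    val schema = StructType(Seq(StructField("name", StringType), StructField("age", IntegerType)))
    val source = spark.read.parquet("...").rdd.cache()  // shared source, cached once

    // each thread builds and runs its own query over the same cached RDD
    val counts = Seq("age > 30", "age <= 30").map { pred =>
      Future { spark.createDataFrame(source, schema).filter(pred).count() }
    }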

Re: Is RDD thread safe?

2019-11-24 Thread Chang Chen
Nov 25, 2019 at 11:31 AM Chang Chen wrote: > >> I am wondering about the concurrency semantics, to reason about correctness. If >> two queries simultaneously run DAGs which use the same cached >> DF/RDD, but before the cache actually happens, what will happen? >>

Re: Is RDD thread safe?

2019-11-24 Thread Chang Chen
12, 2019 at 12:31 PM Chang Chen wrote: > >> Hi all >> >> I have a case where I need to cache a source RDD, and then create different >> DataFrames from it in different threads to accelerate queries. >> >> I know that SparkSession is thread-safe( >> htt

Re: Is RDD thread safe?

2019-11-25 Thread Chang Chen
at 2:29 AM Weichen Xu > wrote: > >> emmm, I haven't checked the code, but I think if an RDD is referenced in >> several places, the correct behavior should be: when this RDD data is >> needed, it will be computed and then cached only once, otherwise it should >> be treated as a

[DISCUSS] Caching SparkPlan

2020-01-31 Thread Chang Chen
I'd like to start a discussion on caching SparkPlan.

From what I benchmarked, if SQL execution time is less than 1 second, then we cannot ignore the following overheads, especially if we cache data in memory:

1. Parsing, analysing, optimizing SQL
2. Generating the physical plan (SparkPlan)
3.
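For reference, a sketch of where each overhead lives (illustrative; df.queryExecution exposes the result of each planning phase, which is one way to inspect or time them separately):

    val df = spark.sql("SELECT count(*) FROM t WHERE x > 0")
    val qe = df.queryExecution

    qe.analyzed       // analyzed logical plan   (parse + analyze)
    qe.optimizedPlan  // optimized logical plan  (optimizer rules)
    qe.sparkPlan      // physical plan           (SparkPlan generation)
    qe.executedPlan   // final plan after preparations (exchanges, codegen)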

Re: Performance of VectorizedRleValuesReader

2020-09-14 Thread Chang Chen
happen to have any opinion there? that particular section >> was introduced in the Parquet 1.10 update: >> >> https://github.com/apache/spark/commit/cac9b1dea1bb44fa42abf77829c05bf93f70cf20 >> It looks like it didn't use to make a ByteBuffer each time, but read from >> in.

Re: Performance of VectorizedRleValuesReader

2020-09-13 Thread Chang Chen
dex, this.currentBuffer, valueIndex); valueIndex += 8; }

On Mon, Sep 14, 2020 at 10:40 AM, Sean Owen wrote:
> It certainly can't be called once - it's reading different data each time.
> There might be a faster way to do it, I don't know. Do you have ideas?
>
> On Sun, Sep 13, 2020 at 9:25 PM Chang Chen wr

Performance of VectorizedRleValuesReader

2020-09-13 Thread Chang Chen
Hi experts

It looks like there is a hot spot in VectorizedRleValuesReader#readNextGroup():

    case PACKED:
      int numGroups = header >>> 1;
      this.currentCount = numGroups * 8;

      if (this.currentBuffer.length < this.currentCount) {
        this.currentBuffer = new int[this.currentCount];
      }

Re: [DISCUSS][SPIP] Standardize Spark Exception Messages

2020-10-27 Thread Chang Chen
hi Xinyi

Just curious, which tool did you use to generate this?

On Mon, Oct 26, 2020 at 8:05 AM, Xinyi Yu wrote:
> Hi all,
>
> We'd like to post a SPIP of Standardize Exception Messages in Spark. Here is

[DISCUSS][SQL] Improve Performance of AggregationIterator

2020-07-28 Thread Chang Chen
Hi Spark Developers

We are implementing a new TypedImperativeAggregate which would benefit from batch-to-batch update or merge. And at least in sort-based aggregation, we can process inputs batch by batch. Has anyone done the same optimization?
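To illustrate the idea in plain Scala (a sketch, not the Catalyst API): instead of folding one input row at a time into the aggregation buffer, aggregate a batch locally and merge it in one call, which lets buffer implementations use their bulk-update paths.

    // row-at-a-time: one buffer update per input row
    def updatePerRow(buffer: Set[Int], rows: Iterator[Int]): Set[Int] =
      rows.foldLeft(buffer)((b, r) => b + r)

    // batch-to-batch: aggregate the batch locally, then merge once
    def updatePerBatch(buffer: Set[Int], rows: Iterator[Int]): Set[Int] =
      buffer union rows.toSet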

Re: AWS Consistent S3 & Apache Hadoop's S3A connector

2020-12-06 Thread Chang Chen
Since S3A now works perfectly with S3Guard turned off, could the Magic Committer work with S3Guard off? If yes, will performance degrade? Or, if HADOOP-17400 is fixed, will it then have comparable performance?

On Fri, Dec 4, 2020 at 10:00 PM, Steve Loughran wrote:
> as sent to hadoop-general.
>
> TL;DR. S3

The progress of DataSourceV2 based connector for JDBC?

2020-12-25 Thread Chang Chen
Hi All

Is there any plan for supporting a JDBC DataSourceV2? I noticed this PR (https://github.com/apache/spark/pull/25211), but it was closed a year ago. @Wenchen Fan already implemented some basic catalog functionality, so we can use DataSource V2 via SQL, for example: select * from h2.db.table.
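For reference, a sketch of the catalog wiring that makes a query like select * from h2.db.table resolve (assuming the JDBCTableCatalog on master; the catalog name, URL, and driver are illustrative):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .config("spark.sql.catalog.h2",
        "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
      .config("spark.sql.catalog.h2.url", "jdbc:h2:mem:testdb")
      .config("spark.sql.catalog.h2.driver", "org.h2.Driver")
      .getOrCreate()

    spark.sql("select * from h2.db.table").show()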

Support ZOrder in OSS

2021-01-09 Thread Chang Chen
Hi All

I found that Impala has already implemented Z-order (https://issues.apache.org/jira/browse/IMPALA-8755). I used to think supporting Z-order needed file-format support, but from the Impala implementation, it looks like it only needs a new RecordComparator, which is independent of
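For context, a Z-order comparison can be written without materializing interleaved bits — a sketch of the classic "most significant differing dimension" trick for non-negative int coordinates (illustrative, not Impala's code):

    // true if point a precedes point b along the Z-order (Morton) curve
    def zOrderLess(a: Array[Int], b: Array[Int]): Boolean = {
      // lessMsb(x, y): x's highest set bit is strictly below y's
      def lessMsb(x: Int, y: Int): Boolean = x < y && x < (x ^ y)
      var msd = 0  // dimension holding the most significant differing bit so far
      var i = 1
      while (i < a.length) {
        if (lessMsb(a(msd) ^ b(msd), a(i) ^ b(i))) msd = i
        i += 1
      }
      a(msd) < b(msd)
    }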

[Discuss][SPIP] DataSource V2 SQL push down

2021-04-02 Thread Chang Chen
Hi All

We would like to post a SPIP of DataSource V2 SQL Push Down in Spark. Here is the document link:
https://olapio.atlassian.net/wiki/spaces/TeamCX/pages/2667315361/Discuss+SQL+Data+Source+V2+SQL+Push+Down?atlOrigin=eyJpIjoiOTI5NGYzYWMzMWYwNDliOWIwM2ZkODllODk4Njk2NzEiLCJwIjoiYyJ9

This SPIP aims

Re: [Discuss][SPIP] DataSource V2 SQL push down

2021-04-05 Thread Chang Chen
are >defeating the purpose of having DS V2. So I want to wait until we fully >migrate to DS V2 JDBC, and then implement Aggregate push down for JDBC. > > > I have submitted Parquet Aggregate push down PR. Here is the link: > > https://github.com/apache/spark/pull/32049 >

Re: [Discuss][SPIP] DataSource V2 SQL push down

2021-04-07 Thread Chang Chen
hi huaxin

please review https://github.com/apache/spark/pull/32061

As for adding a *trait PrunedFilteredAggregateScan* for V1 JDBC, I deleted the trait, since the V1 DataSource needn't support aggregation push down.

On Mon, Apr 5, 2021 at 10:02 PM, Chang Chen wrote:
> Hi huaxin
>
> What I am concer

Re: SessionCatalog lock issue

2021-03-18 Thread Chang Chen
o only wrap > `formatDatabaseName(name.database.getOrElse(currentDb))` with > `synchronized`. > > On Thu, Mar 18, 2021 at 3:38 PM Chang Chen wrote: > >> hi all >> >> We met an issue which is related with SessionCatalog synchronized, for >> example >>

Re: SessionCatalog lock issue

2021-03-19 Thread Chang Chen
abaseName(name.database.getOrElse(currentDb))` with > `synchronized`. > > On Thu, Mar 18, 2021 at 3:38 PM Chang Chen wrote: > >> hi all >> >> We met an issue which is related with SessionCatalog synchronized, for >> example >> >> def tableExists(nam

SessionCatalog lock issue

2021-03-18 Thread Chang Chen
hi all

We met an issue which is related to SessionCatalog synchronization, for example:

    def tableExists(name: TableIdentifier): Boolean = synchronized {
      val db = formatDatabaseName(name.database.getOrElse(currentDb))
      val table = formatTableName(name.table)
      externalCatalog.tableExists(db,
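The direction suggested in the replies is to shrink the critical section so that only the session-local state is read under the lock, leaving the (potentially slow) external catalog call unsynchronized — a sketch:

    def tableExists(name: TableIdentifier): Boolean = {
      // only currentDb needs the lock; the metastore RPC does not
      val db = synchronized { formatDatabaseName(name.database.getOrElse(currentDb)) }
      val table = formatTableName(name.table)
      externalCatalog.tableExists(db, table)
    }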

Re: Apache Spark 3.2 Expectation

2021-03-03 Thread Chang Chen
+1 for Data Source V2 Aggregate push down

On Sat, Feb 27, 2021 at 4:20 AM, huaxin gao wrote:
> Thanks Dongjoon and Xiao for the discussion. I would like to add Data
> Source V2 Aggregate push down to the list. I am currently working on
> JDBC Data Source V2 Aggregate push down, but the common code can be used

Re: [Discuss][SPIP] DataSource V2 SQL push down

2021-04-09 Thread Chang Chen
meeting to discuss this? > > Thanks, > > Huaxin > > > > On Wed, Apr 7, 2021 at 1:32 AM Chang Chen wrote: > >> hi huaxin >> >> please review https://github.com/apache/spark/pull/32061 >> >> as for add a *trait PrunedFilteredAggregateScan* for

Re: A scene with unstable Spark performance

2022-05-18 Thread Chang Chen
This is a case where resources are fixed within the same SparkContext, but SQLs have different priorities. Some SQLs are only allowed to execute if there are spare resources; once a high-priority SQL comes in, the tasksets of those SQLs are either killed or stalled. If we set a high-priority pool's
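For reference, the closest built-in mechanism is fair scheduler pools, assigned per thread — a sketch (the pool name is illustrative; weight and minShare come from the fairscheduler.xml pointed to by spark.scheduler.allocation.file, and pools only reshape scheduling rather than kill running tasksets):

    // requires spark.scheduler.mode=FAIR on the SparkContext
    sc.setLocalProperty("spark.scheduler.pool", "high_priority")
    spark.sql("SELECT ...").collect()          // jobs from this thread use the pool

    sc.setLocalProperty("spark.scheduler.pool", null)  // revert to the default pool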