Re: A scene with unstable Spark performance

2022-05-18 Thread Chang Chen
This is a case where resources are fixed within the same SparkContext, but the
SQL queries have different priorities.

Some SQL queries are only allowed to run when there are spare resources; once a
high-priority query comes in, the task sets of those low-priority queries are
either killed or stalled.

If we set the high-priority pool's minShare to a relatively high value, e.g.
50% or 60% of the total cores, does that make sense?
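
For reference, a minimal sketch of the setup I have in mind (the pool names,
minShare values, and file path below are made up for illustration; the pools
would be defined in a fairscheduler.xml referenced by
spark.scheduler.allocation.file):

// Hypothetical fairscheduler.xml entries (illustrative values only):
//   <pool name="high_priority"><schedulingMode>FAIR</schedulingMode>
//     <weight>10</weight><minShare>60</minShare></pool>
//   <pool name="low_priority"><schedulingMode>FAIR</schedulingMode>
//     <weight>1</weight><minShare>0</minShare></pool>

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("shared-context")
  .config("spark.scheduler.mode", "FAIR")
  .config("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")
  .getOrCreate()

// Route a high-priority query to its pool before submitting it from this thread.
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "high_priority")
spark.range(0, 1000000L).count()   // stands in for a high-priority SQL

// Low-priority queries go to the other pool and mostly get the leftover cores.
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "low_priority")
spark.range(0, 1000000L).count()   // stands in for a low-priority SQL

My understanding is that minShare only affects which pool is offered cores first
when both have pending tasks; it does not by itself kill already-running
low-priority tasks.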


Sungwoo Park wrote on Wed, May 18, 2022 at 13:28:

> The problem you describe is the motivation for developing Spark on MR3.
> From the blog article (
> https://www.datamonad.com/post/2021-08-18-spark-mr3/):
>
> *The main motivation for developing Spark on MR3 is to allow multiple
> Spark applications to share compute resources such as Yarn containers or
> Kubernetes Pods.*
>
> The problem is due to an architectural limitation of Spark, and I guess
> fixing the problem would require a heavy rewrite of Spark core. When we
> developed Spark on MR3, we were not aware of any attempt being made
> elsewhere (in academia and industry) to address this limitation.
>
> A potential workaround might be to implement a custom Spark application
> that manages the submission of two groups of Spark jobs and controls their
> execution (similarly to Spark Thrift Server). Not sure if this approach
> would fix your problem, though.
>
> If you are interested, see the webpage of Spark on MR3:
> https://mr3docs.datamonad.com/docs/spark/
>
> We have released Spark 3.0.1 on MR3, and Spark 3.2.1 on MR3 is under
> development. For Spark 3.0.1 on MR3, no change is made to Spark and MR3 is
> used as an add-on. The main application of MR3 is Hive on MR3, but Spark on
> MR3 is equally ready for production.
>
> Thank you,
>
> --- Sungwoo
>
>>


Re: [Discuss][SPIP] DataSource V2 SQL push down

2021-04-09 Thread Chang Chen
hi huaxin

I looked into your PR; there may be a way to consolidate the file source and
the SQL source.

What's the time difference between Beijing and your timezone? I prefer next
Monday night or Tuesday morning.

I can share zoom.

huaxin gao wrote on Thu, Apr 8, 2021 at 7:10 AM:

> Hi Chang,
>
> Thanks for working on this.
>
> Could you please explain how your proposal can be extended to the
> file-based data sources? Since at least half of the Spark community are
> using file-based data sources, I think any designs should consider the
> file-based data sources as well. I work on both sql-based and file-based
> data sources, and I understand that they are very different. It’s
> challenging to have a design to work for both, but the current filter push
> down and column pruning have been designed nicely to fit both sides. I
> think we should follow the same approach to make Aggregate push down work
> for both too.
>
> I am currently collaborating with the Apple Spark team and Facebook Spark
> team to push down Aggregate to file-based data sources. We are doing some
> ongoing work right now to push down Max/Min/Count to parquet and later to
> ORC to utilize the statistics information there (
> https://github.com/apache/spark/pull/32049). Please correct me if I am
> wrong: it seems to me that your proposal doesn't consider file-based data
> sources at all and will stop us from continuing our work.
>
> Let's schedule a meeting to discuss this?
>
> Thanks,
>
> Huaxin
>
>
>
> On Wed, Apr 7, 2021 at 1:32 AM Chang Chen  wrote:
>
>> hi huaxin
>>
>> please review https://github.com/apache/spark/pull/32061
>>
>> as for adding a *trait PrunedFilteredAggregateScan* for V1 JDBC, I deleted the
>> trait, since the V1 DataSource needn't support aggregation push down
>>
>> Chang Chen wrote on Mon, Apr 5, 2021 at 10:02 PM:
>>
>>> Hi huaxin
>>>
>>> What I am concerned about is abstraction
>>>
>>>1. How to extend sources.Aggregation. Because Catalyst Expression is
>>>recursive, it is very bad to define a new hierarchy; I think the
>>>ScanBuilder must convert pushed expressions to its own format.
>>>2. The optimization rule is also an extension point; I didn't see any
>>>consideration of join push down. I also think
>>>SupportsPushDownRequiredColumns and SupportsPushDownFilters are
>>>problematic.
>>>
>>> Obviously, file-based sources and SQL-based sources are quite different in
>>> their push down capabilities. I am not sure they can be consolidated into one API.
>>>
>>> I will push my PR tomorrow, and after that, could we schedule a meeting
>>> to discuss the API?
>>>
>>> huaxin gao wrote on Mon, Apr 5, 2021 at 2:24 AM:
>>>
>>>> Hello Chang,
>>>>
>>>> Thanks for proposing the SPIP and initiating the discussion. However, I
>>>> think the problem with your proposal is that you haven’t taken into
>>>> consideration file-based data sources such as parquet, ORC, etc. As far as
>>>> I know, most of the Spark users have file-based data sources.  As a matter
>>>> of fact, I have customers waiting for Aggregate push down for Parquet.
>>>> That’s the reason I have my current implementation, which has a unified
>>>> Aggregate push down approach for both the file-based data sources and JDBC.
>>>>
>>>> I discussed with several members of the Spark community recently, and
>>>> we have agreed to break down the Aggregate push down work into the
>>>> following steps:
>>>>
>>>>1.
>>>>
>>>>Implement Max, Min and Count push down in Parquet
>>>>2.
>>>>
>>>>Add a new physical plan rewrite rule to remove partial aggregate.
>>>>We can optimize one more step to remove ShuffleExchange if the group by
>>>>column and partition col are the same.
>>>>3.
>>>>
>>>>Implement Max, Min and Count push down in JDBC
>>>>4.
>>>>
>>>>Implement Sum and Avg push down in JDBC
>>>>
>>>>
>>>> I plan to implement Aggregate push down for Parquet first for now. The
>>>> reasons are:
>>>>
>>>>1.
>>>>
>>>>It’s relatively easier to implement Parquet Aggregate push down
>>>>than JDBC.
>>>>
>>>>
>>>>1.
>>>>
>>>>Only need to implement  Max, Min and Count
>>>>2.
>>>>
>>>>No need to deal w

Re: [Discuss][SPIP] DataSource V2 SQL push down

2021-04-07 Thread Chang Chen
hi huaxin

please review https://github.com/apache/spark/pull/32061

as for adding a *trait PrunedFilteredAggregateScan* for V1 JDBC, I deleted the
trait, since the V1 DataSource needn't support aggregation push down

Chang Chen wrote on Mon, Apr 5, 2021 at 10:02 PM:

> Hi huaxin
>
> What I am concerned about is abstraction
>
>1. How to extend sources.Aggregation. Because Catalyst Expression is
>recursive, it is very bad to define a new hierarchy; I think the ScanBuilder
>must convert pushed expressions to its own format.
>2. The optimization rule is also an extension point; I didn't see any
>consideration of join push down. I also think
>SupportsPushDownRequiredColumns and SupportsPushDownFilters are
>problematic.
>
> Obviously, file-based sources and SQL-based sources are quite different in
> their push down capabilities. I am not sure they can be consolidated into one API.
>
> I will push my PR tomorrow, and after that, could we schedule a meeting to
> discuss the API?
>
> huaxin gao wrote on Mon, Apr 5, 2021 at 2:24 AM:
>
>> Hello Chang,
>>
>> Thanks for proposing the SPIP and initiating the discussion. However, I
>> think the problem with your proposal is that you haven’t taken into
>> consideration file-based data sources such as parquet, ORC, etc. As far as
>> I know, most of the Spark users have file-based data sources.  As a matter
>> of fact, I have customers waiting for Aggregate push down for Parquet.
>> That’s the reason I have my current implementation, which has a unified
>> Aggregate push down approach for both the file-based data sources and JDBC.
>>
>> I discussed with several members of the Spark community recently, and we
>> have agreed to break down the Aggregate push down work into the following
>> steps:
>>
>>1.
>>
>>Implement Max, Min and Count push down in Parquet
>>2.
>>
>>Add a new physical plan rewrite rule to remove partial aggregate. We
>>can optimize one more step to remove ShuffleExchange if the group by 
>> column
>>and partition col are the same.
>>3.
>>
>>Implement Max, Min and Count push down in JDBC
>>4.
>>
>>Implement Sum and Avg push down in JDBC
>>
>>
>> I plan to implement Aggregate push down for Parquet first for now. The
>> reasons are:
>>
>>1.
>>
>>It’s relatively easier to implement Parquet Aggregate push down than
>>JDBC.
>>
>>
>>1.
>>
>>Only need to implement  Max, Min and Count
>>2.
>>
>>No need to deal with the differences between Spark and other databases.
>>For example, aggregating decimal values have different behaviours
>>between database implementations.
>>
>> The main point is that we want to keep the PR minimal and support the
>> basic infrastructure for Aggregate push down first. Actually, the PR for
>> implementing Parquet Aggregate push down is already very big. We don’t want
>> to have a huge PR to solve all the problems. It’s too hard to review.
>>
>>
>>1.
>>
>>I think it’s too early to implement the JDBC Aggregate push down for
>>now. Underneath, V2 DS JDBC still calls the V1 DS JDBC path. If we
>>implement JDBC Aggregate push down now, we still need to add a *trait
>>PrunedFilteredAggregateScan* for V1 JDBC. One of the major
>>motivations that we are having V2 DS is that we want to improve the
>>flexibility of implementing new operator push down by avoiding adding a 
>> new
>>    push down trait. If we still add a new pushdown trait in V1 DS JDBC, I 
>> feel
>>we are defeating the purpose of having DS V2. So I want to wait until we
>>fully migrate to DS V2 JDBC, and then implement Aggregate push down for
>>JDBC.
>>
>>
>> I have submitted Parquet Aggregate push down PR. Here is the link:
>>
>> https://github.com/apache/spark/pull/32049
>>
>>
>> Thanks,
>>
>> Huaxin
>>
>>
>> On Fri, Apr 2, 2021 at 1:04 AM Chang Chen  wrote:
>>
>>> The link is broken. I post a PDF version.
>>>
>>> Chang Chen wrote on Fri, Apr 2, 2021 at 3:57 PM:
>>>
>>>> Hi All
>>>>
>>>> We would like to post a SPIP of DataSource V2 SQL Push Down in Spark.
>>>> Here is document link:
>>>>
>>>>
>>>> https://olapio.atlassian.net/wiki/spaces/TeamCX/pages/2667315361/Discuss+SQL+Data+Source+V2+SQL+Push+Down?atlOrigin=eyJpIjoiOTI5NGYzYWMzMWYwNDliOWIwM2ZkODllODk4Njk2NzEiLCJwIjoiYyJ9
>>>>
>>>> This SPIP aims to make pushdown more extendable.
>>>>
>>>> I would like to thank huaxin gao, my prototype is based on her PR. I
>>>> will submit a PR ASAP
>>>>
>>>> Thanks
>>>>
>>>> Chang.
>>>>
>>>


Re: [Discuss][SPIP] DataSource V2 SQL push down

2021-04-05 Thread Chang Chen
Hi huaxin

What I am concerned about is abstraction

   1. How to extend sources.Aggregation. Because Catalyst Expression is
   recursive, it is very bad to define a new hierarchy; I think the ScanBuilder
   must convert pushed expressions to its own format (see the sketch below).
   2. The optimization rule is also an extension point; I didn't see any
   consideration of join push down. I also think
   SupportsPushDownRequiredColumns and SupportsPushDownFilters are
   problematic.

Obviously, file-based sources and SQL-based sources are quite different in their
push down capabilities. I am not sure they can be consolidated into one API.
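
To make that concern concrete, here is an illustration-only sketch (the trait
and class names below are my own assumptions, not the API proposed in either
PR) of what "the ScanBuilder must convert pushed expressions to its own format"
means for a SQL-based source:

// Hypothetical pushed-aggregate representation and mix-in; illustrative only.
sealed trait PushedAgg
case class Min(column: String) extends PushedAgg
case class Max(column: String) extends PushedAgg
case class Count(column: String) extends PushedAgg

trait SupportsPushDownAggregatesSketch {
  // Return true if the source can handle the aggregates; the builder keeps its
  // own translated form (an SQL fragment for a JDBC source, a footer-statistics
  // plan for a Parquet source, ...).
  def pushAggregation(aggs: Seq[PushedAgg]): Boolean
}

// A JDBC-style ScanBuilder would render the pushed aggregates as SQL text.
class JdbcScanBuilderSketch extends SupportsPushDownAggregatesSketch {
  private var selectList: Seq[String] = Nil

  override def pushAggregation(aggs: Seq[PushedAgg]): Boolean = {
    selectList = aggs.map {
      case Min(c)   => s"MIN($c)"
      case Max(c)   => s"MAX($c)"
      case Count(c) => s"COUNT($c)"
    }
    true
  }

  def buildSelect(table: String): String =
    s"SELECT ${selectList.mkString(", ")} FROM $table"
}

A file-based builder would instead translate the same pushed aggregates into,
for example, footer-statistics lookups, which is why the two sides feel hard to
unify behind a single interface.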

I will push my PR tomorrow, and after that, could we schedule a meeting to
discuss the API?

huaxin gao wrote on Mon, Apr 5, 2021 at 2:24 AM:

> Hello Chang,
>
> Thanks for proposing the SPIP and initiating the discussion. However, I
> think the problem with your proposal is that you haven’t taken into
> consideration file-based data sources such as parquet, ORC, etc. As far as
> I know, most of the Spark users have file-based data sources.  As a matter
> of fact, I have customers waiting for Aggregate push down for Parquet.
> That’s the reason I have my current implementation, which has a unified
> Aggregate push down approach for both the file-based data sources and JDBC.
>
> I discussed with several members of the Spark community recently, and we
> have agreed to break down the Aggregate push down work into the following
> steps:
>
>1.
>
>Implement Max, Min and Count push down in Parquet
>2.
>
>Add a new physical plan rewrite rule to remove partial aggregate. We
>can optimize one more step to remove ShuffleExchange if the group by column
>and partition col are the same.
>3.
>
>Implement Max, Min and Count push down in JDBC
>4.
>
>Implement Sum and Avg push down in JDBC
>
>
> I plan to implement Aggregate push down for Parquet first for now. The
> reasons are:
>
>1.
>
>It’s relatively easier to implement Parquet Aggregate push down than
>JDBC.
>
>
>1.
>
>Only need to implement  Max, Min and Count
>2.
>
>No need to deal with the differences between Spark and other databases.
>For example, aggregating decimal values have different behaviours
>between database implementations.
>
> The main point is that we want to keep the PR minimal and support the
> basic infrastructure for Aggregate push down first. Actually, the PR for
> implementing Parquet Aggregate push down is already very big. We don’t want
> to have a huge PR to solve all the problems. It’s too hard to review.
>
>
>1.
>
>I think it’s too early to implement the JDBC Aggregate push down for
>now. Underneath, V2 DS JDBC still calls the V1 DS JDBC path. If we
>implement JDBC Aggregate push down now, we still need to add a *trait
>PrunedFilteredAggregateScan* for V1 JDBC. One of the major motivations
>that we are having V2 DS is that we want to improve the flexibility of
>implementing new operator push down by avoiding adding a new push down
>trait. If we still add a new pushdown trait in V1 DS JDBC, I feel we are
>defeating the purpose of having DS V2. So I want to wait until we fully
>migrate to DS V2 JDBC, and then implement Aggregate push down for JDBC.
>
>
> I have submitted Parquet Aggregate push down PR. Here is the link:
>
> https://github.com/apache/spark/pull/32049
>
>
> Thanks,
>
> Huaxin
>
>
> On Fri, Apr 2, 2021 at 1:04 AM Chang Chen  wrote:
>
>> The link is broken. I post a PDF version.
>>
>> Chang Chen wrote on Fri, Apr 2, 2021 at 3:57 PM:
>>
>>> Hi All
>>>
>>> We would like to post a SPIP of DataSource V2 SQL Push Down in Spark.
>>> Here is document link:
>>>
>>>
>>> https://olapio.atlassian.net/wiki/spaces/TeamCX/pages/2667315361/Discuss+SQL+Data+Source+V2+SQL+Push+Down?atlOrigin=eyJpIjoiOTI5NGYzYWMzMWYwNDliOWIwM2ZkODllODk4Njk2NzEiLCJwIjoiYyJ9
>>>
>>> This SPIP aims to make pushdown more extendable.
>>>
>>> I would like to thank huaxin gao, my prototype is based on her PR. I
>>> will submit a PR ASAP
>>>
>>> Thanks
>>>
>>> Chang.
>>>
>>


[Discuss][SPIP] DataSource V2 SQL push down

2021-04-02 Thread Chang Chen
Hi All

We would like to post a SPIP of DataSource V2 SQL Push Down in Spark. Here is
the document link:

https://olapio.atlassian.net/wiki/spaces/TeamCX/pages/2667315361/Discuss+SQL+Data+Source+V2+SQL+Push+Down?atlOrigin=eyJpIjoiOTI5NGYzYWMzMWYwNDliOWIwM2ZkODllODk4Njk2NzEiLCJwIjoiYyJ9

This SPIP aims to make pushdown more extendable.

I would like to thank huaxin gao; my prototype is based on her PR. I will
submit a PR ASAP.

Thanks

Chang.


Re: SessionCatalog lock issue

2021-03-19 Thread Chang Chen
Hi Wenchen

Thanks for your suggestion, would you please review this PR(
https://github.com/apache/spark/pull/31891)

Thanks
Chang

Wenchen Fan wrote on Thu, Mar 18, 2021 at 9:47 PM:

> The `synchronized` is needed for getting `currentDb` IIUC. So a small
> change is to only wrap
> `formatDatabaseName(name.database.getOrElse(currentDb))` with
> `synchronized`.
>
> On Thu, Mar 18, 2021 at 3:38 PM Chang Chen  wrote:
>
>> hi  all
>>
>> We met an issue which is related with SessionCatalog synchronized, for
>> example
>>
>> def tableExists(name: TableIdentifier): Boolean = synchronized {
>>   val db = formatDatabaseName(name.database.getOrElse(currentDb))
>>   val table = formatTableName(name.table)
>>   externalCatalog.tableExists(db, table)
>> }
>>
>> We have modified the underlying Hive metastore so that each Hive database is
>> placed in its own shard for performance. However, we found that the
>> synchronization limits concurrency, so we would like to:
>> 1. replace synchronized with a read-write lock
>> 2. remove synchronized in the Hive *withClient* function
>>
>> Let me know what you guys think of it?
>>
>> If it is ok, I will create an issue and contribute a PR
>>
>> Thanks
>> Chang
>>
>


Re: SessionCatalog lock issue

2021-03-18 Thread Chang Chen
Oh, yeah, moving the call to externalCatalog out of `synchronized` would be
better.

But we cannot make such a simple change in all cases; anyway, we will try.
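
For the tableExists example, a minimal sketch of that change (narrowing the lock
to the read of the mutable currentDb, so the metastore call runs outside it)
might look like the following; the real SessionCatalog protects more state than
this, which is why it is not always this simple:

def tableExists(name: TableIdentifier): Boolean = {
  // Only the read of currentDb needs the lock; the external catalog RPC does not.
  val db = formatDatabaseName(name.database.getOrElse(synchronized { currentDb }))
  val table = formatTableName(name.table)
  externalCatalog.tableExists(db, table)
}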

Thanks


Wenchen Fan wrote on Thu, Mar 18, 2021 at 9:47 PM:

> The `synchronized` is needed for getting `currentDb` IIUC. So a small
> change is to only wrap
> `formatDatabaseName(name.database.getOrElse(currentDb))` with
> `synchronized`.
>
> On Thu, Mar 18, 2021 at 3:38 PM Chang Chen  wrote:
>
>> hi  all
>>
>> We met an issue which is related with SessionCatalog synchronized, for
>> example
>>
>> def tableExists(name: TableIdentifier): Boolean = synchronized {
>>   val db = formatDatabaseName(name.database.getOrElse(currentDb))
>>   val table = formatTableName(name.table)
>>   externalCatalog.tableExists(db, table)
>> }
>>
>> We have modified the underlying Hive metastore so that each Hive database is
>> placed in its own shard for performance. However, we found that the
>> synchronization limits concurrency, so we would like to:
>> 1. replace synchronized with a read-write lock
>> 2. remove synchronized in the Hive *withClient* function
>>
>> Let me know what you guys think of it?
>>
>> If it is ok, I will create an issue and contribute a PR
>>
>> Thanks
>> Chang
>>
>


SessionCatalog lock issue

2021-03-18 Thread Chang Chen
hi all

We hit an issue related to SessionCatalog's synchronized methods, for example:

def tableExists(name: TableIdentifier): Boolean = synchronized {
  val db = formatDatabaseName(name.database.getOrElse(currentDb))
  val table = formatTableName(name.table)
  externalCatalog.tableExists(db, table)
}

We have modified the underlying Hive metastore so that each Hive database is
placed in its own shard for performance. However, we found that the
synchronization limits concurrency, so we would like to:
1. replace synchronized with a read-write lock (a rough sketch is below)
2. remove synchronized in the Hive *withClient* function
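
For proposal 1, a rough sketch of the kind of read-write locking I have in mind
(illustrative only; it assumes currentDb is the main state the lock protects,
which is a simplification of the real SessionCatalog):

import java.util.concurrent.locks.ReentrantReadWriteLock

class CurrentDbHolderSketch {          // hypothetical helper, not a real class
  private val lock = new ReentrantReadWriteLock()
  private var currentDb: String = "default"

  // Many readers (tableExists, lookupRelation, ...) can proceed concurrently.
  def get: String = {
    lock.readLock().lock()
    try currentDb finally lock.readLock().unlock()
  }

  // Only setCurrentDatabase takes the exclusive write lock.
  def set(db: String): Unit = {
    lock.writeLock().lock()
    try currentDb = db finally lock.writeLock().unlock()
  }
}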

Let me know what you guys think of it?

If it is ok, I will create an issue and contribute a PR

Thanks
Chang


Re: Apache Spark 3.2 Expectation

2021-03-03 Thread Chang Chen
+1 for Data Source V2 Aggregate push down

huaxin gao wrote on Sat, Feb 27, 2021 at 4:20 AM:

> Thanks Dongjoon and Xiao for the discussion. I would like to add Data
> Source V2 Aggregate push down to the list. I am currently working on
> JDBC Data Source V2 Aggregate push down, but the common code can be used
> for the file based V2 Data Source as well. For example, MAX and MIN can be
> pushed down to Parquet and Orc, since they can use statistics information
> to perform these operations efficiently. Quite a few users are
> interested in this Aggregate push down feature and the preliminary
> performance test for JDBC Aggregate push down is positive. So I think it is
> a valuable feature to add for Spark 3.2.
>
> Thanks,
> Huaxin
>
> On Fri, Feb 26, 2021 at 11:13 AM Xiao Li  wrote:
>
>> Thank you, Dongjoon, for initiating this discussion. Let us keep it open.
>> It might take 1-2 weeks to collect from the community all the features
>> we plan to build and ship in 3.2 since we just finished the 3.1 voting.
>>
>>
>>> 3. +100 for Apache Spark 3.2.0 in July 2021. Maybe, we need `branch-cut`
>>> in April because we took 3 month for Spark 3.1 release.
>>
>>
>> TBH, cutting the branch this April does not look good to me. That means,
>> we only have one month left for feature development of Spark 3.2. Do we
>> have enough features in the current master branch? If not, are we able to
>> finish major features we collected here? Do they have a timeline or project
>> plan?
>>
>> Xiao
>>
>> Dongjoon Hyun wrote on Fri, Feb 26, 2021 at 10:07 AM:
>>
>>> Thank you, Mridul and Sean.
>>>
>>> 1. Yes, `2017` was a typo. Java 17 is scheduled September 2021. And, of
>>> course, it's a nice-to-have status. :)
>>>
>>> 2. `Push based shuffle and disaggregated shuffle`. Definitely. Thanks
>>> for sharing,
>>>
>>> 3. +100 for Apache Spark 3.2.0 in July 2021. Maybe, we need `branch-cut`
>>> in April because we took 3 month for Spark 3.1 release.
>>> Let's update our release roadmap of the Apache Spark website.
>>>
>>> > I'd roughly expect 3.2 in, say, July of this year, given the usual
>>> cadence. No reason it couldn't be a little sooner or later. There is
>>> already some good stuff in 3.2 and will be a good minor release in 5-6
>>> months.
>>>
>>> Bests,
>>> Dongjoon.
>>>
>>>
>>>
>>> On Thu, Feb 25, 2021 at 9:33 AM Sean Owen  wrote:
>>>
 I'd roughly expect 3.2 in, say, July of this year, given the usual
 cadence. No reason it couldn't be a little sooner or later. There is
 already some good stuff in 3.2 and will be a good minor release in 5-6
 months.

 On Thu, Feb 25, 2021 at 10:57 AM Dongjoon Hyun 
 wrote:

> Hi, All.
>
> Since we have been preparing Apache Spark 3.2.0 in master branch since
> December 2020, March seems to be a good time to share our thoughts and
> aspirations on Apache Spark 3.2.
>
> According to the progress on Apache Spark 3.1 release, Apache Spark
> 3.2 seems to be the last minor release of this year. Given the timeframe,
> we might consider the following. (This is a small set. Please add your
> thoughts to this limited list.)
>
> # Languages
>
> - Scala 2.13 Support: This was expected on 3.1 via SPARK-25075 but
> slipped out. Currently, we are trying to use Scala 2.13.5 via SPARK-34505
> and investigating the publishing issue. Thank you for your contributions
> and feedback on this.
>
> - Java 17 LTS Support: Java 17 LTS will arrive in September 2017. Like
> Java 11, we need lots of support from our dependencies. Let's see.
>
> - Python 3.6 Deprecation(?): Python 3.6 community support ends at
> 2021-12-23. So, the deprecation is not required yet, but we had better
> prepare it because we don't have an ETA of Apache Spark 3.3 in 2022.
>
> - SparkR CRAN publishing: As we know, it's discontinued so far.
> Resuming it depends on the success of Apache SparkR 3.1.1 CRAN publishing.
> If it succeeds to revive it, we can keep publishing. Otherwise, I believe
> we had better drop it from the releasing work item list officially.
>
> # Dependencies
>
> - Apache Hadoop 3.3.2: Hadoop 3.2.0 becomes the default Hadoop profile
> in Apache Spark 3.1. Currently, Spark master branch lives on Hadoop 
> 3.2.2's
> shaded clients via SPARK-33212. So far, there is one on-going report at
> YARN environment. We hope it will be fixed soon at Spark 3.2 timeframe and
> we can move toward Hadoop 3.3.2.
>
> - Apache Hive 2.3.9: Spark 3.0 starts to use Hive 2.3.7 by default
> instead of old Hive 1.2 fork. Spark 3.1 removed hive-1.2 profile 
> completely
> via SPARK-32981 and replaced the generated hive-service-rpc code with the
> official dependency via SPARK-32981. We are steadily improving this area
> and will consume Hive 2.3.9 if available.
>
> - K8s Client 4.13.2: During K8s GA activity, Spark 3.1 upgrades K8s
> client dependency to 

Support ZOrder in OSS

2021-01-09 Thread Chang Chen
Hi All

I found that impala already implemented Zorder (
https://issues.apache.org/jira/browse/IMPALA-8755).

I used to think supporting Z-order needed file format support, but from the
Impala implementation, it looks like it only needs a new RecordComparator,
which is independent of the file format.
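
For illustration, here is a rough sketch of my own (not Impala's or DBR's code)
of such a comparator for two int columns, based on interleaving their bits into
a Morton code; a real implementation would read the sort columns out of the row
representation rather than a tuple:

object ZOrderSketch {
  // Spread the 32 bits of v so that a zero bit sits between each original bit.
  private def spreadBits(v: Int): Long = {
    var x = v.toLong & 0xFFFFFFFFL
    x = (x | (x << 16)) & 0x0000FFFF0000FFFFL
    x = (x | (x << 8))  & 0x00FF00FF00FF00FFL
    x = (x | (x << 4))  & 0x0F0F0F0F0F0F0F0FL
    x = (x | (x << 2))  & 0x3333333333333333L
    x = (x | (x << 1))  & 0x5555555555555555L
    x
  }

  // Z-value (Morton code) of a two-column key; the sign bit is flipped so that
  // signed Int ordering maps onto the unsigned bit ordering.
  def zValue(a: Int, b: Int): Long =
    spreadBits(a ^ Int.MinValue) | (spreadBits(b ^ Int.MinValue) << 1)

  // Rows sorted with this ordering are laid out along the Z-order curve.
  val comparator: Ordering[(Int, Int)] = new Ordering[(Int, Int)] {
    def compare(x: (Int, Int), y: (Int, Int)): Int =
      java.lang.Long.compareUnsigned(zValue(x._1, x._2), zValue(y._1, y._2))
  }
}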

I know that DBR has supported it since 2018; I believe this feature is useful
for improving query performance in certain cases.

Would it be possible to port the Z-order implementation from DBR to open-source
Spark?

Thanks
Chang


The progress of DataSourceV2 based connector for JDBC?

2020-12-25 Thread Chang Chen
Hi All

Is there any plan for supporting JDBC DataSourceV2?

I noticed this PR (https://github.com/apache/spark/pull/25211), but it was
closed a year ago.

@Wenchen Fan  already implemented some basic catalog functionality, so we can
use DataSource V2 via SQL, for example:

select * from h2.db.table.
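
For reference, a minimal sketch of how that catalog path is wired up (the
catalog name, URL, and the JDBCTableCatalog class are assumptions based on the
DS V2 JDBC work in recent Spark versions):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("jdbc-v2-catalog")
  // Register a DataSource V2 JDBC catalog named "h2".
  .config("spark.sql.catalog.h2",
    "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
  .config("spark.sql.catalog.h2.url", "jdbc:h2:mem:testdb")
  .config("spark.sql.catalog.h2.driver", "org.h2.Driver")
  .getOrCreate()

// Tables in the H2 database can then be queried through the catalog by name.
spark.sql("SELECT * FROM h2.db.table").show()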

But this is inconvenient for cases where we have to use the API and let the
upper level pass the parameters to create a concrete Relation.


Re: AWS Consistent S3 & Apache Hadoop's S3A connector

2020-12-06 Thread Chang Chen
Since S3A now works perfectly with S3Guard turned off, could the Magic
Committer work with S3Guard off? If yes, will performance degrade? Or, once
HADOOP-17400 is fixed, will it have comparable performance?

Steve Loughran wrote on Fri, Dec 4, 2020 at 10:00 PM:

> as sent to hadoop-general.
>
> TL;DR. S3 is consistent; S3A now works perfectly with S3Guard turned off,
> if not, file a JIRA.  rename still isn't real, so don't rely on that or
> create(path, overwrite=false) for atomic operations
>
> ---
>
> If you've missed the announcement, AWS S3 storage is now strongly
> consistent: https://aws.amazon.com/s3/consistency/
>
> That's full CRUD consistency, consistent listing, and no 404 caching.
>
> You don't get: rename, or an atomic create-no-overwrite. Applications need
> to know that and code for it.
>
> This is enabled for all S3 buckets; no need to change endpoints or any
> other settings. No extra cost, no performance impact. This is the biggest
> change in S3 semantics since it launched.
>
> What does this mean for the Hadoop S3A connector?
>
>
>1. We've been testing it for a while, no problems have surfaced.
>2. There's no need for S3Guard; leave the default settings alone. If
>you were using it, turn it off, restart *everything* and then you can
>delete the DDB table.
>3. Without S3 listings may get a bit slower.
>4. There's been a lot of work in branch-3.3 on speeding up listings
>against raw S3, especially for code which uses listStatusIterator() and
>listFiles (HADOOP-17400).
>
>
> It'll be time to get Hadoop 3.3.1 out the door for people to play with;
> it's got a fair few other s3a-side enhancements.
>
> People are still using S3Guard and it needs to be maintained for now, but
> we'll have to be fairly ruthless about what isn't going to get closed as
> WONTFIX. I'm worried here about anyone using S3Guard against non-AWS
> consistent stores. If you are, send me an email.
>
> And so for releases/PRs, doing test runs with and without S3Guard is
> important. I've added an optional backwards-incompatible change recently
> for better scalability: HADOOP-13230. S3A to optionally retain directory
> markers. which adds markers=keep/delete to the test matrix. This is a pain,
> though as you can choose two options at a time it's manageable.
>
> Apache HBase
> 
>
> You still need the HBoss extension in front of the S3A connector to use
> Zookeeper to lock files during compaction.
>
>
> Apache Spark
> 
>
> Any workflows which chained together reads directly after
> writes/overwrites of files should now work reliably with raw S3.
>
>
>- The classic FileOutputCommitter commit-by-rename algorithms aren't
>going to fail with FileNotFoundException during task commit.
>- They will still use copy to rename work, so take O(data) time to
>commit files. Without atomic dir rename, the v1 commit algorithm can't isolate
>the commit operations of two task attempts. So it's unsafe and very slow.
>- The v2 commit is slow, doesn't have isolation between task attempt
>commits against any filesystem. If different task attempts are generating
>unique filenames (possibly to work around s3 update inconsistencies), it's
>not safe. Turn that option off.
>- The S3A committers' algorithms are happy talking directly to S3.
>But: SPARK-33402 is needed to fix a race condition in the staging
>committer.
>- The "Magic" committer, which has relied on a consistent store, is
>safe. There's a fix in HADOOP-17318 for the staging committer; hadoop-aws
>builds with that in will work safely with older spark versions.
>
>
> Any formats which commit work by writing a file with a unique name &
> updating a reference to it in a consistent store (iceberg ) are still
> going to work great. Naming is irrelevant and commit-by-writing-a-file is
> S3's best story.
>
> (+ SPARK-33135 and other uses of incremental listing will get the benefits
> of async prefetching of the next page of list results)
>
> Distcp
> ==
>
> There'll be no cached 404s to break uploads, even if you don't have the
> relevant fixes to stop HEAD requests before creating files (HADOOP-16932
> and revert of HADOOP-8143)or update inconsistency (HADOOP-16775)
>
>- If your distcp version supports -direct, use it to avoid rename
>performance penalties. If your distcp version doesn't have HADOOP-15209 it
>can issue needless DELETE calls to S3 after a big update, and end up being
>throttled badly. Upgrade if you can.
>- If people are seeing problems: issues.apache.org + component HADOOP
>is where to file JIRAs; please tag the version of hadoop libraries you've
>been running with.
>
>
> thanks,
>
> -Steve
>


Re: [DISCUSS][SPIP] Standardize Spark Exception Messages

2020-10-27 Thread Chang Chen
hi Xinyi

Just curious, which tool did you use to generate this?



Xinyi Yu wrote on Mon, Oct 26, 2020 at 8:05 AM:

> Hi all,
>
> We like to post a SPIP of Standardize Exception Messages in Spark. Here is
> the document link:
>
> https://docs.google.com/document/d/1XGj1o3xAFh8BA7RCn3DtwIPC6--hIFOaNUNSlpaOIZs/edit?usp=sharing
>
>
> This SPIP aims to standardize the exception messages in Spark. It has three
> major focuses:
> 1. Group exception messages in dedicated files for easy maintenance and
> auditing.
> 2. Establish an error message guideline for developers.
> 3. Improve error message quality.
>
> Thanks for your time and patience. Looking forward to your feedback!
>
>
>
> --
> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


Re: Performance of VectorizedRleValuesReader

2020-09-14 Thread Chang Chen
I see.

In our case, we use SingleBufferInputStream, so the time is spent duplicating
the backing byte buffer.


Thanks
Chang


Ryan Blue wrote on Tue, Sep 15, 2020 at 2:04 AM:

> Before, the input was a byte array so we could read from it directly. Now,
> the input is a `ByteBufferInputStream` so that Parquet can choose how to
> allocate buffers. For example, we use vectored reads from S3 that pull back
> multiple buffers in parallel.
>
> Now that the input is a stream based on possibly multiple byte buffers, it
> provides a method to get a buffer of a certain length. In most cases, that
> will create a ByteBuffer with the same backing byte array, but it may need
> to copy if the request spans multiple buffers in the stream. Most of the
> time, the call to `slice` only requires duplicating the buffer and setting
> its limit, but a read that spans multiple buffers is expensive. It would be
> helpful to know whether the time spent is copying data, which would
> indicate the backing buffers are too small, or whether it is spent
> duplicating the backing byte buffer.
>
> On Mon, Sep 14, 2020 at 5:29 AM Sean Owen  wrote:
>
>> Ryan do you happen to have any opinion there? that particular section
>> was introduced in the Parquet 1.10 update:
>>
>> https://github.com/apache/spark/commit/cac9b1dea1bb44fa42abf77829c05bf93f70cf20
>> It looks like it didn't use to make a ByteBuffer each time, but read from
>> in.
>>
>> On Sun, Sep 13, 2020 at 10:48 PM Chang Chen  wrote:
>> >
>> > I think we can copy all encoded data into a ByteBuffer once, and unpack
>> values in the loop
>> >
>> >  while (valueIndex < this.currentCount) {
>> > // values are bit packed 8 at a time, so reading bitWidth will
>> always work
>> > this.packer.unpack8Values(buffer, buffer.position() + valueIndex,
>> this.currentBuffer, valueIndex);
>> > valueIndex += 8;
>> >   }
>> >
>> >> Sean Owen wrote on Mon, Sep 14, 2020 at 10:40 AM:
>> >>
>> >> It certainly can't be called once - it's reading different data each
>> time.
>> >> There might be a faster way to do it, I don't know. Do you have ideas?
>> >>
>> >> On Sun, Sep 13, 2020 at 9:25 PM Chang Chen 
>> wrote:
>> >> >
>> >> > Hi export
>> >> >
>> >> > it looks like there is a hot spot in
>> VectorizedRleValuesReader#readNextGroup()
>> >> >
>> >> > case PACKED:
>> >> >   int numGroups = header >>> 1;
>> >> >   this.currentCount = numGroups * 8;
>> >> >
>> >> >   if (this.currentBuffer.length < this.currentCount) {
>> >> > this.currentBuffer = new int[this.currentCount];
>> >> >   }
>> >> >   currentBufferIdx = 0;
>> >> >   int valueIndex = 0;
>> >> >   while (valueIndex < this.currentCount) {
>> >> > // values are bit packed 8 at a time, so reading bitWidth will
>> always work
>> >> > ByteBuffer buffer = in.slice(bitWidth);
>> >> > this.packer.unpack8Values(buffer, buffer.position(),
>> this.currentBuffer, valueIndex);
>> >> > valueIndex += 8;
>> >> >   }
>> >> >
>> >> >
>> >> > Per my profile, the codes will spend 30% time of readNextGrou() on
>> slice , why we can't call slice out of the loop?
>>
>
>
> --
> Ryan Blue
>


Re: Performance of VectorizedRleValuesReader

2020-09-13 Thread Chang Chen
I think we can copy all encoded data into a ByteBuffer once, and unpack
values in the loop

while (valueIndex < this.currentCount) {
  // values are bit packed 8 at a time, so reading bitWidth will always work
  this.packer.unpack8Values(buffer, buffer.position() + valueIndex,
      this.currentBuffer, valueIndex);
  valueIndex += 8;
}

Sean Owen wrote on Mon, Sep 14, 2020 at 10:40 AM:

> It certainly can't be called once - it's reading different data each time.
> There might be a faster way to do it, I don't know. Do you have ideas?
>
> On Sun, Sep 13, 2020 at 9:25 PM Chang Chen  wrote:
> >
> > Hi export
> >
> > it looks like there is a hot spot in
> VectorizedRleValuesReader#readNextGroup()
> >
> > case PACKED:
> >   int numGroups = header >>> 1;
> >   this.currentCount = numGroups * 8;
> >
> >   if (this.currentBuffer.length < this.currentCount) {
> > this.currentBuffer = new int[this.currentCount];
> >   }
> >   currentBufferIdx = 0;
> >   int valueIndex = 0;
> >   while (valueIndex < this.currentCount) {
> > // values are bit packed 8 at a time, so reading bitWidth will
> always work
> > ByteBuffer buffer = in.slice(bitWidth);
> > this.packer.unpack8Values(buffer, buffer.position(),
> this.currentBuffer, valueIndex);
> > valueIndex += 8;
> >   }
> >
> >
> > Per my profile, the codes will spend 30% time of readNextGrou() on slice
> , why we can't call slice out of the loop?
>


Performance of VectorizedRleValuesReader

2020-09-13 Thread Chang Chen
Hi experts

it looks like there is a hot spot in VectorizedRleValuesReader#readNextGroup()

case PACKED:
  int numGroups = header >>> 1;
  this.currentCount = numGroups * 8;

  if (this.currentBuffer.length < this.currentCount) {
    this.currentBuffer = new int[this.currentCount];
  }
  currentBufferIdx = 0;
  int valueIndex = 0;
  while (valueIndex < this.currentCount) {
    // values are bit packed 8 at a time, so reading bitWidth will always work
    ByteBuffer buffer = in.slice(bitWidth);
    this.packer.unpack8Values(buffer, buffer.position(),
        this.currentBuffer, valueIndex);
    valueIndex += 8;
  }


Per my profiling, the code spends 30% of readNextGroup()'s time on slice;
why can't we call slice outside of the loop?


[DISCUSS][SQL] Improve Performance of AggregationIterator

2020-07-28 Thread Chang Chen
Hi Spark Developers

We are implementing a new TypedImperativeAggregate that would benefit from
batch-to-batch update or merge. And at least in sort-based aggregation, we can
process inputs batch by batch.

Has anyone done the same optimization?
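
For illustration only, a hypothetical sketch of the kind of batch-oriented hook
this would need (the trait name and updateBatch method are my own assumptions,
not an existing Spark API):

import org.apache.spark.sql.catalyst.InternalRow

// BUF is the aggregation buffer type, as in TypedImperativeAggregate[BUF].
trait SupportsBatchUpdateSketch[BUF] {
  // Existing-style row-at-a-time update.
  def update(buffer: BUF, input: InternalRow): BUF

  // Proposed batch update: fold a whole batch of rows into the buffer at once,
  // so the aggregate can amortize per-row overhead. Defaults to per-row updates.
  def updateBatch(buffer: BUF, inputs: Iterator[InternalRow]): BUF =
    inputs.foldLeft(buffer)(update)
}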


[DISCUSS] Caching SparkPlan

2020-01-31 Thread Chang Chen
I'd like to start a discussion on caching SparkPlan

From what I benchmarked, if SQL execution time is less than 1 second, then we
cannot ignore the following overheads, especially if we cache data in memory:

   1. Parsing, analyzing, and optimizing the SQL
   2. Generating the physical plan (SparkPlan)
   3. Generating code
   4. Generating the RDD

snappydata (https://github.com/SnappyDataInc/snappydata) has already implemented
a plan cache for queries with the same pattern. For example, given the SQL:
SELECT intField, count(*), sum(intField) FROM t WHERE intField = 1 GROUP BY
intField, any SQL that differs only in its Literals can reuse the same
SparkPlan. snappydata's implementation is based on caching the final RDD graph;
since Spark caches shuffle data internally, I believe it cannot run the same
query pattern concurrently.

My idea is to cache the SparkPlan (and hence the generated code) and create the
RDD graph every time (the only overhead we cannot avoid, but if we read data
from memory, that overhead is OK). I did some experiments by extracting
ParamLiteral from snappydata, and the results look fine.
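
A minimal sketch of the caching idea (my own illustration; the crude literal
normalization and the cache key below are assumptions, and a real implementation
would key on the analyzed plan with literals replaced by something like
ParamLiteral):

import java.util.concurrent.ConcurrentHashMap

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.SparkPlan

class PlanCacheSketch(spark: SparkSession) {
  private val cache = new ConcurrentHashMap[String, SparkPlan]()

  // Crude pattern key: replace string and integer literals with a placeholder.
  private def patternOf(sql: String): String =
    sql.replaceAll("'[^']*'", "?").replaceAll("\\b\\d+\\b", "?")

  // Return the cached physical plan for this query pattern, building it on first use.
  // Reusing the plan across different literal values also requires the plan itself
  // to be parameterized, which is the role ParamLiteral plays in snappydata.
  def physicalPlan(sql: String): SparkPlan =
    cache.computeIfAbsent(patternOf(sql), _ => spark.sql(sql).queryExecution.executedPlan)
}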

I want to get some comments before filing a JIRA. I would appreciate comments
and discussion from the community.

Thanks
Chang


Re: Is RDD thread safe?

2019-11-25 Thread Chang Chen
Thank you Imran

I will check whether there is memory waste or not

Imran Rashid wrote on Tue, Nov 26, 2019 at 1:30 AM:

> I think Chang is right, but I also think this only comes up in limited
> scenarios.  I initially thought it wasn't a bug, but after some more
> thought I have some concerns in light of the issues we've had w/
> nondeterministic RDDs, eg. repartition().
>
> Say I have code like this:
>
> val cachedRDD = sc.textFile(...).cache()
> (0 until 200).par.foreach { idx => cachedRDD.doSomeAction(idx) }
>
> that is, my cached rdd is referenced by many threads concurrently before
> the RDD has been cached.
>
> When one of those tasks gets to cachedRDD.getOrCompute(), there are a few
> possible scenarios:
>
> 1) the partition has never been referenced before.
> BlockManager.getOrCompute() will say the block doesn't exist, so it will
> get recomputed (
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L360
> )
>
> 2) The partition has been fully materialized by another task, the
> blockmanagermaster on the driver already knows about it, so
> BlockManager.getOrCompute() will return a pointer to the cached block
> (perhaps on another node)
>
> 3) The partition is actively being computed by another task on the same
> executor.  Then BlockManager.getOrCompute() will not know about that other
> version of the task (it only knows about blocks that are fully
> materialized, IIUC).  But eventually, when the tasks try to actually write
> the data, they'll try to get a write lock for the block:
> https://github.com/apache/spark/blob/f09c1a36c4b0ca1fb450e274b22294dca590d8f8/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1218
> one task will get the write lock first; the other task will block on the
> other task, and then realize the block exists and just return those values.
>
> 4) The partition is actively being compute by another task on a
> *different* executor.  IIUC, Spark doesn't try to do anything to prevent
> both tasks from computing the block themselves in this case.  (To do so
> would require extra coordination in driver before writing every single
> block.)  Those locks in BlockManager and BlockInfoManager don't stop this
> case, because this is happening in entirely independent JVMs.
> There normally won't be any problem here -- if the RDD is totally
> deterministic, then you'll just end up with an extra copy of the data.  In
> a way, this is good, the cached RDD is in high demand, so having an extra
> copy isn't so bad.
> OTOH, if the RDD is non-deterministic, you've now got two copies with
> different values.  Then again, RDD cache is not resilient in general, so
> you've always got to be able to handle an RDD getting recomputed if its
> evicted from the cache.  So this should be pretty similar.
>
> On Mon, Nov 25, 2019 at 2:29 AM Weichen Xu 
> wrote:
>
>> emmm, I haven't check code, but I think if an RDD is referenced in
>> several places, the correct behavior should be: when this RDD data is
>> needed, it will be computed and then cached only once, otherwise it should
>> be treated as a bug. If you are suspicious there's a race condition, you
>> could create a jira ticket.
>>
>> On Mon, Nov 25, 2019 at 12:21 PM Chang Chen  wrote:
>>
>>> Sorry I did't describe clearly,  RDD id itself is thread-safe, how about
>>> cached data?
>>>
>>> See codes from BlockManager
>>>
>>> def getOrElseUpdate(...)   = {
>>>   get[T](blockId)(classTag) match {
>>>case ...
>>>case _ =>  // 1. no data is
>>> cached.
>>> // Need to compute the block
>>>  }
>>>  // Initially we hold no locks on this block
>>>  doPutIterator(...) match{..}
>>> }
>>>
>>> Considering  two DAGs (contain the same cached RDD ) runs
>>> simultaneously,  if both returns none  when they get same block from
>>> BlockManager(i.e. #1 above), then I guess the same data would be cached
>>> twice.
>>>
>>> If the later cache could override the previous data, and no memory is
>>> waste, then this is OK
>>>
>>> Thanks
>>> Chang
>>>
>>>
>>> Weichen Xu wrote on Mon, Nov 25, 2019 at 11:52 AM:
>>>
>>>> Rdd id is immutable and when rdd object created, the rdd id is
>>>> generated. So why there is race condition in "rdd id" ?
>>>>
>>>> On Mon, Nov 25, 2019 at 11:31 AM Chang Chen 
>>>> wrote:
>>>>
>>>>> I am wonder the concurrent semantics for reason about the correctness.
>>>>

Re: Is RDD thread safe?

2019-11-24 Thread Chang Chen
Sorry, I didn't describe it clearly. The RDD id itself is thread-safe, but how
about the cached data?

See the code from BlockManager:

def getOrElseUpdate(...) = {
  get[T](blockId)(classTag) match {
    case ...
    case _ =>          // 1. no data is cached.
                       // Need to compute the block
  }
  // Initially we hold no locks on this block
  doPutIterator(...) match { .. }
}

Considering two DAGs (containing the same cached RDD) running simultaneously,
if both return none when they get the same block from the BlockManager (i.e. #1
above), then I guess the same data would be cached twice.

If the later cache overrides the previous data and no memory is wasted, then
this is OK.

Thanks
Chang


Weichen Xu wrote on Mon, Nov 25, 2019 at 11:52 AM:

> The RDD id is immutable and is generated when the RDD object is created.
> So why is there a race condition on the "rdd id"?
>
> On Mon, Nov 25, 2019 at 11:31 AM Chang Chen  wrote:
>
>> I am wonder the concurrent semantics for reason about the correctness. If
>> the two query simultaneously run the DAGs which use the same cached
>> DF\RDD,but before cache data actually happen, what will happen?
>>
>> By looking into code a litter, I suspect they have different BlockID for
>> same Dataset which is unexpected behavior, but there is no race condition.
>>
>> However RDD id is not lazy, so there is race condition.
>>
>> Thanks
>> Chang
>>
>>
>> Weichen Xu wrote on Tue, Nov 12, 2019 at 1:22 PM:
>>
>>> Hi Chang,
>>>
>>> RDD/Dataframe is immutable and lazy computed. They are thread safe.
>>>
>>> Thanks!
>>>
>>> On Tue, Nov 12, 2019 at 12:31 PM Chang Chen 
>>> wrote:
>>>
>>>> Hi all
>>>>
>>>> I meet a case where I need cache a source RDD, and then create
>>>> different DataFrame from it in different threads to accelerate query.
>>>>
>>>> I know that SparkSession is thread safe(
>>>> https://issues.apache.org/jira/browse/SPARK-15135), but i am not sure
>>>> whether RDD  is thread safe or not
>>>>
>>>> Thanks
>>>>
>>>


Re: Is RDD thread safe?

2019-11-24 Thread Chang Chen
I am wondering about the concurrency semantics in order to reason about
correctness. If two queries simultaneously run DAGs which use the same cached
DF/RDD, but before the data is actually cached, what will happen?

By looking into the code a little, I suspect they have different BlockIDs for
the same Dataset, which is unexpected behavior, but there is no race condition.

However, the RDD id is not lazy, so there is a race condition.

Thanks
Chang


Weichen Xu wrote on Tue, Nov 12, 2019 at 1:22 PM:

> Hi Chang,
>
> RDD/Dataframe is immutable and lazy computed. They are thread safe.
>
> Thanks!
>
> On Tue, Nov 12, 2019 at 12:31 PM Chang Chen  wrote:
>
>> Hi all
>>
>> I meet a case where I need cache a source RDD, and then create different
>> DataFrame from it in different threads to accelerate query.
>>
>> I know that SparkSession is thread safe(
>> https://issues.apache.org/jira/browse/SPARK-15135), but i am not sure
>> whether RDD  is thread safe or not
>>
>> Thanks
>>
>


Is RDD thread safe?

2019-11-11 Thread Chang Chen
Hi all

I have a case where I need to cache a source RDD and then create different
DataFrames from it in different threads to accelerate queries.

I know that SparkSession is thread-safe
(https://issues.apache.org/jira/browse/SPARK-15135), but I am not sure whether
RDD is thread-safe or not.

Thanks


Re: How to print plan of Structured Streaming DataFrame

2017-11-21 Thread Chang Chen
Yes, I switched to query.explain(); though it didn't fail, I still can't make
it print the executed plan. Is there any example?

I first tried to place query.explain() right after start(), but Spark reports
no data.

Here is my code:

while (query.awaitTermination(2)) {  // 1
  query.explain()   // 2
}
query.explain()// 3

The interesting part is that query.awaitTermination(2) terminates the
stream engine, so query.explain() at line 2 is never executed.
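
For reference, a minimal sketch of an alternative (assuming the console example
above, and that the query has had time to process at least one micro-batch
before explain() is called):

// Assumes wordCounts from the StructuredNetworkWordCount example above.
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

Thread.sleep(10000)             // crude: give the stream time to run a micro-batch
query.explain(extended = true)  // prints the plans of the most recent micro-batch, if any
query.stop()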


On Tue, Nov 21, 2017 at 1:49 PM, Liang-Chi Hsieh <vii...@gmail.com> wrote:

>
> wordCounts.explain() -> query.explain()?
>
>
> Chang Chen wrote
> > Hi Guys
> >
> > I modified StructuredNetworkWordCount to see what the executed plan is,
> > here are my codes:
> >
> > val wordCounts = words.groupBy("value").count()
> >
> > // Start running the query that prints the running counts to the console
> > val query = wordCounts.writeStream
> >   .outputMode("complete")
> >   .format("console")
> >   .start()
> >
> > wordCounts.explain()  // additional codes
> >
> >
> > But it failed with “AnalysisException: Queries with streaming sources
> must
> > be executed with writeStream.start()”?
> >
> >
> > Thanks
> > Chang
>
>
>
>
>
> -
> Liang-Chi Hsieh | @viirya
> Spark Technology Center
> http://www.spark.tc/
> --
> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


How to print plan of Structured Streaming DataFrame

2017-11-20 Thread Chang Chen
Hi Guys

I modified StructuredNetworkWordCount to see what the executed plan is,
here are my codes:

val wordCounts = words.groupBy("value").count()

// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

wordCounts.explain()  // additional codes


But it failed with “AnalysisException: Queries with streaming sources must
be executed with writeStream.start()”?


Thanks
Chang


Question on HashJoin trait

2017-07-26 Thread Chang Chen
Hi

I am reading the Spark SQL code. What are streamedPlan and buildPlan in the
HashJoin trait for?

protected lazy val (buildPlan, streamedPlan) = buildSide match {
  case BuildLeft => (left, right)
  case BuildRight => (right, left)
}


https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala#L59

Thanks
Chang


Re: [SQL] Syntax "case when" doesn't be supported in JOIN

2017-07-17 Thread Chang Chen
Sorry, I didn't express it clearly. I think the evaluation order doesn't
matter in the context of the join implementation (sort- or hash-based); it
should only refer to the join key.


Thanks
Chang

On Tue, Jul 18, 2017 at 7:57 AM, Liang-Chi Hsieh <vii...@gmail.com> wrote:

>
> Evaluation order does matter. A non-deterministic expression can change its
> output due to internal state which may depend on input order.
>
> MonotonicallyIncreasingID is an example for the stateful expression. Once
> you change the row order, the evaluation results are different.
>
>
>
> Chang Chen wrote
> > I see.
> >
> > Actually, it isn't about evaluation order which user can't specify. It's
> > about how many times we evaluate the non-deterministic expression for the
> > same row.
> >
> > For example, given the SQL:
> >
> > SELECT a.col1
> > FROM tbl1 a
> > LEFT OUTER JOIN tbl2 b
> > ON
> >  CASE WHEN a.col2 IS NULL TNEN cast(rand(9)*1000 - 99 as string)
> > ELSE a.col2 END
> > =
> >  CASE WHEN b.col3 IS NULL TNEN cast(rand(9)*1000 - 99 as string)
> > ELSE b.col3 END;
> >
> > I think if we exactly evaluate   join key one time for each row of a and
> b
> > in the whole pipeline, even if the result isn't deterministic, but the
> > computation is correct.
> >
> > Thanks
> > Chang
> >
> >
> > On Mon, Jul 17, 2017 at 10:49 PM, Liang-Chi Hsieh 
>
> > viirya@
>
> >  wrote:
> >
> >>
> >> IIUC, the evaluation order of rows in Join can be different in different
> >> physical operators, e.g., Sort-based and Hash-based.
> >>
> >> But for non-deterministic expressions, different evaluation orders
> change
> >> results.
> >>
> >>
> >>
> >> Chang Chen wrote
> >> > I see the issue. I will try https://github.com/apache/
> spark/pull/18652,
> >> I
> >> > think
> >> >
> >> > 1 For Join Operator, the left and right plan can't be
> >> non-deterministic.
> >> > 2 If  Filter can support non-deterministic, why not join condition?
> >> > 3 We can't push down or project non-deterministic expression, since it
> >> may
> >> > change semantics.
> >> >
> >> > Actually, the real problem is #2. If the join condition could be
> >> > non-deterministic, then we needn't insert project.
> >> >
> >> > Thanks
> >> > Chang
> >> >
> >> >
> >> >
> >> >
> >> > On Mon, Jul 17, 2017 at 3:59 PM, 蒋星博 
> >>
> >> > jiangxb1987@
> >>
> >> >  wrote:
> >> >
> >> >> FYI there have been a related discussion here:
> >> https://github.com/apache/
> >> >> spark/pull/15417#discussion_r85295977
> >> >>
> >> >> 2017-07-17 15:44 GMT+08:00 Chang Chen 
> >>
> >> > baibaichen@
> >>
> >> > :
> >> >>
> >> >>> Hi All
> >> >>>
> >> >>> I don't understand the difference between the semantics, I found
> >> Spark
> >> >>> does the same thing for GroupBy non-deterministic. From Map-Reduce
> >> point
> >> >>> of
> >> >>> view, Join is also GroupBy in essence .
> >> >>>
> >> >>> @Liang Chi Hsieh
> >> >>> https://plus.google.com/u/0/103179362592085650735?prsrc=4;
> >> >>>
> >> >>> in which situation,  semantics  will be changed?
> >> >>>
> >> >>> Thanks
> >> >>> Chang
> >> >>>
> >> >>> On Mon, Jul 17, 2017 at 3:29 PM, Liang-Chi Hsieh 
> >>
> >> > viirya@
> >>
> >> > 
> >> >>> wrote:
> >> >>>
> >> >>>>
> >> >>>> Thinking about it more, I think it changes the semantics only under
> >> >>>> certain
> >> >>>> scenarios.
> >> >>>>
> >> >>>> For the example SQL query shown in previous discussion, it looks
> the
> >> >>>> same
> >> >>>> semantics.
> >> >>>>
> >> >>>>
> >> >>>> Xiao Li wrote
> >> >>>> > If the join condition is non-deterministic, pushing it down to
> the
> >> >>>> > underlying proj

Re: [SQL] Syntax "case when" doesn't be supported in JOIN

2017-07-17 Thread Chang Chen
I see.

Actually, it isn't about evaluation order, which the user can't specify. It's
about how many times we evaluate the non-deterministic expression for the
same row.

For example, given the SQL:

SELECT a.col1
FROM tbl1 a
LEFT OUTER JOIN tbl2 b
ON
 CASE WHEN a.col2 IS NULL THEN cast(rand(9)*1000 - 99 as string)
ELSE a.col2 END
=
 CASE WHEN b.col3 IS NULL THEN cast(rand(9)*1000 - 99 as string)
ELSE b.col3 END;

I think if we evaluate the join key exactly once for each row of a and b in
the whole pipeline, then even if the result isn't deterministic, the
computation is correct.

Thanks
Chang


On Mon, Jul 17, 2017 at 10:49 PM, Liang-Chi Hsieh <vii...@gmail.com> wrote:

>
> IIUC, the evaluation order of rows in Join can be different in different
> physical operators, e.g., Sort-based and Hash-based.
>
> But for non-deterministic expressions, different evaluation orders change
> results.
>
>
>
> Chang Chen wrote
> > I see the issue. I will try https://github.com/apache/spark/pull/18652,
> I
> > think
> >
> > 1 For Join Operator, the left and right plan can't be non-deterministic.
> > 2 If  Filter can support non-deterministic, why not join condition?
> > 3 We can't push down or project non-deterministic expression, since it
> may
> > change semantics.
> >
> > Actually, the real problem is #2. If the join condition could be
> > non-deterministic, then we needn't insert project.
> >
> > Thanks
> > Chang
> >
> >
> >
> >
> > On Mon, Jul 17, 2017 at 3:59 PM, 蒋星博 
>
> > jiangxb1987@
>
> >  wrote:
> >
> >> FYI there have been a related discussion here:
> https://github.com/apache/
> >> spark/pull/15417#discussion_r85295977
> >>
> >> 2017-07-17 15:44 GMT+08:00 Chang Chen 
>
> > baibaichen@
>
> > :
> >>
> >>> Hi All
> >>>
> >>> I don't understand the difference between the semantics, I found Spark
> >>> does the same thing for GroupBy non-deterministic. From Map-Reduce
> point
> >>> of
> >>> view, Join is also GroupBy in essence .
> >>>
> >>> @Liang Chi Hsieh
> >>> https://plus.google.com/u/0/103179362592085650735?prsrc=4;
> >>>
> >>> in which situation,  semantics  will be changed?
> >>>
> >>> Thanks
> >>> Chang
> >>>
> >>> On Mon, Jul 17, 2017 at 3:29 PM, Liang-Chi Hsieh 
>
> > viirya@
>
> > 
> >>> wrote:
> >>>
> >>>>
> >>>> Thinking about it more, I think it changes the semantics only under
> >>>> certain
> >>>> scenarios.
> >>>>
> >>>> For the example SQL query shown in previous discussion, it looks the
> >>>> same
> >>>> semantics.
> >>>>
> >>>>
> >>>> Xiao Li wrote
> >>>> > If the join condition is non-deterministic, pushing it down to the
> >>>> > underlying project will change the semantics. Thus, we are unable to
> >>>> do it
> >>>> > in PullOutNondeterministic. Users can do it manually if they do not
> >>>> care
> >>>> > the semantics difference.
> >>>> >
> >>>> > Thanks,
> >>>> >
> >>>> > Xiao
> >>>> >
> >>>> >
> >>>> >
> >>>> > 2017-07-16 20:07 GMT-07:00 Chang Chen 
> >>>>
> >>>> > baibaichen@
> >>>>
> >>>> > :
> >>>> >
> >>>> >> It is tedious since we have lots of Hive SQL being migrated to
> >>>> Spark.
> >>>> >> And
> >>>> >> this workaround is equivalent  to insert a Project between Join
> >>>> operator
> >>>> >> and its child.
> >>>> >>
> >>>> >> Why not do it in PullOutNondeterministic?
> >>>> >>
> >>>> >> Thanks
> >>>> >> Chang
> >>>> >>
> >>>> >>
> >>>> >> On Fri, Jul 14, 2017 at 5:29 PM, Liang-Chi Hsieh 
> >>>>
> >>>> > viirya@
> >>>>
> >>>> >  wrote:
> >>>> >>
> >>>> >>>
> >>>> >>> A possible workaround is to add the rand column into tbl1 with a
> >>>> >>> projection

Re: [SQL] Syntax "case when" doesn't be supported in JOIN

2017-07-17 Thread Chang Chen
I see the issue. I will try https://github.com/apache/spark/pull/18652. I
think:

1. For the Join operator, the left and right plans can't be non-deterministic.
2. If Filter can support non-deterministic expressions, why not the join condition?
3. We can't push down or project a non-deterministic expression, since it may
change semantics.

Actually, the real problem is #2. If the join condition could be
non-deterministic, then we wouldn't need to insert a Project.

Thanks
Chang




On Mon, Jul 17, 2017 at 3:59 PM, 蒋星博 <jiangxb1...@gmail.com> wrote:

> FYI there have been a related discussion here: https://github.com/apache/
> spark/pull/15417#discussion_r85295977
>
> 2017-07-17 15:44 GMT+08:00 Chang Chen <baibaic...@gmail.com>:
>
>> Hi All
>>
>> I don't understand the difference between the semantics, I found Spark
>> does the same thing for GroupBy non-deterministic. From Map-Reduce point of
>> view, Join is also GroupBy in essence .
>>
>> @Liang Chi Hsieh
>> <https://plus.google.com/u/0/103179362592085650735?prsrc=4>
>>
>> in which situation,  semantics  will be changed?
>>
>> Thanks
>> Chang
>>
>> On Mon, Jul 17, 2017 at 3:29 PM, Liang-Chi Hsieh <vii...@gmail.com>
>> wrote:
>>
>>>
>>> Thinking about it more, I think it changes the semantics only under
>>> certain
>>> scenarios.
>>>
>>> For the example SQL query shown in previous discussion, it looks the same
>>> semantics.
>>>
>>>
>>> Xiao Li wrote
>>> > If the join condition is non-deterministic, pushing it down to the
>>> > underlying project will change the semantics. Thus, we are unable to
>>> do it
>>> > in PullOutNondeterministic. Users can do it manually if they do not
>>> care
>>> > the semantics difference.
>>> >
>>> > Thanks,
>>> >
>>> > Xiao
>>> >
>>> >
>>> >
>>> > 2017-07-16 20:07 GMT-07:00 Chang Chen 
>>>
>>> > baibaichen@
>>>
>>> > :
>>> >
>>> >> It is tedious since we have lots of Hive SQL being migrated to Spark.
>>> >> And
>>> >> this workaround is equivalent  to insert a Project between Join
>>> operator
>>> >> and its child.
>>> >>
>>> >> Why not do it in PullOutNondeterministic?
>>> >>
>>> >> Thanks
>>> >> Chang
>>> >>
>>> >>
>>> >> On Fri, Jul 14, 2017 at 5:29 PM, Liang-Chi Hsieh 
>>>
>>> > viirya@
>>>
>>> >  wrote:
>>> >>
>>> >>>
>>> >>> A possible workaround is to add the rand column into tbl1 with a
>>> >>> projection
>>> >>> before the join.
>>> >>>
>>> >>> SELECT a.col1
>>> >>> FROM (
>>> >>>   SELECT col1,
>>> >>> CASE
>>> >>>  WHEN col2 IS NULL
>>> >>>THEN cast(rand(9)*1000 - 99 as string)
>>> >>>  ELSE
>>> >>>col2
>>> >>> END AS col2
>>> >>> FROM tbl1) a
>>> >>> LEFT OUTER JOIN tbl2 b
>>> >>> ON a.col2 = b.col3;
>>> >>>
>>> >>>
>>> >>>
>>> >>> Chang Chen wrote
>>> >>> > Hi Wenchen
>>> >>> >
>>> >>> > Yes. We also find this error is caused by Rand. However, this is
>>> >>> classic
>>> >>> > way to solve data skew in Hive.  Is there any equivalent way in
>>> Spark?
>>> >>> >
>>> >>> > Thanks
>>> >>> > Chang
>>> >>> >
>>> >>> > On Thu, Jul 13, 2017 at 8:25 PM, Wenchen Fan 
>>> >>>
>>> >>> > cloud0fan@
>>> >>>
>>> >>> >  wrote:
>>> >>> >
>>> >>> >> It’s not about case when, but about rand(). Non-deterministic
>>> >>> expressions
>>> >>> >> are not allowed in join condition.
>>> >>> >>
>>> >>> >> > On 13 Jul 2017, at 6:43 PM, wangshuang 
>>> >>>
>>> >>> > cn_wss@
>>> >>>
>>> >>> >  wrote:
>>> >>&

Re: [SQL] Syntax "case when" doesn't be supported in JOIN

2017-07-17 Thread Chang Chen
Hi All

I don't understand the difference in semantics. I found that Spark does the
same thing for GroupBy with non-deterministic expressions. From a Map-Reduce
point of view, a Join is essentially also a GroupBy.
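
As a minimal sketch of the asymmetry (table and column names are placeholders
for the query earlier in the thread), the analyzer accepts a rand()-based
grouping expression but rejects the same expression as a join condition:

import org.apache.spark.sql.SparkSession

object NondeterministicGroupByVsJoin {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("nondet-groupby-vs-join").getOrCreate()

    // Accepted: the non-deterministic expression sits under an Aggregate,
    // and the analyzer pulls it out into a Project below the Aggregate.
    spark.sql(
      """SELECT count(*)
        |FROM tbl1
        |GROUP BY CASE WHEN col2 IS NULL
        |              THEN cast(rand(9) * 1000 AS string)
        |              ELSE col2 END
        |""".stripMargin).show()

    // Rejected: the same expression used as a join condition fails analysis
    // with "nondeterministic expressions are only allowed in Project, Filter,
    // Aggregate or Window".
    spark.sql(
      """SELECT a.col1
        |FROM tbl1 a LEFT OUTER JOIN tbl2 b
        |ON (CASE WHEN a.col2 IS NULL
        |         THEN cast(rand(9) * 1000 AS string)
        |         ELSE a.col2 END) = b.col3
        |""".stripMargin).show()

    spark.stop()
  }
}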

@Liang Chi Hsieh <https://plus.google.com/u/0/103179362592085650735?prsrc=4>

In which situation will the semantics change?

Thanks
Chang

On Mon, Jul 17, 2017 at 3:29 PM, Liang-Chi Hsieh <vii...@gmail.com> wrote:

>
> Thinking about it more, I think it changes the semantics only under certain
> scenarios.
>
> For the example SQL query shown in previous discussion, it looks the same
> semantics.
>
>
> Xiao Li wrote
> > If the join condition is non-deterministic, pushing it down to the
> > underlying project will change the semantics. Thus, we are unable to do
> it
> > in PullOutNondeterministic. Users can do it manually if they do not care
> > the semantics difference.
> >
> > Thanks,
> >
> > Xiao
> >
> >
> >
> > 2017-07-16 20:07 GMT-07:00 Chang Chen 
>
> > baibaichen@
>
> > :
> >
> >> It is tedious since we have lots of Hive SQL being migrated to Spark.
> >> And
> >> this workaround is equivalent  to insert a Project between Join operator
> >> and its child.
> >>
> >> Why not do it in PullOutNondeterministic?
> >>
> >> Thanks
> >> Chang
> >>
> >>
> >> On Fri, Jul 14, 2017 at 5:29 PM, Liang-Chi Hsieh 
>
> > viirya@
>
> >  wrote:
> >>
> >>>
> >>> A possible workaround is to add the rand column into tbl1 with a
> >>> projection
> >>> before the join.
> >>>
> >>> SELECT a.col1
> >>> FROM (
> >>>   SELECT col1,
> >>> CASE
> >>>  WHEN col2 IS NULL
> >>>THEN cast(rand(9)*1000 - 99 as string)
> >>>  ELSE
> >>>col2
> >>> END AS col2
> >>> FROM tbl1) a
> >>> LEFT OUTER JOIN tbl2 b
> >>> ON a.col2 = b.col3;
> >>>
> >>>
> >>>
> >>> Chang Chen wrote
> >>> > Hi Wenchen
> >>> >
> >>> > Yes. We also find this error is caused by Rand. However, this is
> >>> classic
> >>> > way to solve data skew in Hive.  Is there any equivalent way in
> Spark?
> >>> >
> >>> > Thanks
> >>> > Chang
> >>> >
> >>> > On Thu, Jul 13, 2017 at 8:25 PM, Wenchen Fan 
> >>>
> >>> > cloud0fan@
> >>>
> >>> >  wrote:
> >>> >
> >>> >> It’s not about case when, but about rand(). Non-deterministic
> >>> expressions
> >>> >> are not allowed in join condition.
> >>> >>
> >>> >> > On 13 Jul 2017, at 6:43 PM, wangshuang 
> >>>
> >>> > cn_wss@
> >>>
> >>> >  wrote:
> >>> >> >
> >>> >> > I'm trying to execute hive sql on spark sql (Also on spark
> >>> >> thriftserver), For
> >>> >> > optimizing data skew, we use "case when" to handle null.
> >>> >> > Simple sql as following:
> >>> >> >
> >>> >> >
> >>> >> > SELECT a.col1
> >>> >> > FROM tbl1 a
> >>> >> > LEFT OUTER JOIN tbl2 b
> >>> >> > ON
> >>> >> > * CASE
> >>> >> >   WHEN a.col2 IS NULL
> >>> >> >   THEN cast(rand(9)*1000 - 99 as
> >>> string)
> >>> >> >   ELSE
> >>> >> >   a.col2 END *
> >>> >> >   = b.col3;
> >>> >> >
> >>> >> >
> >>> >> > But I get the error:
> >>> >> >
> >>> >> > == Physical Plan ==
> >>> >> > *org.apache.spark.sql.AnalysisException: nondeterministic
> >>> expressions
> >>> >> are
> >>> >> > only allowed in
> >>> >> > Project, Filter, Aggregate or Window, found:*
> >>> >> > (((CASE WHEN (a.`nav_tcdt` IS NULL) THEN CAST(((rand(9) *
> CAST(1000
> >>> AS
> >>> >> > DOUBLE)) - CAST(99L AS DOUBLE)) AS STRING) ELSE
> >>> a.`nav_tcdt`
> >>

Re: [SQL] Syntax "case when" doesn't be supported in JOIN

2017-07-16 Thread Chang Chen
It is tedious, since we have lots of Hive SQL being migrated to Spark, and
this workaround is equivalent to inserting a Project between the Join
operator and its child.

Why not do it in PullOutNondeterministic?
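
Roughly, such a rewrite would pull the non-deterministic pieces of the join
condition into a Project on the child, the way the analyzer already does for
Aggregate. Below is a simplified sketch against the Spark 2.x Catalyst API:
the rule name is invented, it only handles expressions that reference the
left side, and it is not the real PullOutNondeterministic rule.

import org.apache.spark.sql.catalyst.expressions.{Alias, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule

object PullOutNondeterministicJoinKeys extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case j @ Join(left, right, joinType, Some(condition))
        if !condition.deterministic =>
      // Collect non-deterministic sub-expressions that only touch the left side.
      val toPull = condition.collect {
        case e: Expression
            if !e.deterministic && e.references.subsetOf(left.outputSet) => e
      }
      if (toPull.isEmpty) {
        j
      } else {
        // Name each pulled-out expression so the join condition can reference
        // it as an ordinary attribute.
        val aliases = toPull.map(e => Alias(e, "_nondeterministic")())
        val replacements = toPull.zip(aliases.map(_.toAttribute)).toMap
        val newCondition = condition.transform {
          case e: Expression if replacements.contains(e) => replacements(e)
        }
        // Insert the Project between the Join and its child, then restore the
        // join's original output on top.
        val newLeft = Project(left.output ++ aliases, left)
        Project(j.output, Join(newLeft, right, joinType, Some(newCondition)))
      }
  }
}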

Thanks
Chang

On Fri, Jul 14, 2017 at 5:29 PM, Liang-Chi Hsieh <vii...@gmail.com> wrote:

>
> A possible workaround is to add the rand column into tbl1 with a projection
> before the join.
>
> SELECT a.col1
> FROM (
>   SELECT col1,
> CASE
>  WHEN col2 IS NULL
>THEN cast(rand(9)*1000 - 99 as string)
>  ELSE
>col2
> END AS col2
> FROM tbl1) a
> LEFT OUTER JOIN tbl2 b
> ON a.col2 = b.col3;
>
>
>
> Chang Chen wrote
> > Hi Wenchen
> >
> > Yes. We also find this error is caused by Rand. However, this is classic
> > way to solve data skew in Hive.  Is there any equivalent way in Spark?
> >
> > Thanks
> > Chang
> >
> > On Thu, Jul 13, 2017 at 8:25 PM, Wenchen Fan 
>
> > cloud0fan@
>
> >  wrote:
> >
> >> It’s not about case when, but about rand(). Non-deterministic
> expressions
> >> are not allowed in join condition.
> >>
> >> > On 13 Jul 2017, at 6:43 PM, wangshuang 
>
> > cn_wss@
>
> >  wrote:
> >> >
> >> > I'm trying to execute hive sql on spark sql (Also on spark
> >> thriftserver), For
> >> > optimizing data skew, we use "case when" to handle null.
> >> > Simple sql as following:
> >> >
> >> >
> >> > SELECT a.col1
> >> > FROM tbl1 a
> >> > LEFT OUTER JOIN tbl2 b
> >> > ON
> >> > * CASE
> >> >   WHEN a.col2 IS NULL
> >> >   THEN cast(rand(9)*1000 - 99 as string)
> >> >   ELSE
> >> >   a.col2 END *
> >> >   = b.col3;
> >> >
> >> >
> >> > But I get the error:
> >> >
> >> > == Physical Plan ==
> >> > *org.apache.spark.sql.AnalysisException: nondeterministic expressions
> >> are
> >> > only allowed in
> >> > Project, Filter, Aggregate or Window, found:*
> >> > (((CASE WHEN (a.`nav_tcdt` IS NULL) THEN CAST(((rand(9) * CAST(1000 AS
> >> > DOUBLE)) - CAST(99L AS DOUBLE)) AS STRING) ELSE a.`nav_tcdt`
> >> END
> >> =
> >> > c.`site_categ_id`) AND (CAST(a.`nav_tcd` AS INT) = 9)) AND
> >> (c.`cur_flag`
> >> =
> >> > 1))
> >> > in operator Join LeftOuter, (((CASE WHEN isnull(nav_tcdt#25) THEN
> >> > cast(((rand(9) * cast(1000 as double)) - cast(99 as double))
> as
> >> > string) ELSE nav_tcdt#25 END = site_categ_id#80) && (cast(nav_tcd#26
> as
> >> int)
> >> > = 9)) && (cur_flag#77 = 1))
> >> >   ;;
> >> > GlobalLimit 10
> >> > +- LocalLimit 10
> >> >   +- Aggregate [date_id#7, CASE WHEN (cast(city_id#10 as string) IN
> >> > (cast(19596 as string),cast(20134 as string),cast(10997 as string)) &&
> >> > nav_tcdt#25 RLIKE ^[0-9]+$) THEN city_id#10 ELSE nav_tpa_id#21 END],
> >> > [date_id#7]
> >> >  +- Filter (date_id#7 = 2017-07-12)
> >> > +- Join LeftOuter, (((CASE WHEN isnull(nav_tcdt#25) THEN
> >> > cast(((rand(9) * cast(1000 as double)) - cast(99 as double))
> as
> >> > string) ELSE nav_tcdt#25 END = site_categ_id#80) && (cast(nav_tcd#26
> as
> >> int)
> >> > = 9)) && (cur_flag#77 = 1))
> >> >:- SubqueryAlias a
> >> >:  +- SubqueryAlias tmp_lifan_trfc_tpa_hive
> >> >: +- CatalogRelation `tmp`.`tmp_lifan_trfc_tpa_hive`,
> >> > org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [date_id#7,
> >> chanl_id#8L,
> >> > pltfm_id#9, city_id#10, sessn_id#11, gu_id#12,
> >> nav_refer_page_type_id#13,
> >> > nav_refer_page_value#14, nav_refer_tpa#15, nav_refer_tpa_id#16,
> >> > nav_refer_tpc#17, nav_refer_tpi#18, nav_page_type_id#19,
> >> nav_page_value#20,
> >> > nav_tpa_id#21, nav_tpa#22, nav_tpc#23, nav_tpi#24, nav_tcdt#25,
> >> nav_tcd#26,
> >> > nav_tci#27, nav_tce#28, detl_refer_page_type_id#29,
> >> > detl_refer_page_value#30, ... 33 more fields]
> >> >+- SubqueryAlias c
>

Re: [SQL] Syntax "case when" doesn't be supported in JOIN

2017-07-13 Thread Chang Chen
Hi Wenchen

Yes, we also found that this error is caused by rand(). However, this is a
classic way to solve data skew in Hive. Is there any equivalent way in Spark?
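
One equivalent trick in the DataFrame API, sketched with tbl1/tbl2 and the
col1/col2/col3 names standing in for the real schema: since NULL keys can
never match in an equi-join, the NULL-keyed rows can be routed around the
join instead of being salted with rand(). This reproduces the original
query's SELECT a.col1 shape:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object NullKeySkewWorkaround {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("null-key-skew").getOrCreate()

    val tbl1 = spark.table("tbl1")   // assumed columns: col1, col2
    val tbl2 = spark.table("tbl2")   // assumed column:  col3

    // NULL join keys never match, so only the non-NULL rows need to be joined.
    val nonNullKeys = tbl1.filter(col("col2").isNotNull)
    val nullKeys    = tbl1.filter(col("col2").isNull)

    val joinedPart = nonNullKeys
      .join(tbl2, nonNullKeys("col2") === tbl2("col3"), "left_outer")
      .select(nonNullKeys("col1"))

    // In a LEFT OUTER JOIN the NULL-keyed rows survive unmatched anyway,
    // so they can be passed straight through and unioned back.
    val result = joinedPart.union(nullKeys.select(col("col1")))

    result.show()
    spark.stop()
  }
}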

Thanks
Chang

On Thu, Jul 13, 2017 at 8:25 PM, Wenchen Fan  wrote:

> It’s not about case when, but about rand(). Non-deterministic expressions
> are not allowed in join condition.
>
> > On 13 Jul 2017, at 6:43 PM, wangshuang  wrote:
> >
> > I'm trying to execute hive sql on spark sql (Also on spark
> thriftserver), For
> > optimizing data skew, we use "case when" to handle null.
> > Simple sql as following:
> >
> >
> > SELECT a.col1
> > FROM tbl1 a
> > LEFT OUTER JOIN tbl2 b
> > ON
> > * CASE
> >   WHEN a.col2 IS NULL
> >   THEN cast(rand(9)*1000 - 99 as string)
> >   ELSE
> >   a.col2 END *
> >   = b.col3;
> >
> >
> > But I get the error:
> >
> > == Physical Plan ==
> > *org.apache.spark.sql.AnalysisException: nondeterministic expressions
> are
> > only allowed in
> > Project, Filter, Aggregate or Window, found:*
> > (((CASE WHEN (a.`nav_tcdt` IS NULL) THEN CAST(((rand(9) * CAST(1000 AS
> > DOUBLE)) - CAST(99L AS DOUBLE)) AS STRING) ELSE a.`nav_tcdt` END
> =
> > c.`site_categ_id`) AND (CAST(a.`nav_tcd` AS INT) = 9)) AND (c.`cur_flag`
> =
> > 1))
> > in operator Join LeftOuter, (((CASE WHEN isnull(nav_tcdt#25) THEN
> > cast(((rand(9) * cast(1000 as double)) - cast(99 as double)) as
> > string) ELSE nav_tcdt#25 END = site_categ_id#80) && (cast(nav_tcd#26 as
> int)
> > = 9)) && (cur_flag#77 = 1))
> >   ;;
> > GlobalLimit 10
> > +- LocalLimit 10
> >   +- Aggregate [date_id#7, CASE WHEN (cast(city_id#10 as string) IN
> > (cast(19596 as string),cast(20134 as string),cast(10997 as string)) &&
> > nav_tcdt#25 RLIKE ^[0-9]+$) THEN city_id#10 ELSE nav_tpa_id#21 END],
> > [date_id#7]
> >  +- Filter (date_id#7 = 2017-07-12)
> > +- Join LeftOuter, (((CASE WHEN isnull(nav_tcdt#25) THEN
> > cast(((rand(9) * cast(1000 as double)) - cast(99 as double)) as
> > string) ELSE nav_tcdt#25 END = site_categ_id#80) && (cast(nav_tcd#26 as
> int)
> > = 9)) && (cur_flag#77 = 1))
> >:- SubqueryAlias a
> >:  +- SubqueryAlias tmp_lifan_trfc_tpa_hive
> >: +- CatalogRelation `tmp`.`tmp_lifan_trfc_tpa_hive`,
> > org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [date_id#7,
> chanl_id#8L,
> > pltfm_id#9, city_id#10, sessn_id#11, gu_id#12, nav_refer_page_type_id#13,
> > nav_refer_page_value#14, nav_refer_tpa#15, nav_refer_tpa_id#16,
> > nav_refer_tpc#17, nav_refer_tpi#18, nav_page_type_id#19,
> nav_page_value#20,
> > nav_tpa_id#21, nav_tpa#22, nav_tpc#23, nav_tpi#24, nav_tcdt#25,
> nav_tcd#26,
> > nav_tci#27, nav_tce#28, detl_refer_page_type_id#29,
> > detl_refer_page_value#30, ... 33 more fields]
> >+- SubqueryAlias c
> >   +- SubqueryAlias dim_site_categ_ext
> >  +- CatalogRelation `dw`.`dim_site_categ_ext`,
> > org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe,
> [site_categ_skid#64L,
> > site_categ_type#65, site_categ_code#66, site_categ_name#67,
> > site_categ_parnt_skid#68L, site_categ_kywrd#69, leaf_flg#70L,
> sort_seq#71L,
> > site_categ_srch_name#72, vsbl_flg#73, delet_flag#74, etl_batch_id#75L,
> > updt_time#76, cur_flag#77, bkgrnd_categ_skid#78L, bkgrnd_categ_id#79L,
> > site_categ_id#80, site_categ_parnt_id#81]
> >
> > Does spark sql not support syntax "case when" in JOIN?  Additional, my
> spark
> > version is 2.2.0.
> > Any help would be greatly appreciated.
> >
> >
> >
> >
> > --
> > View this message in context: http://apache-spark-developers
> -list.1001551.n3.nabble.com/SQL-Syntax-case-when-doesn-t-
> be-supported-in-JOIN-tp21953.html
> > Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
> >
> > -
> > To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
> >
>
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


Is static volatile variable different with static variable in the closure?

2017-06-07 Thread Chang Chen
A static variable will be initialized in the worker-node JVM and will not be
serialized from the master. But what about a static volatile variable?

Recently I read the Beam Spark runner code, and I found that it uses a static
volatile Broadcast variable. See
https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java#L46

This confused me a lot: is a static volatile variable any different from a
static variable in a closure?
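
A minimal sketch of the distinction, with illustrative names (StaticHolder is
not from the Beam code): an object field in Scala is a per-JVM static that
each executor initializes on its own and that is never shipped with the
closure, while a captured local val is serialized. @volatile only adds
cross-thread visibility inside a single JVM; it does not change what gets
serialized.

import org.apache.spark.sql.SparkSession

object StaticHolder {
  @volatile var label: String = "default"   // per-JVM static field
}

object StaticVsClosureDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("static-vs-closure").getOrCreate()
    val sc = spark.sparkContext

    StaticHolder.label = "set-on-driver"   // only the driver JVM sees this
    val captured = StaticHolder.label      // a local val IS serialized with the closure

    val result = sc.parallelize(1 to 2)
      .map(i => s"static=${StaticHolder.label}, captured=$captured")
      .collect()

    // On a real cluster the static field reads "default" on the executors,
    // while the captured local reads "set-on-driver" (in local mode both run
    // in one JVM, so they look the same).
    result.foreach(println)
    spark.stop()
  }
}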


Re: Lineage between Datasets

2017-04-12 Thread Chang Chen
Does it mean that the physical plans of any two Datasets are independent?

Thanks
Chang

On Thu, Apr 13, 2017 at 12:53 AM, Reynold Xin <r...@databricks.com> wrote:

> The physical plans are not subtrees, but the analyzed plan (before the
> optimizer runs) is actually similar to "lineage". You can get that by
> calling explain(true) and look at the analyzed plan.
>
>
> On Wed, Apr 12, 2017 at 3:03 AM Chang Chen <baibaic...@gmail.com> wrote:
>
>> Hi All
>>
>> I believe that there is no lineage between datasets. Consider this case:
>>
>> val people = spark.read.parquet("...").as[Person]
>>
>> val ageGreatThan30 = people.filter("age > 30")
>>
>> Since the second DS can push down the condition, they are obviously
>> different logical plans and hence are different physical plan.
>>
>> What I understanding is right?
>>
>> Thanks
>> Chang
>>
>


Lineage between Datasets

2017-04-12 Thread Chang Chen
Hi All

I believe that there is no lineage between datasets. Consider this case:

val people = spark.read.parquet("...").as[Person]

val ageGreatThan30 = people.filter("age > 30")

Since the second Dataset can push the condition down, the two obviously have
different logical plans and hence different physical plans.

Is my understanding right?

Thanks
Chang