Re: DSv2 reader lifecycle

2019-11-06 Thread Andrew Melo
Hi Ryan,

Thanks for the pointers

On Thu, Nov 7, 2019 at 8:13 AM Ryan Blue  wrote:

> Hi Andrew,
>
> This is expected behavior for DSv2 in 2.4. A separate reader is configured
> for each operation because the configuration will change. A count, for
> example, doesn't need to project any columns, but a count distinct will.
> Similarly, if your read has different filters we need to apply those to a
> separate reader for each scan.
>

Ah, I presumed the interaction was slightly different: that a single reader
was configured and (e.g.) pruneSchema was called repeatedly to change the
desired output schema. I guess for 2.4 it's best for me to cache/memoize the
metadata for paths/files to keep it from being repeatedly recalculated, along
the lines of the sketch below.
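
Something like this minimal sketch is what I have in mind, with a plain
driver-side cache keyed by file path (the class and method names below are
placeholders, not Laurelin's actual ones):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Driver-side cache so that the repeated createReader() calls in 2.4 reuse the
// expensive-to-deserialize file metadata instead of rebuilding it each time.
public final class MetadataCache {

    // Placeholder for whatever deserialized metadata the format needs.
    public static final class FileMetadata {
        public final String path;
        public FileMetadata(String path) {
            this.path = path;
        }
    }

    private static final ConcurrentMap<String, FileMetadata> CACHE =
            new ConcurrentHashMap<>();

    // computeIfAbsent runs the expensive deserialization at most once per path.
    public static FileMetadata getOrLoad(String path) {
        return CACHE.computeIfAbsent(path, p -> {
            // ... expensive metadata deserialization would go here ...
            return new FileMetadata(p);
        });
    }

    private MetadataCache() {}
}

The trade-off is that the cached metadata lives for the lifetime of the driver
JVM unless it is evicted explicitly.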


>
> The newer API that we are releasing in Spark 3.0 addresses the concern
> that each reader is independent by using Catalog and Table interfaces. In
> the new version, Spark will load a table by name from a persistent catalog
> (loaded once) and use the table to create a reader for each operation. That
> way, you can load common metadata in the table, cache the table, and pass
> its info to readers as they are created.
>

That's good to know; I'll search around JIRA for docs describing that
functionality.

Thanks again,
Andrew


>
> rb
>
> On Tue, Nov 5, 2019 at 4:58 PM Andrew Melo  wrote:
>
>> Hello,
>>
>> During testing of our DSv2 implementation (on 2.4.3 FWIW), it appears
>> that our DataSourceReader is being instantiated multiple times for the same
>> dataframe. For example, the following snippet
>>
>> Dataset df = spark
>> .read()
>> .format("edu.vanderbilt.accre.laurelin.Root")
>> .option("tree",  "Events")
>> .load("testdata/pristine/2018nanoaod1june2019.root");
>>
>> Constructs edu.vanderbilt.accre.laurelin.Root twice and then calls
>> createReader once (as an aside, this seems like a lot for 1000 columns?
>> "CodeGenerator: Code generated in 8162.847517 ms")
>>
>> but then running operations on that dataframe (e.g. df.count()) calls
>> createReader for each call, instead of holding the existing
>> DataSourceReader.
>>
>> Is that the expected behavior? Because of the file format, it's quite
>> expensive to deserialize all the various metadata, so I was holding the
>> deserialized version in the DataSourceReader, but if Spark is repeatedly
>> constructing new ones, then that doesn't help. If this is the expected
>> behavior, how should I handle this as a consumer of the API?
>>
>> Thanks!
>> Andrew
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>


Re: Build customized resource manager

2019-11-06 Thread Klaus Ma
Any suggestions?

- Klaus

On Mon, Nov 4, 2019 at 5:04 PM Klaus Ma  wrote:

> Hi team,
>
> AFAIK, we have built k8s/yarn/mesos as resource managers, but I'd like to
> make some enhancements to them, e.g. integrate with Volcano in k8s. Is it
> possible to do that without forking the whole Spark project? For example,
> could a customized resource manager be enabled through configuration, e.g.
> replacing `org.apache.spark.deploy.k8s.submit.KubernetesClientApplication`
> with `MyK8SClient`, so that I only have to maintain the resource manager
> instead of the whole project?
>
> -- Klaus
>


Re: DSv2 reader lifecycle

2019-11-06 Thread Ryan Blue
Hi Andrew,

This is expected behavior for DSv2 in 2.4. A separate reader is configured
for each operation because the configuration will change. A count, for
example, doesn't need to project any columns, but a count distinct will.
Similarly, if your read has different filters we need to apply those to a
separate reader for each scan.

The newer API that we are releasing in Spark 3.0 addresses the concern that
each reader is independent by using Catalog and Table interfaces. In the
new version, Spark will load a table by name from a persistent catalog
(loaded once) and use the table to create a reader for each operation. That
way, you can load common metadata in the table, cache the table, and pass
its info to readers as they are created.
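
To make that concrete, here is a rough, hypothetical sketch of the shape of
such a table under the new API (interface names as in the
org.apache.spark.sql.connector packages; RootTable, RootScan, and the
metadata field are illustrative placeholders rather than a real
implementation):

import java.util.Collections;
import java.util.Set;

import org.apache.spark.sql.connector.catalog.SupportsRead;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

// Hypothetical table: the expensive metadata is loaded once, when the table is
// created, and every scan created from the table reuses it.
public class RootTable implements SupportsRead {
  private final String path;
  private final StructType schema;   // derived once from the file metadata
  private final Object fileMetadata; // placeholder for the deserialized metadata

  public RootTable(String path, StructType schema, Object fileMetadata) {
    this.path = path;
    this.schema = schema;
    this.fileMetadata = fileMetadata;
  }

  @Override
  public String name() {
    return path;
  }

  @Override
  public StructType schema() {
    return schema;
  }

  @Override
  public Set<TableCapability> capabilities() {
    return Collections.singleton(TableCapability.BATCH_READ);
  }

  @Override
  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
    // Each operation gets its own ScanBuilder, but all of them share the
    // metadata that was loaded when the table was constructed.
    return () -> new RootScan(schema, fileMetadata);
  }

  // Placeholder scan; a real implementation would also provide a Batch that
  // plans input partitions from the shared metadata.
  private static final class RootScan implements Scan {
    private final StructType schema;
    private final Object fileMetadata;

    RootScan(StructType schema, Object fileMetadata) {
      this.schema = schema;
      this.fileMetadata = fileMetadata;
    }

    @Override
    public StructType readSchema() {
      return schema;
    }
  }
}

Since the catalog can cache the table instance, each count() or collect()
gets a fresh ScanBuilder while reusing the metadata that was loaded once in
the constructor.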

rb

On Tue, Nov 5, 2019 at 4:58 PM Andrew Melo  wrote:

> Hello,
>
> During testing of our DSv2 implementation (on 2.4.3 FWIW), it appears that
> our DataSourceReader is being instantiated multiple times for the same
> dataframe. For example, the following snippet
>
> Dataset df = spark
> .read()
> .format("edu.vanderbilt.accre.laurelin.Root")
> .option("tree",  "Events")
> .load("testdata/pristine/2018nanoaod1june2019.root");
>
> Constructs edu.vanderbilt.accre.laurelin.Root twice and then calls
> createReader once (as an aside, this seems like a lot for 1000 columns?
> "CodeGenerator: Code generated in 8162.847517 ms")
>
> but then running operations on that dataframe (e.g. df.count()) calls
> createReader for each call, instead of holding the existing
> DataSourceReader.
>
> Is that the expected behavior? Because of the file format, it's quite
> expensive to deserialize all the various metadata, so I was holding the
> deserialized version in the DataSourceReader, but if Spark is repeatedly
> constructing new ones, then that doesn't help. If this is the expected
> behavior, how should I handle this as a consumer of the API?
>
> Thanks!
> Andrew
>


-- 
Ryan Blue
Software Engineer
Netflix


Re: [SPARK-29176][DISCUSS] Optimization should change join type to CROSS

2019-11-06 Thread Enrico Minack
So you are saying the optimized inner join with no condition is also a valid
query?

Then I agree the optimizer is not breaking the query, hence it is not a bug.

Enrico

On 06.11.19 at 15:53, Sean Owen wrote:

You asked for an inner join but it turned into a cross-join. This
might be surprising, hence the error, which you can disable.
The query is not invalid in any case. It's just stopping you from
doing something you may not have meant to do, and which may be expensive.
However, I think we've already changed the default to enable it in
Spark 3 anyway.

On Wed, Nov 6, 2019 at 8:50 AM Enrico Minack  wrote:

Hi,

I would like to discuss issue SPARK-29176 to see if this is considered a bug 
and if so, to sketch out a fix.

In short, the issue is that a valid inner join with condition gets optimized so 
that no condition is left, but the type is still INNER. Then 
CheckCartesianProducts throws an exception. The type should have changed to 
CROSS when it gets optimized in that way.

I understand that with spark.sql.crossJoin.enabled you can make Spark not throw 
this exception, but I think you should not need this work-around for a valid 
query.

Please let me know what you think about this issue and how I could fix it. It 
might affect more rules than the two given in the Jira ticket.

Thanks,
Enrico




Re: [SPARK-29176][DISCUSS] Optimization should change join type to CROSS

2019-11-06 Thread Sean Owen
You asked for an inner join but it turned into a cross-join. This
might be surprising, hence the error, which you can disable.
The query is not invalid in any case. It's just stopping you from
doing something you may not have meant to do, and which may be expensive.
However, I think we've already changed the default to enable it in
Spark 3 anyway.

On Wed, Nov 6, 2019 at 8:50 AM Enrico Minack  wrote:
>
> Hi,
>
> I would like to discuss issue SPARK-29176 to see if this is considered a bug 
> and if so, to sketch out a fix.
>
> In short, the issue is that a valid inner join with condition gets optimized 
> so that no condition is left, but the type is still INNER. Then 
> CheckCartesianProducts throws an exception. The type should have changed to 
> CROSS when it gets optimized in that way.
>
> I understand that with spark.sql.crossJoin.enabled you can make Spark not 
> throw this exception, but I think you should not need this work-around for a 
> valid query.
>
> Please let me know what you think about this issue and how I could fix it. It 
> might affect more rules than the two given in the Jira ticket.
>
> Thanks,
> Enrico




[SPARK-29176][DISCUSS] Optimization should change join type to CROSS

2019-11-06 Thread Enrico Minack

Hi,

I would like to discuss issue SPARK-29176 to see if this is considered a 
bug and if so, to sketch out a fix.


In short, the issue is that a valid inner join with condition gets 
optimized so that no condition is left, but the type is still INNER. 
Then CheckCartesianProducts throws an exception. The type should have 
changed to CROSS when it gets optimized in that way.


I understand that with spark.sql.crossJoin.enabled you can make Spark 
not throw this exception, but I think you should not need this 
work-around for a valid query.
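
For concreteness, here is a hedged sketch of a query with that shape (this is
not necessarily the exact reproduction from the ticket, just one way to end up
with an inner join whose condition is optimized away, plus the work-around):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CrossJoinShape {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[*]")
        .appName("SPARK-29176 shape")
        .getOrCreate();

    Dataset<Row> left = spark.range(10).toDF("id");
    Dataset<Row> right = spark.range(10).toDF("other_id");

    // The condition references only the left side, so the optimizer pushes it
    // below the join as a filter; the remaining join is INNER with no condition.
    Dataset<Row> joined = left.join(right, left.col("id").gt(5));

    try {
      // With the 2.4 default (cross joins disabled), CheckCartesianProducts
      // rejects the implicit cartesian product even though the query as
      // written had a join condition.
      joined.count();
    } catch (Exception e) {
      System.out.println("Rejected: " + e.getMessage());
    }

    // The work-around mentioned above:
    spark.conf().set("spark.sql.crossJoin.enabled", "true");
    System.out.println(joined.count()); // now planned as a cartesian product

    spark.stop();
  }
}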


Please let me know what you think about this issue and how I could fix 
it. It might affect more rules than the two given in the Jira ticket.


Thanks,
Enrico


Re: [DISCUSS] Remove sorting of fields in PySpark SQL Row construction

2019-11-06 Thread Wenchen Fan
Sounds reasonable to me. We should make the behavior consistent within
Spark.

On Tue, Nov 5, 2019 at 6:29 AM Bryan Cutler  wrote:

> Currently, when a PySpark Row is created with keyword arguments, the
> fields are sorted alphabetically. This has created a lot of confusion with
> users because it is not obvious (although it is stated in the pydocs) that
> they will be sorted alphabetically. Then, later, when a schema is applied
> and the field order does not match, an error will occur. Here is a list of
> some of the JIRAs that I have been tracking, all related to this issue:
> SPARK-24915, SPARK-22232, SPARK-27939, SPARK-27712, and relevant discussion
> of the issue [1].
>
> The original reason for sorting fields is that kwargs in Python < 3.6
> are not guaranteed to be in the same order that they were entered [2];
> sorting alphabetically ensures a consistent order. Matters are further
> complicated by the __from_dict__ flag, which allows the Row fields to
> be referenced by name when the Row is made from kwargs, but this flag is
> not serialized with the Row and leads to inconsistent behavior. For instance:
>
> >>> spark.createDataFrame([Row(A="1", B="2")], "B string, A string").first()
> Row(B='2', A='1')
> >>> spark.createDataFrame(spark.sparkContext.parallelize([Row(A="1", B="2")]), "B string, A string").first()
> Row(B='1', A='2')
>
> I think the best way to fix this is to remove the sorting of fields when
> constructing a Row. For users with Python 3.6+, nothing would change
> because these versions of Python ensure that the kwargs stay in the
> order entered. For users with Python < 3.6, using kwargs would check a
> conf to either raise an error or fall back to a LegacyRow that sorts the
> fields as before. With Python < 3.6 being deprecated now, this LegacyRow
> can also be removed at the same time. There are also other ways to create
> Rows that will not be affected. I have opened a JIRA [3] to capture this,
> but I am wondering what others think about fixing this for Spark 3.0?
>
> [1] https://github.com/apache/spark/pull/20280
> [2] https://www.python.org/dev/peps/pep-0468/
> [3] https://issues.apache.org/jira/browse/SPARK-29748
>
>