Re: Feature request: split dataset based on condition

2019-02-04 Thread Thakrar, Jayesh
Just wondering if this is what you are implying Ryan (example only):

val data = (dataset to be partitioned)

val splitCondition =
  """
  CASE
     WHEN … THEN …
     WHEN … THEN …
  END
  """
val partitionedData = data.withColumn("partitionColumn", expr(splitCondition))

In this case there might be a need to cache/persist the partitionedData dataset 
to avoid recomputation as each "partition" is processed (e.g. saved, etc.) 
later on, correct?
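
For illustration, a minimal runnable sketch of the above, assuming a hypothetical numeric column "amount" and hypothetical output paths; the conditions are placeholders only:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, expr}
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder().appName("split-by-condition").getOrCreate()
import spark.implicits._

// Hypothetical dataset to be partitioned.
val data = Seq((1, 10.0), (2, 75.0), (3, 250.0)).toDF("id", "amount")

// Encode the split conditions in a single derived column.
val splitCondition =
  """
  CASE
    WHEN amount < 50  THEN 'low'
    WHEN amount < 200 THEN 'mid'
    ELSE 'high'
  END
  """
val partitionedData = data
  .withColumn("partitionColumn", expr(splitCondition))
  .persist(StorageLevel.MEMORY_AND_DISK)  // avoid recomputation as each split is processed

// Process each "partition" (split) separately, e.g. save it to its own path.
partitionedData.select("partitionColumn").distinct().collect().foreach { row =>
  val value = row.getString(0)
  partitionedData
    .filter(col("partitionColumn") === value)
    .write.mode("overwrite")
    .parquet(s"/tmp/splits/partitionColumn=$value")
}

partitionedData.unpersist()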

From: Ryan Blue 
Reply-To: 
Date: Monday, February 4, 2019 at 12:16 PM
To: Andrew Melo 
Cc: Moein Hosseini , dev 
Subject: Re: Feature request: split dataset based on condition

To partition by a condition, you would need to create a column with the result 
of that condition. Then you would partition by that column. The sort option 
would also work here.

I don't think that there is much of a use case for this. You have a set of 
conditions on which to partition your data, and partitioning is already 
supported. The idea to use conditions to create separate data frames would 
actually make that harder because you'd need to create and name tables for each 
one.
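
As a concrete (hypothetical) illustration of this approach, with two example predicates standing in for the conditions A and B discussed further down the thread, a single derived column encodes every combination and the existing partitioned write handles the rest:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, concat, lit, when}

val spark = SparkSession.builder().appName("condition-partitioning").getOrCreate()
import spark.implicits._

// Hypothetical input and conditions: A is "x > 30", B is "y = 'signal'".
val df = Seq((10, "noise"), (42, "signal"), (55, "noise")).toDF("x", "y")
val condA = col("x") > 30
val condB = col("y") === "signal"

// One derived column encodes all combinations (AB, A!B, !AB, !A!B).
val labeled = df.withColumn("split",
  concat(when(condA, lit("A")).otherwise(lit("notA")),
         when(condB, lit("B")).otherwise(lit("notB"))))

// A single partitioned write produces one directory per combination.
labeled.write.partitionBy("split").parquet("/tmp/conditions_output")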

On Mon, Feb 4, 2019 at 9:16 AM Andrew Melo wrote:
Hello Ryan,

On Mon, Feb 4, 2019 at 10:52 AM Ryan Blue wrote:
>
> Andrew, can you give us more information about why partitioning the output 
> data doesn't work for your use case?
>
> It sounds like all you need to do is to create a table partitioned by A and 
> B, then you would automatically get the divisions you want. If what you're 
> looking for is a way to scale the number of combinations then you can use 
> formats that support more partitions, or you could sort by the fields and 
> rely on Parquet row group pruning to filter out data you don't want.
>

TBH, I don't understand what that would look like in pyspark and what
the consequences would be. Looking at the docs, there doesn't appear to
be syntax for partitioning on a condition (most of our conditions
are of the form 'X > 30'). The use of Spark is still somewhat new in
our field, so it's possible we're not using it correctly.

Cheers
Andrew

> rb
>
> On Mon, Feb 4, 2019 at 8:33 AM Andrew Melo wrote:
>>
>> Hello
>>
>> On Sat, Feb 2, 2019 at 12:19 AM Moein Hosseini wrote:
>> >
>> > I've seen many applications that need to split a dataset into multiple datasets
>> > based on some conditions. As there is no method to do it in one place,
>> > developers call the filter method multiple times. I think it would be useful to
>> > have a method that splits a dataset based on conditions in one iteration,
>> > something like Scala's partition method (of course Scala's partition just
>> > splits a list into two lists, but something more general could be more useful).
>> > If you think it would be helpful, I can create a Jira issue and work on it to
>> > send a PR.
>>
>> This would be a really useful feature for our use case (processing
>> collision data from the LHC). We typically want to take some sort of
>> input and split into multiple disjoint outputs based on some
>> conditions. E.g. if we have two conditions A and B, we'll end up with
>> 4 outputs (AB, !AB, A!B, !A!B). As we add more conditions, the
>> combinations explode like 2^n, when we could produce them all up
>> front with this "multi filter" (or however it would be called).
>>
>> Cheers
>> Andrew
>>
>> >
>> > Best Regards
>> > Moein
>> >
>> > --
>> >
>> > Moein Hosseini
>> > Data Engineer
>> > mobile: +98 912 468 1859
>> > site: www.moein.xyz
>> > email: moein...@gmail.com
>> >
>>
>>
>
>
> --
> Ryan Blue
> Software Engineer
> Netflix


--
Ryan Blue
Software Engineer
Netflix


Re: DataSourceV2 community sync #3

2018-12-03 Thread Thakrar, Jayesh
Thank you Ryan and Xiao – sharing all this info really gives a very good 
insight!

From: Ryan Blue 
Reply-To: "rb...@netflix.com" 
Date: Monday, December 3, 2018 at 12:05 PM
To: "Thakrar, Jayesh" 
Cc: Xiao Li , Spark Dev List 
Subject: Re: DataSourceV2 community sync #3


Jayesh,

I don’t think this need is very narrow.

To have reliable behavior for CTAS, you need to:
1. Check whether a table exists and fail if it does. Right now, it is up to the
source whether to continue with the write if the table already exists or to
throw an exception, which is unreliable across sources.
2. Create the table if it doesn’t exist.
3. Drop the table if the write failed. In the current implementation, this
can’t be done reliably because #1 is unreliable. So a failed CTAS has the
side effect that the table is created in some cases, and a subsequent retry can
fail because the table exists.

Leaving these operations up to the read/write API is why behavior isn’t 
consistent today. It also increases the amount of work that a source needs to 
do and mixes concerns (what to do in a write when the table doesn’t exist). 
Spark is going to be a lot more predictable if we decompose the behavior of 
these operations into create, drop, write, etc.
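
To make that decomposition concrete, here is a rough, hypothetical sketch only (the trait below is illustrative and is not the SPARK-24252 TableCatalog API) of how a CTAS could be sequenced over catalog operations:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType

// Hypothetical, simplified catalog interface; names and signatures are illustrative.
trait SimpleTableCatalog {
  def tableExists(name: String): Boolean
  def createTable(name: String, schema: StructType): Unit
  def dropTable(name: String): Unit
}

// CTAS decomposed into catalog + write steps, so behavior no longer depends on the source.
def createTableAsSelect(catalog: SimpleTableCatalog, name: String, query: DataFrame): Unit = {
  if (catalog.tableExists(name)) {                 // 1. reliable existence check
    throw new IllegalStateException(s"Table $name already exists")
  }
  catalog.createTable(name, query.schema)          // 2. create the table
  try {
    query.write.insertInto(name)                   // 3. write the query result
  } catch {
    case e: Exception =>
      catalog.dropTable(name)                      // 4. drop the table on failure
      throw e
  }
}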

And in addition to CTAS, we want these operations to be exposed for sources. If 
Spark can create a table, why wouldn’t you be able to run DROP TABLE to remove 
it?

Last, Spark must be able to interact with the source of truth for tables. If 
Spark can’t create a table in Cassandra, it should reject a CTAS operation.

On Mon, Dec 3, 2018 at 9:52 AM Thakrar, Jayesh wrote:
Thank you Xiao – I was wondering what was the motivation for the catalog.
If CTAS is the only candidate, would it suffice to have that as part of the 
data source interface only?

If we look at BI, ETL and reporting tools which interface with many tables from 
different data sources at the same time, it makes sense to have a metadata 
catalog as the catalog is used to “design” the work for that tool (e.g. ETL 
processing unit, etc). Furthermore, the catalog serves as a data mapping to map 
external data types to the tool’s data types.

Is the vision to move in that direction for Spark with the catalog 
support/feature?
Also, is the vision to incorporate the “options” specified for the data
source into the catalog?
That may be helpful in some situations (e.g. the JDBC connect string being 
available from the catalog).
From: Xiao Li
Date: Monday, December 3, 2018 at 10:44 AM
To: "Thakrar, Jayesh"
Cc: Ryan Blue, "u...@spark.apache.org"
Subject: Re: DataSourceV2 community sync #3

Hi, Jayesh,

This is a good question. Spark is a unified analytics engine for various data 
sources. We are able to get the table schema from the underlying data sources 
via our data source APIs. Thus, it resolves most of the user requirements. 
Spark does not need the other info (like databases, functions, and views) that
is stored in the local catalog. Note that Spark is not a query engine for a
specific data source. Thus, in the past we did not accept any public API that
does not have an implementation, and I believe this still holds.

The catalog has been part of Spark SQL since the initial design and implementation.
Data sources that do not have their own catalog can use our catalog as a
single source of truth. If they already have their own catalog, they normally
use the underlying data source as the single source of truth, and the table
metadata in the Spark catalog is essentially a view of the physical schema
stored in their local catalog. To support an atomic CREATE TABLE AS SELECT
that requires modifying both the catalog and the data, we can add an interface for
data sources, but that is not part of the catalog interface. CTAS will not bypass our
catalog. We will still register the table in our catalog, and the schema may or may not
be stored in our catalog.

Will we define a super-feature catalog that can support all the data sources?

Based on my understanding, it is very hard. The priority is low based on the
current scope of Spark SQL. If you want to do it, your design needs to consider
how it works between global and local catalogs. This also requires a SPIP and
a vote. If you want to develop it incrementally without a design, I would
suggest you do it in your own fork. In the past, Spark on K8S was developed
in a separate fork and then merged into the upstream of Apache Spark.

Welcome your contributions and let us make Spark great!

Cheers,

Xiao

Thakrar, Jayesh wrote on Saturday, December 1, 2018 at 9:10 PM:
Just curious on the need for a catalog within Spark.

So Spark interfaces with other systems – many of which have a cata

Re: DataSourceV2 community sync #3

2018-12-03 Thread Thakrar, Jayesh
Thank you Xiao – I was wondering what was the motivation for the catalog.
If CTAS is the only candidate, would it suffice to have that as part of the 
data source interface only?

If we look at BI, ETL and reporting tools which interface with many tables from 
different data sources at the same time, it makes sense to have a metadata 
catalog as the catalog is used to “design” the work for that tool (e.g. ETL 
processing unit, etc). Furthermore, the catalog serves as a data mapping to map 
external data types to the tool’s data types.

Is the vision to move in that direction for Spark with the catalog 
support/feature?
Also, is the vision to incorporate the “options” specified for the data
source into the catalog?
That may be helpful in some situations (e.g. the JDBC connect string being 
available from the catalog).
From: Xiao Li 
Date: Monday, December 3, 2018 at 10:44 AM
To: "Thakrar, Jayesh" 
Cc: Ryan Blue , "u...@spark.apache.org" 

Subject: Re: DataSourceV2 community sync #3

Hi, Jayesh,

This is a good question. Spark is a unified analytics engine for various data 
sources. We are able to get the table schema from the underlying data sources 
via our data source APIs. Thus, it resolves most of the user requirements. 
Spark does not need the other info (like databases, functions, and views) that
is stored in the local catalog. Note that Spark is not a query engine for a
specific data source. Thus, in the past we did not accept any public API that
does not have an implementation, and I believe this still holds.

The catalog has been part of Spark SQL since the initial design and implementation.
Data sources that do not have their own catalog can use our catalog as a
single source of truth. If they already have their own catalog, they normally
use the underlying data source as the single source of truth, and the table
metadata in the Spark catalog is essentially a view of the physical schema
stored in their local catalog. To support an atomic CREATE TABLE AS SELECT
that requires modifying both the catalog and the data, we can add an interface for
data sources, but that is not part of the catalog interface. CTAS will not bypass our
catalog. We will still register the table in our catalog, and the schema may or may not
be stored in our catalog.

Will we define a super-feature catalog that can support all the data sources?

Based on my understanding, it is very hard. The priority is low based on the
current scope of Spark SQL. If you want to do it, your design needs to consider
how it works between global and local catalogs. This also requires a SPIP and
a vote. If you want to develop it incrementally without a design, I would
suggest you do it in your own fork. In the past, Spark on K8S was developed
in a separate fork and then merged into the upstream of Apache Spark.

Welcome your contributions and let us make Spark great!

Cheers,

Xiao

Thakrar, Jayesh wrote on Saturday, December 1, 2018 at 9:10 PM:
Just curious on the need for a catalog within Spark.

So Spark interfaces with other systems – many of which have a catalog of their
own – e.g. RDBMSes, HBase, Cassandra, etc., and some don’t (e.g. HDFS,
filesystems, etc.).
So what is the purpose of having this catalog within Spark for tables defined 
in Spark (which could be a front for other “catalogs”)?
Is it trying to fulfill some void/need…..
Also, would the Spark catalog be the common denominator of the other catalogs 
(least featured) or a super-feature catalog?

From: Xiao Li
Date: Saturday, December 1, 2018 at 10:49 PM
To: Ryan Blue
Cc: "u...@spark.apache.org"
Subject: Re: DataSourceV2 community sync #3

Hi, Ryan,

Let us first focus on answering the most fundamental problem before discussing 
various related topics. What is a catalog in Spark SQL?

My definition of catalog is based on the database catalog. Basically, the
catalog provides a service that manages the metadata/definitions of database
objects (e.g., databases, views, tables, functions, user roles, and so on).

In Spark SQL, all the external objects accessed through our data source APIs
are called "tables". I do not think we will expand that support in the near
future. That means the metadata we need from the external data sources is for
tables only.

These data sources should not be identified by the catalog identifier. That
means, in "catalog.database.table", the catalog part is only used to identify the actual
catalog, not the data source.

For a Spark cluster, we could mount multiple catalogs (e.g., hive_metastore_1, 
hive_metastore_2 and glue_1) at the same time. We could get the metadata of
tables, databases, and functions by accessing different catalogs:
"hive_metastore_1.db1.tab1", "hive_metastore_2.db2.tab2", "glue.db3.tab2". In 
th

Re: DataSourceV2 community sync #3

2018-12-01 Thread Thakrar, Jayesh
Just curious on the need for a catalog within Spark.

So Spark interfaces with other systems – many of which have a catalog of their
own – e.g. RDBMSes, HBase, Cassandra, etc., and some don’t (e.g. HDFS,
filesystems, etc.).
So what is the purpose of having this catalog within Spark for tables defined 
in Spark (which could be a front for other “catalogs”)?
Is it trying to fulfill some void/need…..
Also, would the Spark catalog be the common denominator of the other catalogs 
(least featured) or a super-feature catalog?

From: Xiao Li 
Date: Saturday, December 1, 2018 at 10:49 PM
To: Ryan Blue 
Cc: "u...@spark.apache.org" 
Subject: Re: DataSourceV2 community sync #3

Hi, Ryan,

Let us first focus on answering the most fundamental problem before discussing 
various related topics. What is a catalog in Spark SQL?

My definition of catalog is based on the database catalog. Basically, the
catalog provides a service that manages the metadata/definitions of database
objects (e.g., databases, views, tables, functions, user roles, and so on).

In Spark SQL, all the external objects accessed through our data source APIs
are called "tables". I do not think we will expand that support in the near
future. That means the metadata we need from the external data sources is for
tables only.

These data sources should not be identified by the catalog identifier. That
means, in "catalog.database.table", the catalog part is only used to identify the actual
catalog, not the data source.

For a Spark cluster, we could mount multiple catalogs (e.g., hive_metastore_1, 
hive_metastore_2 and glue_1) at the same time. We could get the metadata of
tables, databases, and functions by accessing different catalogs:
"hive_metastore_1.db1.tab1", "hive_metastore_2.db2.tab2", "glue.db3.tab2". In 
the future, if Spark has its own catalog implementation, we might have 
something like, "spark_catalog1.db3.tab2". The catalog will be used for 
registering all the external data sources, various Spark UDFs and so on.
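
For illustration, assuming two such catalogs are mounted (the names and the mounting mechanism are hypothetical here, and the multi-catalog identifier syntax is exactly what is under discussion), queries would address them through catalog.database.table identifiers:

// Assumes an existing SparkSession `spark` and mounted catalogs "hive_metastore_1" and "glue".
val df1 = spark.sql("SELECT * FROM hive_metastore_1.db1.tab1")
val df2 = spark.sql("SELECT * FROM glue.db3.tab2")

// A single query can join tables resolved through different catalogs.
val joined = spark.sql(
  """
  SELECT a.id, b.value
  FROM hive_metastore_1.db1.tab1 a
  JOIN glue.db3.tab2 b ON a.id = b.id
  """)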

At the same time, we should NOT mix the table-level data sources with catalog 
support. That means, "Cassandra1.db1.tab1", "Kafka2.db2.tab1", 
"Hbase3.db1.tab2" will not appear.

Do you agree on my definition of catalog in Spark SQL?

Xiao


Ryan Blue wrote on Saturday, December 1, 2018 at 7:25 PM:

> I try to avoid discussing each specific topic about catalog federation
> before we decide the framework of multi-catalog support.

I’ve tried to open discussions on this for the last 6+ months because we need 
it. I understand that you’d like a comprehensive plan for supporting more than 
one catalog before moving forward, but I think most of us are okay with the 
incremental approach. It’s better to make progress toward the goal.

> In general, data source API V2 and catalog API should be orthogonal
I agree with you, and they are. The API that Wenchen is working on for reading 
and writing data and the TableCatalog API are orthogonal efforts. As I said, 
they only overlap with the Table interface, and clearly tables loaded from a 
catalog need to be able to plug into the read/write API.

The reason these two efforts are related is that the community voted to
standardize logical plans. Those standard plans have well-defined behavior for operations like CTAS,
instead of relying on the data source plugin to do … something undefined. To 
implement this, we need a way for Spark to create tables, drop tables, etc. 
That’s why we need a way for sources to plug in Table-related catalog 
operations. (Sorry if this was already clear. I know I talked about it at 
length in the first v2 sync up.)

While the two APIs are orthogonal and serve different purposes, implementing 
common operations requires that we have both.

> I would not call it a table catalog. I do not expect the data source
> should/need to implement a catalog. Since you might want an atomic CTAS, we can
> improve the table metadata resolution logic to support it with different
> resolution priorities. For example, try to get the metadata from the external
> data source, if the table metadata is not available in the catalog.

It sounds like your definition of a “catalog” is different. I think you’re 
referring to a global catalog? Could you explain what you’re talking about here?

I’m talking about an API to interface with an external data source, which I 
think we need for the reasons I outlined above. I don’t care what we call it, 
but your comment seems to hint that there would be an API to look up tables in 
external sources. That’s the thing I’m talking about.

> CatalogTableIdentifier: The PR is doing nothing but adding an interface.

Yes. I opened this PR to discuss how Spark should track tables from different 
catalogs and avoid passing those references to code paths that don’t support 
them. The use of table identifiers with a catalog part was discussed 

Re: Double pass over ORC data files even after supplying schema and setting inferSchema = false

2018-11-21 Thread Thakrar, Jayesh
Thank you for the quick reply, Dongjoon.
This sounds interesting and it might be the resolution for our issue.

Let me do some tests and I will update the thread.

Thanks,
Jayesh

From: Dongjoon Hyun 
Date: Wednesday, November 21, 2018 at 11:46 AM
To: "Thakrar, Jayesh" 
Cc: dev 
Subject: Re: Double pass over ORC data files even after supplying schema and 
setting inferSchema = false

Hi, Thakrar.

Which version are you using now? If it's below Spark 2.4.0, please try to use 
2.4.0.

There was an improvement related to that.

https://issues.apache.org/jira/browse/SPARK-25126

Bests,
Dongjoon.


On Wed, Nov 21, 2018 at 6:17 AM Thakrar, Jayesh wrote:
Hi All,

We have some batch processing where we read 100s of thousands of ORC files.
What I found is that this was taking too much time AND that there was a long
pause between the point where the read begins in the code and when the executors get into
action.
That period is about 1.5+ hours where only the driver seems to be busy.

I have a feeling that this is due to a double pass over the data for schema
inference AND validation (e.g. if one of the files has a missing field, there
is an exception).
I tried providing the schema upfront as well as setting inferSchema to false, 
yet the same thing happens.

Is there any explanation for this and is there any way to avoid it?

Thanks,
Jayesh
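
For reference, a minimal sketch of the read path being discussed, supplying the schema up front instead of letting the driver discover it from the ORC files; the schema and path below are hypothetical:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val spark = SparkSession.builder().appName("orc-read").getOrCreate()

// Hypothetical schema and path.
val schema = StructType(Seq(
  StructField("event_id", LongType, nullable = true),
  StructField("event_type", StringType, nullable = true)))

val df = spark.read
  .schema(schema)
  .orc("hdfs:///data/events/*.orc")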



Double pass over ORC data files even after supplying schema and setting inferSchema = false

2018-11-21 Thread Thakrar, Jayesh
Hi All,

We have some batch processing where we read 100s of thousands of ORC files.
What I found is that this was taking too much time AND that there was a long
pause between the point where the read begins in the code and when the executors get into
action.
That period is about 1.5+ hours where only the driver seems to be busy.

I have a feeling that this is due to a double pass over the data for schema
inference AND validation (e.g. if one of the files has a missing field, there
is an exception).
I tried providing the schema upfront as well as setting inferSchema to false, 
yet the same thing happens.

Is there any explanation for this and is there any way to avoid it?

Thanks,
Jayesh



Re: [Discuss] Datasource v2 support for manipulating partitions

2018-09-20 Thread Thakrar, Jayesh
Here’s what can be done in PostgreSQL

You can create a partitioned table with a partition clause, e.g.
CREATE TABLE measurement (…) PARTITION BY RANGE (logdate)

You can create partitions by creating tables as partitions of a
partitioned table, e.g.
CREATE TABLE measurement_y2006m02 PARTITION OF measurement FOR VALUES FROM 
('2006-02-01') TO ('2006-03-01')

Each “partition” is like a table and can be managed just like a table.

And of course you can have nested partitioning.

As for partition management, you can attach/detach partitions by converting a 
regular table into a table partition and a table partition into a regular table 
using the ALTER TABLE statement

ALTER TABLE measurement ATTACH/DETACH PARTITION

There are similar options in Oracle.
In Oracle, converting a table into a partition and vice-versa is referred to as 
 “partition exchange”.
However unlike Postgres, table partitions are not treated as regular tables.


As for partition management relevance in Spark API, here are some thoughts:

Reading data from a table supporting predicate pushdown:
- Without an explicit partition specification, we would need to rely on partition
  pruning to select the appropriate partitions.
- However, if we can provide a mechanism to specify the partition(s), that would
  be great – and it would need to be translated into appropriate SQL clauses
  under the covers.

Writing data to a table supporting partitions:
- I think there is no current way to support the above Postgres/Oracle ways of
  creating partitioned tables or doing table exchanges intelligently.
- So probably options or some appropriate interfaces would be required.
- The ALTER TABLE equivalent work above can be done as part of the commit
  (provided an appropriate interface is supported).

Here are Dale’s comments earlier from the thread
“So if we are not hiding them from the user, we need to allow users to
manipulate them. Either by representing them generically in the API,
allowing pass-through commands to manipulate them, or by some other means.”

I think we need to mull over this and also look beyond RDBMSes – say, S3 for 
applicability.

In essence, I think partitions matter because they allow partition pruning (=
less resource intensive) during reads and allow partition setup and
appropriate targeting during writes.
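
As a sketch of the "options" route mentioned above (assuming a SparkSession `spark`; the connector name and option keys are entirely hypothetical, and a custom DataSourceV2 connector would have to define and interpret them):

// Read only a specific partition, letting the connector translate the spec into SQL clauses.
val partData = spark.read
  .format("com.example.partitionedjdbc")          // hypothetical connector
  .option("partition.spec", "logdate=2018-09")    // hypothetical option key
  .load()

// Write into a partition, asking the connector to create it if missing.
spark.range(100).toDF("id").write
  .format("com.example.partitionedjdbc")
  .option("partition.create", "true")
  .option("partition.spec", "logdate=2018-10")
  .save()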


From: Ryan Blue 
Reply-To: "rb...@netflix.com" 
Date: Wednesday, September 19, 2018 at 4:35 PM
To: "Thakrar, Jayesh" 
Cc: "tigerqu...@outlook.com" , Spark Dev List 

Subject: Re: [Discuss] Datasource v2 support for manipulating partitions

What does partition management look like in those systems and what are the 
options we would standardize in an API?

On Wed, Sep 19, 2018 at 2:16 PM Thakrar, Jayesh wrote:
I think a partition management feature would be very useful in RDBMSes that
support it – e.g. Oracle, PostgreSQL, and DB2.
In some cases adding partitions is explicit and can be done outside of
data loads.
But in other cases it may need to be done implicitly, when supported
by the platform.
This is similar to the static/dynamic partition loading in Hive and Oracle.

So in short, I agree that partition management should be an optional interface.

From: Ryan Blue
Reply-To: "rb...@netflix.com"
Date: Wednesday, September 19, 2018 at 2:58 PM
To: "Thakrar, Jayesh"
Cc: "tigerqu...@outlook.com", Spark Dev List
Subject: Re: [Discuss] Datasource v2 support for manipulating partitions

I'm open to exploring the idea of adding partition management as a catalog API. 
The approach we're taking is to have an interface for each concern a catalog 
might implement, like TableCatalog (proposed in SPARK-24252), but also 
FunctionCatalog for stored functions and possibly PartitionedTableCatalog for 
explicitly partitioned tables.
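
As a sketch of that "one interface per concern" idea (the shapes below are hypothetical and simplified, not the actual SPARK-24252 proposal):

import org.apache.spark.sql.types.StructType

trait Table
trait CatalogPlugin { def name: String }

// Table-level DDL concerns.
trait TableCatalog extends CatalogPlugin {
  def loadTable(ident: String): Table
  def createTable(ident: String, schema: StructType): Table
  def dropTable(ident: String): Boolean
}

// Stored-function concerns.
trait FunctionCatalog extends CatalogPlugin {
  def loadFunction(ident: String): AnyRef  // placeholder for a bound function
}

// Explicit partition-management concerns, for sources like Kudu or Hive.
trait PartitionedTableCatalog extends TableCatalog {
  def createPartition(ident: String, spec: Map[String, String]): Unit
  def dropPartition(ident: String, spec: Map[String, String]): Unit
  def listPartitions(ident: String): Seq[Map[String, String]]
}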

That could definitely be used to implement ALTER TABLE ADD/DROP PARTITION for 
Hive tables, although I'm not sure that we would want to continue exposing 
partitions for simple tables. I know that this is important for storage systems 
like Kudu, but I think it is needlessly difficult and annoying for simple 
tables that are partitioned by a regular transformation like Hive tables. 
That's why Iceberg hides partitioning outside of table configuration. That also 
avoids problems where SELECT DISTINCT queries are wrong because a partition 
exists but has no data.

How useful is this outside of Kudu? Is it something that we should provide an 
API for, or is it specific enough to Kudu that Spark shouldn't include it in 
the API for all sources?

rb


On Tue, Sep 18, 2018 at 7:38 AM Thakrar, Jayesh wrote:

Re: [Discuss] Datasource v2 support for manipulating partitions

2018-09-19 Thread Thakrar, Jayesh
I think a partition management feature would be very useful in RDBMSes that
support it – e.g. Oracle, PostgreSQL, and DB2.
In some cases adding partitions is explicit and can be done outside of
data loads.
But in other cases it may need to be done implicitly, when supported
by the platform.
This is similar to the static/dynamic partition loading in Hive and Oracle.

So in short, I agree that partition management should be an optional interface.

From: Ryan Blue 
Reply-To: "rb...@netflix.com" 
Date: Wednesday, September 19, 2018 at 2:58 PM
To: "Thakrar, Jayesh" 
Cc: "tigerqu...@outlook.com" , Spark Dev List 

Subject: Re: [Discuss] Datasource v2 support for manipulating partitions

I'm open to exploring the idea of adding partition management as a catalog API. 
The approach we're taking is to have an interface for each concern a catalog 
might implement, like TableCatalog (proposed in SPARK-24252), but also 
FunctionCatalog for stored functions and possibly PartitionedTableCatalog for 
explicitly partitioned tables.

That could definitely be used to implement ALTER TABLE ADD/DROP PARTITION for 
Hive tables, although I'm not sure that we would want to continue exposing 
partitions for simple tables. I know that this is important for storage systems 
like Kudu, but I think it is needlessly difficult and annoying for simple 
tables that are partitioned by a regular transformation like Hive tables. 
That's why Iceberg hides partitioning outside of table configuration. That also 
avoids problems where SELECT DISTINCT queries are wrong because a partition 
exists but has no data.

How useful is this outside of Kudu? Is it something that we should provide an 
API for, or is it specific enough to Kudu that Spark shouldn't include it in 
the API for all sources?

rb


On Tue, Sep 18, 2018 at 7:38 AM Thakrar, Jayesh wrote:
Totally agree with you, Dale, that there are situations where, for efficiency,
performance and better control/visibility/manageability, we need to expose
partition management.

So as described, I suggested two things - the ability to do it in the current
V2 API via options and an appropriate implementation in the datasource
reader/writer.

And for the long term, I suggested that partition management could be made part of
metadata/catalog management - SPARK-24252 (DataSourceV2: Add catalog support)?


On 9/17/18, 8:26 PM, "tigerquoll" 
mailto:tigerqu...@outlook.com>> wrote:

Hi Jayesh,
I get where you are coming from - partitions are just an implementation
optimisation that we really shouldn’t be bothering the end user with.
Unfortunately that view is like saying RPC is like a procedure call, and
details of the network transport should be hidden from the end user. CORBA
tried this approach for RPC and failed for the same reason that no major
vendor of DBMS systems that support partitions try to hide them from the end
user.  They have a substantial real world effect that is impossible to hide
from the user (in particular when writing/modifying the data source).  Any
attempt to “take care” of partitions automatically invariably guesses wrong
and ends up frustrating the end user (as “substantial real world effect”
turns to “show stopping performance penalty” if the user attempts to fight
against a partitioning scheme she has no idea exists)

So if we are not hiding them from the user, we need to allow users to
manipulate them. Either by representing them generically in the API,
allowing pass-through commands to manipulate them, or by some other means.

Regards,
Dale.




--
Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/




--
Ryan Blue
Software Engineer
Netflix


Re: data source api v2 refactoring

2018-09-19 Thread Thakrar, Jayesh
Thanks for the info Ryan – very helpful!

From: Ryan Blue 
Reply-To: "rb...@netflix.com" 
Date: Wednesday, September 19, 2018 at 3:17 PM
To: "Thakrar, Jayesh" 
Cc: Wenchen Fan , Hyukjin Kwon , 
Spark Dev List 
Subject: Re: data source api v2 refactoring

Hi Jayesh,

The existing sources haven't been ported to v2 yet. That is going to be tricky 
because the existing sources implement behaviors that we need to keep for now.

I wrote up an SPIP to standardize logical plans while moving to the v2 sources. 
The reason why we need this is that too much is delegated to sources today. For 
example, sources are handed a SaveMode to overwrite data, but what exactly gets 
overwritten isn't defined and it varies by the source that gets used. That's 
not a good thing and we want to clean up what happens so that users know that a 
query behaves the same way across all v2 sources. CTAS shouldn't succeed for 
one source but fail for another if the table already exists.

Standardizing plans makes it difficult to port the existing sources to v2 
because we need to implement the behavior of the v2 plans, which may not be the 
existing v1 behavior. I think what we should do is keep the existing v1 sources 
working as they do today, and add a way to opt in for v2 behavior. One good way 
to do this is to use a new write API that is more clear; I proposed one in the 
SPIP I mentioned earlier. SQL is a bit easier because the behavior for SQL is 
fairly well-defined. The problem is mostly with the existing DF write API, 
DataFrameWriter.

It would be great to open a discussion about the compatibility between v1 and 
v2 and come up with a plan on this list.

rb

On Fri, Sep 7, 2018 at 2:12 PM Thakrar, Jayesh wrote:
Ryan et al,

Wondering if the existing Spark based data sources (e.g. for HDFS, Kafka) have 
been ported to V2.
I remember reading threads where there were discussions about the 
inefficiency/overhead of converting from Row to InternalRow that was preventing 
certain porting effort etc.

I ask because those are the most widely used data sources and have a lot of 
effort and thinking behind them, and if they have ported over to V2, then they 
can serve as excellent production examples of V2 API.

Thanks,
Jayesh

From: Ryan Blue
Reply-To: "rb...@netflix.com"
Date: Friday, September 7, 2018 at 2:19 PM
To: Wenchen Fan
Cc: Hyukjin Kwon, Spark Dev List
Subject: Re: data source api v2 refactoring

There are a few v2-related changes that we can work in parallel, at least for 
reviews:

* SPARK-25006, #21978 (https://github.com/apache/spark/pull/21978): Add catalog
to TableIdentifier - this proposes how to incrementally add multi-catalog
support without breaking existing code paths
* SPARK-24253, #21308 (https://github.com/apache/spark/pull/21308): Add
DeleteSupport API - this is a small API addition, which doesn't affect the
refactor
* SPARK-24252, #21306 (https://github.com/apache/spark/pull/21306): Add v2
Catalog API - this is a different way to create v2 tables, also doesn't affect
the refactor

I agree that the PR for adding SQL support should probably wait on the 
refactor. I have also been meaning to share our implementation, which isn't 
based on the refactor. It handles CTAS, RTAS, InsertInto, DeleteFrom, and 
AlterTable from both SQL and the other methods in the DF API, saveAsTable and 
insertInto. It follows the structure that I proposed on the SQL support PR to 
convert SQL plans to v2 plans and uses the new TableCatalog to implement CTAS 
and RTAS.

rb


On Fri, Sep 7, 2018 at 12:27 AM Wenchen Fan wrote:
Hi Ryan,

You are right that the `LogicalWrite` mirrors the read side API. I just don't 
have a good naming yet, and write side changes will be a different PR.


Hi Hyukjin,

That's my expectation, otherwise we keep rebasing the refactor PR and never get 
it done.

On Fri, Sep 7, 2018 at 3:02 PM Hyukjin Kwon wrote:
BTW, do we hold Datasource V2 related PRs for now until we finish this 
refactoring just for clarification?

On Fri, Sep 7, 2018 at 12:52 AM, Ryan Blue wrote:
Wenchen,

I'm not really sure what you're proposing here. What is a `LogicalWrite`? Is it 
something that mirrors the read side in your PR?

I think that I agree that if we have a Write independent of the Table that 
carries the commit and abort methods, then we can create it directly without a 
WriteConfig. So I tentatively agree with what you propose, assuming that I 
understand it correctly.

rb

On Tue, Sep 4, 2018 at 8:42 PM Wenchen Fan wrote:
I'm switching to another Gmail account; let's see if it still gets dropped
this time.

Hi Ryan,

I'm thinking about the write path and feel the abstraction should be the same.

We still have l

Re: [Discuss] Datasource v2 support for manipulating partitions

2018-09-18 Thread Thakrar, Jayesh
Totally agree with you Dale, that there are situations for efficiency, 
performance and better control/visibility/manageability that we need to expose 
partition management.

So as described, I suggested two things - the ability to do it in the current 
V2 API form via options and appropriate implementation in datasource 
reader/writer.

And for long term, suggested that partition management can be made part of 
metadata/catalog management - SPARK-24252 (DataSourceV2: Add catalog support)?


On 9/17/18, 8:26 PM, "tigerquoll"  wrote:

Hi Jayesh,
I get where you are coming from - partitions are just an implementation
optimisation that we really shouldn’t be bothering the end user with. 
Unfortunately that view is like saying RPC is like a procedure call, and
details of the network transport should be hidden from the end user. CORBA
tried this approach for RPC and failed for the same reason that no major
vendor of DBMS systems that support partitions try to hide them from the end
user.  They have a substantial real world effect that is impossible to hide
from the user (in particular when writing/modifying the data source).  Any
attempt to “take care” of partitions automatically invariably guesses wrong
and ends up frustrating the end user (as “substantial real world effect”
turns to “show stopping performance penalty” if the user attempts to fight
against a partitioning scheme she has no idea exists)

So if we are not hiding them from the user, we need to allow users to
manipulate them. Either by representing them generically in the API,
allowing pass-through commands to manipulate them, or by some other means.

Regards,
Dale.




--
Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/





Re: [Discuss] Datasource v2 support for manipulating partitions

2018-09-16 Thread Thakrar, Jayesh
I am not involved with the design or development of the V2 API - so these could 
be naïve comments/thoughts.
Just as Dataset abstracts away from RDD, which otherwise required a little
more intimate knowledge about Spark internals, I am guessing the absence of
partition operations is either due to no current need for it or to the need to
abstract that away from the API user/programmer.
Of course, the other thing about Dataset is that it comes with a schema - closely
binding the two together.

However, for situations where a deeper understanding of the datasource is
necessary to read/write data, that logic can potentially be embedded into
the classes that implement DataSourceReader/DataSourceWriter or
DataReader/DataWriter.

E.g. if you are writing data and you need to dynamically create partitions on
the fly as you write, then the DataSourceWriter can gather the current
list of partitions and pass it on to the DataWriter via the DataWriterFactory. The
DataWriter can consult that list and, if a row is encountered whose partition does not
exist, create it (again note that the partition creation operation needs to
be idempotent OR the DataWriter needs to check for the partition before trying
to create it, as it may have already been created by another DataWriter).
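
A pseudocode-style sketch of that idempotent create-on-write idea (PartitionClient and the writer below are hypothetical; a real connector would do this inside its DataWriter against the target system's own API):

// Hypothetical client for the underlying datasource's partition operations.
trait PartitionClient {
  def partitionExists(spec: String): Boolean
  def createPartition(spec: String): Unit  // must be idempotent or race-safe
}

// Writer that creates a partition the first time it sees a row for it.
class PartitionAwareWriter(client: PartitionClient, knownPartitions: Set[String]) {
  private var seen = knownPartitions

  def write(partitionSpec: String, row: Any): Unit = {
    if (!seen.contains(partitionSpec)) {
      // Another writer may have created it already, so treat "already exists" as success.
      if (!client.partitionExists(partitionSpec)) client.createPartition(partitionSpec)
      seen += partitionSpec
    }
    // ... append `row` to the partition here ...
  }
}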

As for partition add/list/drop/alter, I don't think that concept/notion applies 
to all datasources (e.g. filesystem).
Also, the concept of a Spark partition may not translate into the underlying 
datasource partition.

At the same time I did see a discussion thread on catalog operations for V2 API 
- although, probably Spark partitions do not map one-to-one to the underlying 
partitions.

Probably a good place to introduce partition info is to add a method/object 
called "meta" to a dataset and allow the datasource to describe itself (e.g. 
table permissions, table partitions and specs, datasource info (e.g. cluster), 
etc.).

E.g. something like this

With just meta method
dataset.meta = {optional datasource specific info}

Or with meta as an intermediate object with several operations
dataset.meta.describe
dataset.meta.update


However, if you are look 

On 9/16/18, 1:24 AM, "tigerquoll"  wrote:

I've been following the development of the new data source abstraction with
keen interest.  One of the issues that has occurred to me as I sat down and
planned how I would implement a data source is how I would support
manipulating partitions.

My reading of the current prototype is that Data source v2 APIs expose
enough of a concept of a partition to support communicating record
distribution particulars to catalyst, but does not represent partitions as a
concept that the end user of the data sources can manipulate.

The end users of data sources need to be able to add/drop/modify and list
partitions. For example, many systems require partitions to be created
before records are added to them.  

For batch use-cases, it may be possible for users to manipulate partitions
from within the environment that the data source interfaces to, but for
streaming use-cases, this is not at all practical.

Two ways I can think of doing this are:
1. Allow "pass-through" commands to the underlying data source
2. Have a generic concept of partitions exposed to the end user via the data
source API and Spark SQL DML.

I'm keen for option 2 but recognise that it's possible there are better
alternatives out there.



--
Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/





Re: DataSourceWriter V2 Api questions

2018-09-13 Thread Thakrar, Jayesh
Agree on the “constraints” when working with Cassandra.
But remember, this is a weak attempt to make two non-transactional systems 
appear to the outside world as a transactional system.
Scaffolding/plumbing/abstractions will have to be created in the form of, say, a
custom data access layer.

Anyway, Ross is trying to get some practices used by other adopters of the V2 
API while trying to implement a driver/connector for MongoDB.

Probably views can be used similarly to partitions in MongoDB?
Essentially each batch load goes into a separate MongoDB table and results
in a view redefinition after a successful load.
And finally, to avoid too many tables in a view, you may have to come up with a
separate process to merge the underlying tables on a periodic basis.
It gets messy and probably moves you towards write-once-only tables, etc.

Finally, using views in a generic MongoDB connector may not be good or flexible
enough.


From: Russell Spitzer 
Date: Tuesday, September 11, 2018 at 9:58 AM
To: "Thakrar, Jayesh" 
Cc: Arun Mahadevan , Jungtaek Lim , 
Wenchen Fan , Reynold Xin , Ross 
Lawley , Ryan Blue , dev 
, "dbis...@us.ibm.com" 
Subject: Re: DataSourceWriter V2 Api questions

That only works assuming that Spark is the only client of the table. It will be 
impossible to force an outside user to respect the special metadata table when 
reading so they will still see all of the data in transit. Additionally this 
would force the incoming data to only be written into new partitions which is 
not simple to do from a C* perspective as balancing the distribution of new 
rows would be non trivial. If we had to do something like this we would 
basically be forced to write to some disk format first and then when we move 
the data into C* we still have the same problem that we started with.

On Tue, Sep 11, 2018 at 9:41 AM Thakrar, Jayesh wrote:
So if Spark and the destination datastore are both non-transactional, you will 
have to resort to an external mechanism for “transactionality”.

Here are some options for both RDBMS and non-transaction datastore destination.
For now assuming that Spark is used in batch mode (and not streaming mode).

RDBMS Options
- Use a staging table as discussed in the thread.
- As an extension of the above, use partitioned destination tables: load data
  into a staging table and then use partition management to include the staging
  table in the partitioned table. This implies a partition per Spark batch run.

Non-transactional Datastore Options
- Use a separate metadata table.
- Load the data into a staging-table equivalent or even Cassandra partition(s).
- Start the transaction by making a “start of transaction” entry in the
  metadata table along with the partition keys to be populated.
- As part of the Spark batch commit, update the metadata entry with appropriate
  details – e.g. partition load time, etc.
- In the event of a failed / incomplete batch, the metadata table entry will be
  incomplete and the corresponding partition keys can be dropped.
- So essentially you use the metadata table to load/drop/skip the data to be
  moved/retained into the final destination.

Misc
Another option is to use Spark to stage data into a filesystem (distributed, 
HDFS) and then use RDBMS utilities to transactionally load data into the 
destination table.


From: Russell Spitzer
Date: Tuesday, September 11, 2018 at 9:08 AM
To: Arun Mahadevan
Cc: Jungtaek Lim, Wenchen Fan, Reynold Xin, Ross Lawley, Ryan Blue, dev, dbis...@us.ibm.com

Subject: Re: DataSourceWriter V2 Api questions

I'm still not sure how the staging table helps for databases which do not have 
such atomicity guarantees. For example in Cassandra if you wrote all of the 
data temporarily to a staging table, we would still have the same problem in 
moving the data from the staging table into the real table. We would likely 
have as similar a chance of failing and we still have no way of making the 
entire staging set simultaneously visible.

On Tue, Sep 11, 2018 at 8:39 AM Arun Mahadevan wrote:
>Some being said it is exactly-once when the output is eventually exactly-once, 
>whereas others being said there should be no side effect, like consumer 
>shouldn't see partial write. I guess 2PC is former, since some partitions can 
>commit earlier while other partitions fail to commit for some time.
Yes its more about guaranteeing atomicity like all partitions eventually commit 
or none commits. The visibility of the data for the readers is orthogonal (e.g 
setting the isolation levels like serializable for XA) and in general its 
difficult to guar

Re: DataSourceWriter V2 Api questions

2018-09-11 Thread Thakrar, Jayesh
So if Spark and the destination datastore are both non-transactional, you will 
have to resort to an external mechanism for “transactionality”.

Here are some options for both RDBMS and non-transaction datastore destination.
For now assuming that Spark is used in batch mode (and not streaming mode).

RDBMS Options
- Use a staging table as discussed in the thread.
- As an extension of the above, use partitioned destination tables: load data
  into a staging table and then use partition management to include the staging
  table in the partitioned table. This implies a partition per Spark batch run.

Non-transactional Datastore Options
- Use a separate metadata table.
- Load the data into a staging-table equivalent or even Cassandra partition(s).
- Start the transaction by making a “start of transaction” entry in the
  metadata table along with the partition keys to be populated.
- As part of the Spark batch commit, update the metadata entry with appropriate
  details – e.g. partition load time, etc.
- In the event of a failed / incomplete batch, the metadata table entry will be
  incomplete and the corresponding partition keys can be dropped.
- So essentially you use the metadata table to load/drop/skip the data to be
  moved/retained into the final destination.

Misc
Another option is to use Spark to stage data into a filesystem (distributed, 
HDFS) and then use RDBMS utilities to transactionally load data into the 
destination table.
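
A sketch of that metadata-table protocol (MetadataStore and the sink format below are hypothetical stand-ins for whatever client and connector the target system provides):

import org.apache.spark.sql.DataFrame

trait MetadataStore {
  def beginBatch(batchId: String, partitionKeys: Seq[String]): Unit  // "start of transaction" entry
  def commitBatch(batchId: String, loadTimeMillis: Long): Unit       // mark the batch complete
  def isCommitted(batchId: String): Boolean
}

def runBatch(meta: MetadataStore, batchId: String,
             partitionKeys: Seq[String], df: DataFrame): Unit = {
  meta.beginBatch(batchId, partitionKeys)
  // Each batch is written into its own partition(s) / staging area.
  df.write.mode("append")
    .format("com.example.sink")        // hypothetical connector
    .option("batchId", batchId)
    .save()
  meta.commitBatch(batchId, System.currentTimeMillis())
}

// Readers (or a cleanup job) consult the metadata table: partitions belonging to a
// batch without a commit entry are skipped and can later be dropped.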


From: Russell Spitzer 
Date: Tuesday, September 11, 2018 at 9:08 AM
To: Arun Mahadevan 
Cc: Jungtaek Lim , Wenchen Fan , 
Reynold Xin , Ross Lawley , Ryan 
Blue , dev , 
Subject: Re: DataSourceWriter V2 Api questions

I'm still not sure how the staging table helps for databases which do not have 
such atomicity guarantees. For example in Cassandra if you wrote all of the 
data temporarily to a staging table, we would still have the same problem in 
moving the data from the staging table into the real table. We would likely 
have as similar a chance of failing and we still have no way of making the 
entire staging set simultaneously visible.

On Tue, Sep 11, 2018 at 8:39 AM Arun Mahadevan wrote:
>Some being said it is exactly-once when the output is eventually exactly-once, 
>whereas others being said there should be no side effect, like consumer 
>shouldn't see partial write. I guess 2PC is former, since some partitions can 
>commit earlier while other partitions fail to commit for some time.
Yes, it's more about guaranteeing atomicity, i.e. all partitions eventually commit
or none commits. The visibility of the data for the readers is orthogonal (e.g.
setting isolation levels like serializable for XA) and in general it's
difficult to guarantee that data across partitions is visible at once. An
approach like a staging table and global commit works in a centralized setup but
can be difficult to do in a distributed manner across partitions (e.g. each
partition output goes to a different database).

On Mon, 10 Sep 2018 at 21:23, Jungtaek Lim wrote:
IMHO that's up to how we would like to be strict about "exactly-once".

Some say it is exactly-once when the output is eventually exactly-once,
whereas others say there should be no side effects, i.e. a consumer
shouldn't see partial writes. I guess 2PC is the former, since some partitions can
commit earlier while other partitions fail to commit for some time.

That said, there may be a couple of alternatives other than the contract Spark
provides/requires, and I'd like to see how the Spark community wants to deal with
them. Would we want to disallow alternatives like "replay + deduplicate
write (per batch/partition)", which ensures "eventually" exactly-once but
cannot ensure the contract?

Btw, unless achieving exactly-once is light enough for a given sink, I think the
sink should provide both at-least-once (also optimized for that semantic) and
exactly-once, and let end users pick one.

On Tue, Sep 11, 2018 at 12:57 PM, Russell Spitzer wrote:
Why are atomic operations a requirement? I feel like doubling the amount of
writes (with staging tables) is probably a tradeoff that the end user should
make.
On Mon, Sep 10, 2018, 10:43 PM Wenchen Fan wrote:
Regardless of the API, using Spark to write data atomically requires:
1. Writing data in a distributed way, with a central coordinator at the Spark driver.
2. The distributed writers are not guaranteed to run together at the same time.
(This can be relaxed if we can extend the barrier scheduling feature.)
3. The new data is visible if and only if all distributed writers succeed.

According to these requirements, I think using a staging table is the most 
common way and maybe the only way. I'm not sure how 2PC can help, we don't want 
users to read partial data, so we need a final step to commit all the data 
together.

For RDBMS data sources, I think a simple solution is to ask users to coalesce 

Re: data source api v2 refactoring

2018-09-07 Thread Thakrar, Jayesh
Ryan et al,

Wondering if the existing Spark based data sources (e.g. for HDFS, Kafka) have 
been ported to V2.
I remember reading threads where there were discussions about the 
inefficiency/overhead of converting from Row to InternalRow that was preventing 
certain porting effort etc.

I ask because those are the most widely used data sources and have a lot of 
effort and thinking behind them, and if they have ported over to V2, then they 
can serve as excellent production examples of V2 API.

Thanks,
Jayesh

From: Ryan Blue 
Reply-To: 
Date: Friday, September 7, 2018 at 2:19 PM
To: Wenchen Fan 
Cc: Hyukjin Kwon , Spark Dev List 
Subject: Re: data source api v2 refactoring

There are a few v2-related changes that we can work in parallel, at least for 
reviews:

* SPARK-25006, #21978: Add catalog 
to TableIdentifier - this proposes how to incrementally add multi-catalog 
support without breaking existing code paths
* SPARK-24253, #21308: Add 
DeleteSupport API - this is a small API addition, which doesn't affect the 
refactor
* SPARK-24252, #21306: Add v2 
Catalog API - this is a different way to create v2 tables, also doesn't affect 
the refactor

I agree that the PR for adding SQL support should probably wait on the 
refactor. I have also been meaning to share our implementation, which isn't 
based on the refactor. It handles CTAS, RTAS, InsertInto, DeleteFrom, and 
AlterTable from both SQL and the other methods in the DF API, saveAsTable and 
insertInto. It follows the structure that I proposed on the SQL support PR to 
convert SQL plans to v2 plans and uses the new TableCatalog to implement CTAS 
and RTAS.

rb


On Fri, Sep 7, 2018 at 12:27 AM Wenchen Fan wrote:
Hi Ryan,

You are right that the `LogicalWrite` mirrors the read side API. I just don't 
have a good naming yet, and write side changes will be a different PR.


Hi Hyukjin,

That's my expectation, otherwise we keep rebasing the refactor PR and never get 
it done.

On Fri, Sep 7, 2018 at 3:02 PM Hyukjin Kwon wrote:
BTW, do we hold Datasource V2 related PRs for now until we finish this 
refactoring just for clarification?

On Fri, Sep 7, 2018 at 12:52 AM, Ryan Blue wrote:
Wenchen,

I'm not really sure what you're proposing here. What is a `LogicalWrite`? Is it 
something that mirrors the read side in your PR?

I think that I agree that if we have a Write independent of the Table that 
carries the commit and abort methods, then we can create it directly without a 
WriteConfig. So I tentatively agree with what you propose, assuming that I 
understand it correctly.

rb

On Tue, Sep 4, 2018 at 8:42 PM Wenchen Fan wrote:
I'm switching to another Gmail account; let's see if it still gets dropped
this time.

Hi Ryan,

I'm thinking about the write path and feel the abstraction should be the same.

We still have logical and physical writing. And the table can create different 
logical writing based on how to write. e.g., append, delete, replaceWhere, etc.

One thing I'm not sure about is the WriteConfig. With the WriteConfig, the API 
would look like
trait Table {
  WriteConfig newAppendWriteConfig();

  WriteConfig newDeleteWriteConfig(deleteExprs);

  LogicalWrite newLogicalWrite(writeConfig);
}

Without WriteConfig, the API looks like
trait Table {
  LogicalWrite newAppendWrite();

  LogicalWrite newDeleteWrite(deleteExprs);
}


It looks to me that the API is simpler without WriteConfig, what do you think?

Thanks,
Wenchen

On Wed, Sep 5, 2018 at 4:24 AM Ryan Blue  wrote:
Latest from Wenchen in case it was dropped.
-- Forwarded message --
From: Wenchen Fan
Date: Mon, Sep 3, 2018 at 6:16 AM
Subject: Re: data source api v2 refactoring
To: mri...@gmail.com
Cc: Ryan Blue, Reynold Xin, dev@spark.apache.org

Hi Mridul,

I'm not sure what's going on, my email was CC'ed to the dev list.


Hi Ryan,

The logical and physical scan idea sounds good. To add more color to Jungtaek's 
question, both micro-batch and continuous mode have the logical and physical 
scan, but there is a difference: for micro-batch mode, a physical scan outputs 
data for one epoch, but it's not true for continuous mode.

I'm not sure if it's necessary to include streaming epoch in the API 
abstraction, for features like metrics reporting.

On Sun, Sep 2, 2018 at 12:31 PM Mridul Muralidharan wrote:

Is it only me, or are all others getting Wenchen’s mails? (Obviously Ryan did
:-) )
I did not see it in the mail thread I received or in the archives ... [1] Wondering
which other senders were getting dropped (if any).

Regards
Mridul

[1] 

Re: Creating JDBC source table schema(DDL) dynamically

2018-07-12 Thread Thakrar, Jayesh
Unless the tables are very small (< 1000 rows), the impact of hitting the
catalog tables is negligible.
Furthermore, the catalog tables (or views) are usually in memory
because they are needed for query compilation, query execution (for triggers,
referential integrity, etc.) and even to establish a connection.

On 7/12/18, 9:53 AM, "Kadam, Gangadhar (GE Aviation, Non-GE)" 
 wrote:

Thanks Jayesh.

I was aware of the catalog table approach but I was avoiding that because
I will hit the database twice for one table: once to create the DDL and once to
read the data. I have lots of tables to transport from one environment to the other
and I don’t want to create unnecessary load on the DB.


On 7/12/18, 10:09 AM, "Thakrar, Jayesh"  
wrote:

One option is to use plain JDBC to interrogate Postgresql catalog for 
the source table and generate the DDL to create the destination table.
Then using plain JDBC again, create the table at the destination.

See the link below for some pointers…..


https://stackoverflow.com/questions/2593803/how-to-generate-the-create-table-sql-statement-for-an-existing-table-in-postgr


On 7/11/18, 9:55 PM, "Kadam, Gangadhar (GE Aviation, Non-GE)" 
 wrote:

Hi All,

I am trying to build a Spark application which will read the data
from PostgreSQL (source) in one environment and write it to PostgreSQL/Aurora
(target) in a different environment (like PROD to QA or QA to PROD, etc.)
using Spark JDBC.

When I am loading the dataframe back to target DB, I would like to 
ensure the same schema as the source table schema using

val targetTableSchema: String =
  """
|  operating_unit_nm character varying(20),
|  organization_id integer,
|  organization_cd character varying(30),
|  requesting_organization_id integer,
|  requesting_organization_cd character varying(50),
|  owning_organization_id integer,
|  owning_organization_cd character varying(50)
""".stripMargin


.option("createTableColumnTypes", targetTableSchema )

I would like to know if there is a way I can create this 
targetTableSchema (source table DDL) variable directly from the source table or 
from a CSV file. I don’t want Spark to enforce its default schema. Based on 
the table name, how do I get the DDL created dynamically to pass to the 
targetTableSchema variable as a string?

Currently I am updating targetTableSchema manually and am looking for 
some pointers to automate it.


Below is my code

// Define the parameter
val sourceDb: String = args(0)
val targetDb: String = args(1)
val sourceTable: String = args(2)
val targetTable: String = args(3)
val sourceEnv: String = args(4)
val targetEnv: String = args(5)

println("Arguments Provided: " + sourceDb, targetDb,sourceTable, 
targetTable, sourceEnv, targetEnv)

// Define the spark session
val spark: SparkSession = SparkSession
  .builder()
  .appName("Ca-Data-Transporter")
  .master("local")
  .config("driver", "org.postgresql.Driver")
  .getOrCreate()

// define the input directory
val inputDir: String = 
"/Users/gangadharkadam/projects/ca-spark-apps/src/main/resources/"

// Define the source DB properties
val sourceParmFile: String = if (sourceDb == "RDS") {
"rds-db-parms-" + sourceEnv + ".txt"
  }
  else if (sourceDb == "AURORA") {
"aws-db-parms-" + sourceEnv + ".txt"
  }
  else if (sourceDb == "GP") {
"gp-db-parms-" + sourceEnv + ".txt"
  }
  else "NA"

println(sourceParmFile)

val sourceDbParms: Properties = new Properties()
sourceDbParms.load(new FileInputStream(new File(inputDir + 
sourceParmFile)))
val sourceDbJdbcUrl: String = sourceDbParms.getProperty("jdbcUrl")

println(s"$sourceDb")
println(s"$sourceDbJdbcUrl")

// Define the target DB properties
val targetParmFile: String = if (targetDb == "RDS") {

Re: Creating JDBC source table schema(DDL) dynamically

2018-07-12 Thread Thakrar, Jayesh
One option is to use plain JDBC to interrogate Postgresql catalog for the 
source table and generate the DDL to create the destination table.
Then using plain JDBC again, create the table at the destination.

See the link below for some pointers…..

https://stackoverflow.com/questions/2593803/how-to-generate-the-create-table-sql-statement-for-an-existing-table-in-postgr


On 7/11/18, 9:55 PM, "Kadam, Gangadhar (GE Aviation, Non-GE)" 
 wrote:

Hi All,

I am trying to build a Spark application which will read the data from 
PostgreSQL (source) in one environment and write it to PostgreSQL/Aurora 
(target) in a different environment (like PROD to QA or QA to PROD, etc.) 
using Spark JDBC.

When I am loading the dataframe back into the target DB, I would like to ensure 
the same schema as the source table using

val targetTableSchema: String =
  """
|  operating_unit_nm character varying(20),
|  organization_id integer,
|  organization_cd character varying(30),
|  requesting_organization_id integer,
|  requesting_organization_cd character varying(50),
|  owning_organization_id integer,
|  owning_organization_cd character varying(50)
""".stripMargin


.option("createTableColumnTypes", targetTableSchema )

I would like to know if there is a way I can create this targetTableSchema 
(source table DDL) variable directly from the source table or from a CSV file. I 
don’t want Spark to enforce its default schema. Based on the table name, how 
do I get the DDL created dynamically to pass to the targetTableSchema variable 
as a string?

Currently I am updating targetTableSchema manually and am looking for some 
pointers to automate it.


Below is my code

// Define the parameter
val sourceDb: String = args(0)
val targetDb: String = args(1)
val sourceTable: String = args(2)
val targetTable: String = args(3)
val sourceEnv: String = args(4)
val targetEnv: String = args(5)

println("Arguments Provided: " + sourceDb, targetDb,sourceTable, 
targetTable, sourceEnv, targetEnv)

// Define the spark session
val spark: SparkSession = SparkSession
  .builder()
  .appName("Ca-Data-Transporter")
  .master("local")
  .config("driver", "org.postgresql.Driver")
  .getOrCreate()

// define the input directory
val inputDir: String = 
"/Users/gangadharkadam/projects/ca-spark-apps/src/main/resources/"

// Define the source DB properties
val sourceParmFile: String = if (sourceDb == "RDS") {
"rds-db-parms-" + sourceEnv + ".txt"
  }
  else if (sourceDb == "AURORA") {
"aws-db-parms-" + sourceEnv + ".txt"
  }
  else if (sourceDb == "GP") {
"gp-db-parms-" + sourceEnv + ".txt"
  }
  else "NA"

println(sourceParmFile)

val sourceDbParms: Properties = new Properties()
sourceDbParms.load(new FileInputStream(new File(inputDir + sourceParmFile)))
val sourceDbJdbcUrl: String = sourceDbParms.getProperty("jdbcUrl")

println(s"$sourceDb")
println(s"$sourceDbJdbcUrl")

// Define the target DB properties
val targetParmFile: String = if (targetDb == "RDS") {
s"rds-db-parms-" + targetEnv + ".txt"
  }
  else if (targetDb == "AURORA") {
s"aws-db-parms-" + targetEnv + ".txt"
  }
  else if (targetDb == "GP") {
s"gp-db-parms-" + targetEnv + ".txt"
  } else "aws-db-parms-$targetEnv.txt"

println(targetParmFile)

val targetDbParms: Properties = new Properties()
targetDbParms.load(new FileInputStream(new File(inputDir + targetParmFile)))
val targetDbJdbcUrl: String = targetDbParms.getProperty("jdbcUrl")

println(s"$targetDb")
println(s"$targetDbJdbcUrl")

// Read the source table as dataFrame
val sourceDF: DataFrame = spark
  .read
  .jdbc(url = sourceDbJdbcUrl,
table = sourceTable,
sourceDbParms
  )
  //.filter("site_code is not null")

sourceDF.printSchema()
sourceDF.show()

val sourceDF1 = sourceDF.repartition(
  sourceDF("organization_id")
  //sourceDF("plan_id")
)


val targetTableSchema: String =
  """
|  operating_unit_nm character varying(20),
|  organization_id integer,
|  organization_cd character varying(30),
|  requesting_organization_id integer,
|  requesting_organization_cd character varying(50),
|  owning_organization_id integer,
|  owning_organization_cd character varying(50)
  """.stripMargin


// write the dataFrame
sourceDF1
  .write
  .option("createTableColumnTypes", targetTableSchema )
  .mode(saveMode = "Overwrite")
  .option("truncate", "true")
  .jdbc(targetDbJdbcUrl, targetTable, targetDbParms)


Thanks!
Gangadhar Kadam
Sr. Data Engineer

Re: Datasource API V2 and checkpointing

2018-05-01 Thread Thakrar, Jayesh
Just wondering-

Given that V2 is currently less performant because of the use of Row vs. InternalRow 
(and other things?), is still evolving, and is missing some of the other 
features of V1, it might help to focus on remediating those gaps and then 
look at porting the file sources over.

As for the escape hatch (or additional capabilities), can that be implemented 
as traits?

And IMHO, I think file sources and other core sources should have the same 
citizenship level as is granted to the other sources in V2. This is so that 
others can use them as good references for emulation.

Jayesh


From: Joseph Torres <joseph.tor...@databricks.com>
Sent: Tuesday, May 1, 2018 1:58:54 PM
To: Ryan Blue
Cc: Thakrar, Jayesh; dev@spark.apache.org
Subject: Re: Datasource API V2 and checkpointing

I agree that Spark should fully handle state serialization and recovery for 
most sources. This is how it works in V1, and we definitely wouldn't want or 
need to change that in V2.* The question is just whether we should have an 
escape hatch for the sources that don't want Spark to do that, and if so what 
the escape hatch should look like.

I don't think a watermark checkpoint would work, because there's no guarantee 
(especially considering the "maxFilesPerTrigger" option) that all files with 
the same timestamp will be in the same batch. But in general, changing the 
fundamental mechanics of how file sources take checkpoints seems like it would 
impose a serious risk of performance regressions, which I don't think is a 
desirable risk when performing an API migration that's going to swap out users' 
queries from under them. I would be very uncomfortable merging a V2 file source 
which we can't confidently assert has the same performance characteristics as 
the existing one.


* Technically, most current sources do write their initial offset to the 
checkpoint directory, but this is just a workaround to the fact that the V1 API 
has no handle to give Spark the initial offset. So if you e.g. start a Kafka 
stream from latest offsets, and it fails in the first batch, Spark won't know 
to restart the stream from the initial offset which was originally generated. 
That's easily fixable in V2, and then no source will have to even look at the 
checkpoint directory if it doesn't want to.

On Tue, May 1, 2018 at 10:26 AM, Ryan Blue 
<rb...@netflix.com<mailto:rb...@netflix.com>> wrote:
I think there's a difference. You're right that we wanted to clean up the API 
in V2 to avoid file sources using side channels. But there's a big difference 
between adding, for example, a way to report partitioning and designing for 
sources that need unbounded state. It's a judgment call, but I think unbounded 
state is definitely not something that we should design around. Another way to 
think about it: yes, we want to design a better API using existing sources as 
guides, but we don't need to assume that everything those sources do should 
be supported. It is reasonable to say that this is a case we don't want to 
design for and the source needs to change. Why can't we use a high watermark of 
files' modified timestamps?

For most sources, I think Spark should handle state serialization and recovery. 
Maybe we can find a good way to make the file source with unbounded state work, 
but this shouldn't be one of the driving cases for the design and consequently 
a reason for every source to need to manage its own state in a checkpoint 
directory.

rb

On Mon, Apr 30, 2018 at 12:37 PM, Joseph Torres 
<joseph.tor...@databricks.com<mailto:joseph.tor...@databricks.com>> wrote:
I'd argue that letting bad cases influence the design is an explicit goal of 
DataSourceV2. One of the primary motivations for the project was that file 
sources hook into a series of weird internal side channels, with favorable 
performance characteristics that are difficult to match in the API we actually 
declare to Spark users. So a design that we can't migrate file sources to 
without a side channel would be worrying; won't we end up regressing to the 
same situation?

On Mon, Apr 30, 2018 at 11:59 AM, Ryan Blue 
<rb...@netflix.com<mailto:rb...@netflix.com>> wrote:
Should we really plan the API for a source with state that grows indefinitely? 
It sounds like we're letting a bad case influence the design, when we probably 
shouldn't.

On Mon, Apr 30, 2018 at 11:05 AM, Joseph Torres 
<joseph.tor...@databricks.com<mailto:joseph.tor...@databricks.com>> wrote:
Offset is just a type alias for arbitrary JSON-serializable state. Most 
implementations should (and do) just toss the blob at Spark and let Spark 
handle recovery on its own.

In the case of file streams, the obstacle is that the conceptual offset is very 
large: a list of every file which the stream has ever read. In order to parse 
this efficiently, the stream connector needs detailed control over how it's 
stor

Re: Datasource API V2 and checkpointing

2018-04-27 Thread Thakrar, Jayesh
Thanks Joseph!

From: Joseph Torres <joseph.tor...@databricks.com>
Date: Friday, April 27, 2018 at 11:23 AM
To: "Thakrar, Jayesh" <jthak...@conversantmedia.com>
Cc: "dev@spark.apache.org" <dev@spark.apache.org>
Subject: Re: Datasource API V2 and checkpointing

The precise interactions with the DataSourceV2 API haven't yet been hammered 
out in design. But much of this comes down to the core of Structured Streaming 
rather than the API details.

The execution engine handles checkpointing and recovery. It asks the streaming 
data source for offsets, and then determines that batch N contains the data 
between offset A and offset B. On recovery, if batch N needs to be re-run, the 
execution engine just asks the source for the same offset range again. Sources 
also get a handle to their own subfolder of the checkpoint, which they can use 
as scratch space if they need. For example, Spark's FileStreamReader keeps a 
log of all the files it's seen, so its offsets can be simply indices into the 
log rather than huge strings containing all the paths.
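
Not authoritative, but a minimal sketch of that shape (names are hypothetical), 
where the offset carried in the checkpoint is just a position in the source's own 
file log rather than the list of files itself:

import org.apache.spark.sql.execution.streaming.Offset

// Hypothetical offset: only an index into a log of seen files that the source keeps
// under its checkpoint subfolder, so the JSON stays small no matter how many files
// the stream has ever read.
case class FileLogOffset(index: Long) extends Offset {
  override def json: String = s"""{"index": $index}"""
}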

SPARK-23323 is orthogonal. That commit coordinator is responsible for ensuring 
that, within a single Spark job, two different tasks can't commit the same 
partition.

On Fri, Apr 27, 2018 at 8:53 AM, Thakrar, Jayesh 
<jthak...@conversantmedia.com<mailto:jthak...@conversantmedia.com>> wrote:
Wondering if this issue is related to SPARK-23323?

Any pointers will be greatly appreciated….

Thanks,
Jayesh

From: "Thakrar, Jayesh" 
<jthak...@conversantmedia.com<mailto:jthak...@conversantmedia.com>>
Date: Monday, April 23, 2018 at 9:49 PM
To: "dev@spark.apache.org<mailto:dev@spark.apache.org>" 
<dev@spark.apache.org<mailto:dev@spark.apache.org>>
Subject: Datasource API V2 and checkpointing

I was wondering when checkpointing is enabled, who does the actual work?
The streaming datasource or the execution engine/driver?

I have written a small/trivial datasource that just generates strings.
After enabling checkpointing, I do see a folder being created under the 
checkpoint folder, but there's nothing else in there.

Same question for write-ahead and recovery?
And on a restart from a failed streaming session - who should set the offsets?
The driver/Spark or the datasource?

Any pointers to design docs would also be greatly appreciated.

Thanks,
Jayesh




Re: Datasource API V2 and checkpointing

2018-04-27 Thread Thakrar, Jayesh
Wondering if this issue is related to SPARK-23323?

Any pointers will be greatly appreciated….

Thanks,
Jayesh

From: "Thakrar, Jayesh" <jthak...@conversantmedia.com>
Date: Monday, April 23, 2018 at 9:49 PM
To: "dev@spark.apache.org" <dev@spark.apache.org>
Subject: Datasource API V2 and checkpointing

I was wondering when checkpointing is enabled, who does the actual work?
The streaming datasource or the execution engine/driver?

I have written a small/trivial datasource that just generates strings.
After enabling checkpointing, I do see a folder being created under the 
checkpoint folder, but there's nothing else in there.

Same question for write-ahead and recovery?
And on a restart from a failed streaming session - who should set the offsets?
The driver/Spark or the datasource?

Any pointers to design docs would also be greatly appreciated.

Thanks,
Jayesh



Datasource API V2 and checkpointing

2018-04-23 Thread Thakrar, Jayesh
I was wondering when checkpointing is enabled, who does the actual work?
The streaming datasource or the execution engine/driver?

I have written a small/trivial datasource that just generates strings.
After enabling checkpointing, I do see a folder being created under the 
checkpoint folder, but there's nothing else in there.

Same question for write-ahead and recovery?
And on a restart from a failed streaming session - who should set the offsets?
The driver/Spark or the datasource?

Any pointers to design docs would also be greatly appreciated.

Thanks,
Jayesh



Re: V2.3 Scala API to Github Links Incorrect

2018-04-15 Thread Thakrar, Jayesh
Thanks Sameer!

From: Sameer Agarwal <samee...@apache.org>
Date: Sunday, April 15, 2018 at 10:02 PM
To: "Thakrar, Jayesh" <jthak...@conversantmedia.com>
Cc: "dev@spark.apache.org" <dev@spark.apache.org>, Hyukjin Kwon 
<gurwls...@gmail.com>
Subject: Re: V2.3 Scala API to Github Links Incorrect

[+Hyukjin]

Thanks for flagging this Jayesh. 
https://github.com/apache/spark-website/pull/111 is tracking a short term fix 
to the API docs and https://issues.apache.org/jira/browse/SPARK-23732 tracks 
the fix to the release scripts.

Regards,
Sameer


On 15 April 2018 at 18:50, Thakrar, Jayesh 
<jthak...@conversantmedia.com<mailto:jthak...@conversantmedia.com>> wrote:
In browsing through the API docs, the links to Github source code seem to be 
pointing to a dev branch rather than the release branch.

Here's one example
Go to the API doc page below and click on the "ProcessingTime.scala" link which 
points to Sameer's dev branch.
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.streaming.ProcessingTime

https://github.com/apache/spark/tree/v2.3.0/Users/sameera/dev/spark/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala

Any chance this can be corrected please?

BTW, I know working and executing on a release is an arduous task, so thanks 
for all the effort, Sameer and the dev/release team and contributors!

Thanks,
Jayesh




V2.3 Scala API to Github Links Incorrect

2018-04-15 Thread Thakrar, Jayesh
In browsing through the API docs, the links to Github source code seem to be 
pointing to a dev branch rather than the release branch.

Here's one example
Go to the API doc page below and click on the "ProcessingTime.scala" link which 
points to Sameer's dev branch.
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.streaming.ProcessingTime

https://github.com/apache/spark/tree/v2.3.0/Users/sameera/dev/spark/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala

Any chance this can be corrected please?

BTW, I know working and executing on a release is an arduous task, so thanks 
for all the effort, Sameer and the dev/release team and contributors!

Thanks,
Jayesh



Spark 2.3 V2 Datasource API questions

2018-04-06 Thread Thakrar, Jayesh
First of all thank you to the Spark dev team for coming up with the 
standardized and intuitive API interfaces.
I am sure it will encourage integrating a lot more new datasource integration.

I have been creating playing with the API and have some questions on the 
continuous streaming API
(see https://github.com/JThakrar/sparkconn#continuous-streaming-datasource )

It seems that "commit" is never called

query.status always shows the message below even after the query has been 
initialized, data has been streaming:
{
  "message" : "Initializing sources",
  "isDataAvailable" : false,
  "isTriggerActive" : true
}


query.recentProgress always shows an empty array:

Array[org.apache.spark.sql.streaming.StreamingQueryProgress] = Array()

And stopping a query always shows as if the tasks were lost involuntarily or 
uncleanly (even though close on the datasource was called) :
2018-04-06 08:07:10 WARN  TaskSetManager:66 - Lost task 2.0 in stage 1.0 (TID 
7, localhost, executor driver): TaskKilled (Stage cancelled)
2018-04-06 08:07:10 WARN  TaskSetManager:66 - Lost task 1.0 in stage 1.0 (TID 
6, localhost, executor driver): TaskKilled (Stage cancelled)
2018-04-06 08:07:10 WARN  TaskSetManager:66 - Lost task 3.0 in stage 1.0 (TID 
8, localhost, executor driver): TaskKilled (Stage cancelled)
2018-04-06 08:07:10 WARN  TaskSetManager:66 - Lost task 0.0 in stage 1.0 (TID 
5, localhost, executor driver): TaskKilled (Stage cancelled)
2018-04-06 08:07:10 WARN  TaskSetManager:66 - Lost task 4.0 in stage 1.0 (TID 
9, localhost, executor driver): TaskKilled (Stage cancelled)

Any pointers/info will be greatly appreciated.





Re: Any reason for not exposing internalCreateDataFrame or isStreaming beyond sql package?

2018-03-22 Thread Thakrar, Jayesh
Thanks Wenchen - yes, I did refer to the Spark built-in sources as mentioned 
earlier and have been using the Kafka streaming source as a reference example.
The built-in ones work and use internalCreateDataFrame - and that's where I 
got the idea about using that method to set "isStreaming" to true.

But I should confess that I don't know the source code very well, so I would 
appreciate it if you can point me to any other pointers/examples, please.

From: Wenchen Fan <cloud0...@gmail.com>
Date: Thursday, March 22, 2018 at 2:52 PM
To: "Thakrar, Jayesh" <jthak...@conversantmedia.com>
Cc: "rb...@netflix.com" <rb...@netflix.com>, "dev@spark.apache.org" 
<dev@spark.apache.org>
Subject: Re: Any reason for not exposing internalCreateDataFrame or isStreaming 
beyond sql package?

org.apache.spark.sql.execution.streaming.Source is for internal use only. The 
official stream data source API is the data source v2 API. You can take a look 
at the Spark built-in streaming data sources as examples. Note: data source v2 
is still experimental, you may need to update your code in a new Spark release 
:)

On Thu, Mar 22, 2018 at 12:43 PM, Thakrar, Jayesh 
<jthak...@conversantmedia.com<mailto:jthak...@conversantmedia.com>> wrote:
Hi Ryan,

Thanks for the quick reply - I like the Iceberg approach, will keep an eye on 
it.

So creating a custom batch/non-streaming data source is not difficult.

The issue I have is with a streaming data source.

Similar to batch source, you need to implement a simple trait - 
org.apache.spark.sql.execution.streaming.Source (example below).
The "getBatch" expects a dataframe and that dataframe needs to have one of its 
attributes "isStreaming" to be set to true.
However that is not exposed during dataframe creation and the only way to do it 
is to make your package/class a child of org.apache.spark.sql

As I write this, I think having my code on github will make it easy to 
illustrate.

See a Spark Jira comment that illustrates the same problem for Spark packaged 
streaming source

https://issues.apache.org/jira/browse/SPARK-21765?focusedCommentId=16142919=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16142919


class MyDataStreamSource(sqlContext: SQLContext,
 override val schema: StructType,
 numPartitions: Int,
 numRowsPerPartition: Int)
  extends Source {

  override def getOffset: Option[Offset] = Some(new MyDataStreamOffset(offset = 
System.currentTimeMillis()))

  override def commit(end: Offset): Unit = {}

  override def stop: Unit = {}

  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
val batchStreamTime = System.currentTimeMillis() // 
end.asInstanceOf[MyDataStreamOffset].value
val rdd: RDD[Row] = new MyDataStreamRDD(sqlContext.sparkContext, 
batchStreamTime, numPartitions, numRowsPerPartition)
val internalRow = rdd.map(row => 
InternalRow(UTF8String.fromString(row.get(0).asInstanceOf[String])))
sqlContext.internalCreateDataFrame(internalRow, schema, isStreaming = true)
  }

}



From: Ryan Blue <rb...@netflix.com<mailto:rb...@netflix.com>>
Reply-To: "rb...@netflix.com<mailto:rb...@netflix.com>" 
<rb...@netflix.com<mailto:rb...@netflix.com>>
Date: Thursday, March 22, 2018 at 1:45 PM
To: "Thakrar, Jayesh" 
<jthak...@conversantmedia.com<mailto:jthak...@conversantmedia.com>>
Cc: "dev@spark.apache.org<mailto:dev@spark.apache.org>" 
<dev@spark.apache.org<mailto:dev@spark.apache.org>>
Subject: Re: Any reason for not exposing internalCreateDataFrame or isStreaming 
beyond sql package?

Jayesh,

We're working on a new API for building sources, DataSourceV2. That API allows 
you to produce UnsafeRow and we are very likely going to change that to 
InternalRow (SPARK-23325). There's an experimental version in the latest 2.3.0 
release if you'd like to try it out.

Here's an example implementation from the Iceberg table format: 
https://github.com/Netflix/iceberg/blob/master/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java

rb

On Thu, Mar 22, 2018 at 7:24 AM, Thakrar, Jayesh 
<jthak...@conversantmedia.com<mailto:jthak...@conversantmedia.com>> wrote:
Because these are not exposed in the usual API, it is not possible (or at least 
difficult) to create custom structured streaming sources.

Consequently, one has to create streaming sources in packages under 
org.apache.spark.sql.

Any pointers or info is greatly appreciated.



--
Ryan Blue
Software Engineer
Netflix



Re: Any reason for not exposing internalCreateDataFrame or isStreaming beyond sql package?

2018-03-22 Thread Thakrar, Jayesh
Hi Ryan,

Thanks for the quick reply - I like the Iceberg approach, will keep an eye on 
it.

So creating a custom batch/non-streaming data source is not difficult.

The issue I have is with a streaming data source.

Similar to batch source, you need to implement a simple trait - 
org.apache.spark.sql.execution.streaming.Source (example below).
The "getBatch" expects a dataframe and that dataframe needs to have one of its 
attributes "isStreaming" to be set to true.
However that is not exposed during dataframe creation and the only way to do it 
is to make your package/class a child of org.apache.spark.sql

As I write this, I think having my code on github will make it easy to 
illustrate.

See a Spark Jira comment that illustrates the same problem for Spark packaged 
streaming source

https://issues.apache.org/jira/browse/SPARK-21765?focusedCommentId=16142919=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16142919


class MyDataStreamSource(sqlContext: SQLContext,
 override val schema: StructType,
 numPartitions: Int,
 numRowsPerPartition: Int)
  extends Source {

  override def getOffset: Option[Offset] = Some(new MyDataStreamOffset(offset = 
System.currentTimeMillis()))

  override def commit(end: Offset): Unit = {}

  override def stop: Unit = {}

  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
val batchStreamTime = System.currentTimeMillis() // 
end.asInstanceOf[MyDataStreamOffset].value
val rdd: RDD[Row] = new MyDataStreamRDD(sqlContext.sparkContext, 
batchStreamTime, numPartitions, numRowsPerPartition)
val internalRow = rdd.map(row => 
InternalRow(UTF8String.fromString(row.get(0).asInstanceOf[String])))
sqlContext.internalCreateDataFrame(internalRow, schema, isStreaming = true)
  }

}



From: Ryan Blue <rb...@netflix.com>
Reply-To: "rb...@netflix.com" <rb...@netflix.com>
Date: Thursday, March 22, 2018 at 1:45 PM
To: "Thakrar, Jayesh" <jthak...@conversantmedia.com>
Cc: "dev@spark.apache.org" <dev@spark.apache.org>
Subject: Re: Any reason for not exposing internalCreateDataFrame or isStreaming 
beyond sql package?

Jayesh,

We're working on a new API for building sources, DataSourceV2. That API allows 
you to produce UnsafeRow and we are very likely going to change that to 
InternalRow (SPARK-23325). There's an experimental version in the latest 2.3.0 
release if you'd like to try it out.

Here's an example implementation from the Iceberg table format: 
https://github.com/Netflix/iceberg/blob/master/spark/src/main/java/com/netflix/iceberg/spark/source/Reader.java

rb

On Thu, Mar 22, 2018 at 7:24 AM, Thakrar, Jayesh 
<jthak...@conversantmedia.com<mailto:jthak...@conversantmedia.com>> wrote:
Because these are not exposed in the usual API, it is not possible (or at least 
difficult) to create custom structured streaming sources.

Consequently, one has to create streaming sources in packages under 
org.apache.spark.sql.

Any pointers or info is greatly appreciated.



--
Ryan Blue
Software Engineer
Netflix


Any reason for not exposing internalCreateDataFrame or isStreaming beyond sql package?

2018-03-22 Thread Thakrar, Jayesh
Because these are not exposed in the usual API, it is not possible (or at least 
difficult) to create custom structured streaming sources.

Consequently, one has to create streaming sources in packages under 
org.apache.spark.sql.

Any pointers or info is greatly appreciated.


Re: "Spark.jars not adding jars to classpath"

2018-03-22 Thread Thakrar, Jayesh
Is this in spark-shell or a spark-submit job?
If spark-submit job,  is it local or cluster?

One reliable way of adding jars is to use the command line option "--jars"
See 
http://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management
 for more info.

If you add jars after the SparkContext is created, it's too late, as the driver 
and executor processes (distributed) or threads (local) would already have been 
set up.
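
If it has to happen in code rather than on the command line, a minimal sketch 
(with a hypothetical jar path) is to set spark.jars on the builder before the 
session, and therefore the SparkContext, is created:

import org.apache.spark.sql.SparkSession

// spark.jars must be set before getOrCreate(), i.e. before the SparkContext exists;
// setting it afterwards has no effect on processes that are already running.
val spark = SparkSession.builder()
  .appName("jar-classpath-example")
  .config("spark.jars", "/local/path/to/custom.jar")  // hypothetical path
  .getOrCreate()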


From: Ankit Agrahari 
Date: Tuesday, March 20, 2018 at 11:34 PM
To: 
Subject: "Spark.jars not adding jars to classpath"

I am trying to add my custom jar to a Spark job using the "spark.jars" property.
Although I can see in the logs that the jar is getting added, when I check the 
jars that are added to the classpath, I don't find it. Below are the options 
that I have also tried out.
1)spark.jars
2)spark.driver.extraLibraryPath
3)spark.executor.extraLibraryPath
4)setJars(Seq[])

But none of them added the jar. I am using Spark 2.2.0 on HDP and the files were kept locally.
Please let me know what possibly I am doing wrong.


Cannot create custom streaming source in Spark 2.3.0

2018-03-19 Thread Thakrar, Jayesh
I am trying to create a custom streaming source in Spark 2.3.0 and getting the 
following error:

scala> 2018-03-19 17:43:20 ERROR MicroBatchExecution:91 - Query [id = 
48bb7a4c-7c66-4ad3-926b-81f8369a6efb, runId = 
50800f9b-434d-43df-8d6a-3e0fdc865aeb] terminated with error
java.lang.AssertionError: assertion failed: DataFrame returned by getBatch from 
MyDataStreamSource@6c88c38b did not have isStreaming=true
LogicalRDD [string#5], false


A similar issue was experienced in azure-eventhubs-databricks_2.11:3.3 which 
seemed to have been fixed in azure-eventhubs-databricks_2.11:3.4
See - https://github.com/Azure/azure-event-hubs-spark/issues/213

Could this be related to https://issues.apache.org/jira/browse/SPARK-21765 ?

Any help/pointers greatly appreciated !!


If it matters, here's my trivial custom streaming source:


import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext}
import org.apache.spark.sql.execution.streaming.{Offset, Source}
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import sun.font.TrueTypeFont


class MyDataStreamProvider
  extends DataSourceRegister
  with StreamSourceProvider {

  val DEFAULT_NUM_PARTITIONS = "2"
  val DEFAULT_ROWS_PER_PARTITION = "5"

  private val myDataStreamSchema: StructType = new 
StructType(Array[StructField](new StructField("string", StringType, false)))

  override def shortName(): String = "mydatastream"

  override def sourceSchema(sqlContext: SQLContext,
schema: Option[StructType],
providerName: String,
parameters: Map[String, String]): (String, 
StructType) = {
("MyDataStream", schema.getOrElse(myDataStreamSchema))
  }

  override def createSource(sqlContext: SQLContext,
metadataPath: String,
schema: Option[StructType],
providerName: String, parameters: Map[String, 
String]): Source = {
val numPartitions: Int = parameters.getOrElse("numpartitions", 
DEFAULT_NUM_PARTITIONS).toInt
val rowsPerPartition: Int = parameters.getOrElse("rowsperpartition", 
DEFAULT_ROWS_PER_PARTITION).toInt
new MyDataStreamSource(sqlContext, schema.getOrElse(myDataStreamSchema), 
numPartitions, rowsPerPartition)
  }

}

class MyDataStreamSource(sqlContext: SQLContext,
 override val schema: StructType,
 numPartitions: Int,
 numRowsPerPartition: Int)
  extends Source {

  override def getOffset: Option[Offset] = Some(new MyDataStreamOffset(offset = 
System.currentTimeMillis()))

  override def commit(end: Offset): Unit = {}

  override def stop: Unit = {}

  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
val batchStreamTime = System.currentTimeMillis()
val rdd: RDD[Row] = new MyDataStreamRDD(sqlContext.sparkContext, 
batchStreamTime, numPartitions, numRowsPerPartition)
val df = sqlContext.createDataFrame(rdd, schema)
df
  }

}

class MyDataStreamOffset(offset: Long)
  extends Offset {

  def value: Long = offset

  override def json: String = s"""{"offset" : ${offset}}"""

}

class MyDataStreamRDD(_sc: SparkContext,
  batchStreamTime: Long,
  numPartitions: Int,
  rowsPerPartition: Int)
  extends RDD[Row](_sc, Nil) {
  override def getPartitions: Array[Partition] = {
val partitionSeq: Seq[Int] = 0 until numPartitions
val partitions = partitionSeq.map(partitionId => new 
MyDataStreamPartition(partitionId, batchStreamTime, rowsPerPartition))
partitions.toArray
 }

  override def compute(partition: Partition, context: TaskContext): 
Iterator[Row] = {
val myDataSourcePartition = partition.asInstanceOf[MyDataStreamPartition]
val partitionId = myDataSourcePartition.index
val rows = myDataSourcePartition.rowCount
val time = myDataSourcePartition.batchStreamTime
val partitionData = 1 to rows map(r => Row(s"Partition: ${partitionId} for 
time ${time}, row ${r} of ${rows}"))
partitionData.iterator
  }

}


class MyDataStreamPartition(partitionId: Int, time: Long, rows: Int)
  extends Partition
  with Serializable {

  override def index: Int = partitionId

  override def toString: String = s"Partition: ${partitionId}  Time: ${time}  
Rows: ${rows}"

  def batchStreamTime: Long = time

  def rowCount: Int = rows

}



Re: SparkContext - parameter for RDD, but not serializable, why?

2018-02-28 Thread Thakrar, Jayesh
Wenchen,

Thank you very much for your prompt reply and pointer!

As I think it through, it makes sense that since my custom RDD is instantiated on 
the driver, I should get whatever I need from the SparkContext there and assign 
it to instance variables.

However, the "RDD.sparkContext" approach and the Scala magic of class variables 
did not work for me.

Here's what worked, based on your tip:


class MyDataSourceRDD(sc: SparkContext, conf: Map[String, String]) extends 
RDD[Row](sc, Nil) {

  val sparkConf = sc.getConf

  override def getPartitions: Array[org.apache.spark.Partition] = {
sparkConf.getAll.foreach(println)
val numPartitions = conf.getOrElse("spark.mydata.numpartitions", "1").toInt
val rowsPerPartition = conf.getOrElse("spark.mydata.rowsperpartition", 
"3").toInt
val partitions = 0 until numPartitions map(partition => new 
MyDataSourcePartition(partition,rowsPerPartition))
partitions.toArray
  }

  override def compute(split: Partition, context: TaskContext): Iterator[Row] = 
{
val myDataSourcePartition = split.asInstanceOf[MyDataSourcePartition]
val partitionId = myDataSourcePartition.index
val rows = myDataSourcePartition.rowCount
val partitionData = 1 to rows map(r => Row(s"Partition: ${partitionId}, row 
${r} of ${rows}"))
partitionData.iterator
  }

}


From: Wenchen Fan <cloud0...@gmail.com>
Date: Wednesday, February 28, 2018 at 12:25 PM
To: "Thakrar, Jayesh" <jthak...@conversantmedia.com>
Cc: "dev@spark.apache.org" <dev@spark.apache.org>
Subject: Re: SparkContext - parameter for RDD, but not serializable, why?

My understanding:

RDD is also a driver side stuff like SparkContext, works like a handler to your 
distributed data on the cluster.

However, `RDD.compute` (defines how to produce data for each partition) needs 
to be executed on the remote nodes. It's more convenient to make RDD 
serializable, and transfer RDD object to remote nodes and call `compute`.

So you can use SparkContext inside RDD, as long as you don't use it in methods 
that are going to be executed on remote nodes, like `RDD.compute`. And you 
should call `RDD.sparkContext` instead of using `sc` directly, because using `sc` 
directly will turn it from a constructor parameter into a class member variable 
(kind of a hacky part of Scala), which will then be serialized.

On Thu, Mar 1, 2018 at 2:03 AM, Thakrar, Jayesh 
<jthak...@conversantmedia.com<mailto:jthak...@conversantmedia.com>> wrote:
Hi All,

I was just toying with creating a very rudimentary RDD datasource to understand 
the inner workings of RDDs.

It seems that one of the constructors for RDD has a parameter of type 
SparkContext, but it (apparently) exists on the driver only and is not 
serializable.

Consequently, any attempt to use SparkContext parameter inside your custom RDD 
generates a runtime error of it not being serializable.

Just wondering what is the rationale behind this?
I.e. if it is not serializable/usable, why make it a parameter?
And if it needs to be a parameter, why not make it serializable (is it even 
possible?)

Below is my working code where I test a custom RDD.

scala> val mydata = spark.read.format("MyDataSourceProvider").load()
mydata: org.apache.spark.sql.DataFrame = [mydataStr: string]

scala> mydata.show(10, false)
++
|mydataStr   |
++
|Partition: 0, row 1 of 3|
|Partition: 0, row 2 of 3|
|Partition: 0, row 3 of 3|
++

scala>


/ custom RDD


import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider, TableScan}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

class MyDataSourceProvider extends DataSourceRegister
  with RelationProvider with Logging {

  override def shortName():String = { "mydata" }

  private val myDataSchema: StructType = new StructType(Array[StructField](new 
StructField("mydataStr", StringType, false)))

  def sourceSchema(sqlContext: SQLContext,
schema: Option[StructType],
providerName: String,
parameters: Map[String, String]): (String, 
StructType) = {
(shortName(), schema.get)
  }

  override def createRelation(sqlContext: SQLContext,
  parameters: Map[String, String]): BaseRelation = {
new MyDataRelation(sqlContext, myDataSchema, parameters)
  }

}


class MyDataRelation(override val sqlContext: SQLContext,
 override val schema: StructType,
 params: Map[String, String]) extends BaseRelation with 
TableScan with Logging {

  override def buildScan(): org.apache.spark.rdd.R

SparkContext - parameter for RDD, but not serializable, why?

2018-02-28 Thread Thakrar, Jayesh
Hi All,

I was just toying with creating a very rudimentary RDD datasource to understand 
the inner workings of RDDs.

It seems that one of the constructors for RDD has a parameter of type 
SparkContext, but it (apparently) exists on the driver only and is not 
serializable.

Consequently, any attempt to use SparkContext parameter inside your custom RDD 
generates a runtime error of it not being serializable.

Just wondering what is the rationale behind this?
I.e. if it is not serializable/usable, why make it a parameter?
And if it needs to be a parameter, why not make it serializable (is it even 
possible?)

Below is my working code where I test a custom RDD.

scala> val mydata = spark.read.format("MyDataSourceProvider").load()
mydata: org.apache.spark.sql.DataFrame = [mydataStr: string]

scala> mydata.show(10, false)
++
|mydataStr   |
++
|Partition: 0, row 1 of 3|
|Partition: 0, row 2 of 3|
|Partition: 0, row 3 of 3|
++

scala>


/ custom RDD


import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, 
RelationProvider, TableScan}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD


class MyDataSourceProvider extends DataSourceRegister
  with RelationProvider with Logging {

  override def shortName():String = { "mydata" }

  private val myDataSchema: StructType = new StructType(Array[StructField](new 
StructField("mydataStr", StringType, false)))

  def sourceSchema(sqlContext: SQLContext,
schema: Option[StructType],
providerName: String,
parameters: Map[String, String]): (String, 
StructType) = {
(shortName(), schema.get)
  }

  override def createRelation(sqlContext: SQLContext,
  parameters: Map[String, String]): BaseRelation = {
new MyDataRelation(sqlContext, myDataSchema, parameters)
  }

}


class MyDataRelation(override val sqlContext: SQLContext,
 override val schema: StructType,
 params: Map[String, String]) extends BaseRelation with 
TableScan with Logging {

  override def buildScan(): org.apache.spark.rdd.RDD[Row] = {
val rdd = new MyDataSourceRDD(sqlContext.sparkContext, 
sqlContext.getAllConfs)
rdd
  }

  override def needConversion = true
}


class MyDataSourceRDD(sc: SparkContext, conf: Map[String, String]) extends 
RDD[Row](sc, Nil) {

  override def getPartitions: Array[org.apache.spark.Partition] = {
// sc.getConf.getAll.foreach(println) - this fails with a SparkContext not serializable error. So what use is this parameter?!
val numPartitions = conf.getOrElse("spark.mydata.numpartitions", "1").toInt
val rowsPerPartition = conf.getOrElse("spark.mydata.rowsperpartition", 
"3").toInt
val partitions = 0 until numPartitions map(partition => new 
MyDataSourcePartition(partition,rowsPerPartition))
partitions.toArray
  }

  override def compute(split: Partition, context: TaskContext): Iterator[Row] = 
{
val myDataSourcePartition = split.asInstanceOf[MyDataSourcePartition]
val partitionId = myDataSourcePartition.index
val rows = myDataSourcePartition.rowCount
val partitionData = 1 to rows map(r => Row(s"Partition: ${partitionId}, row 
${r} of ${rows}"))
partitionData.iterator
  }

}


class MyDataSourcePartition(partitionId: Int, rows: Int) extends Partition with 
Serializable {

  override def index:Int = partitionId

  def rowCount: Int = rows



Spark Streaming Custom Receiver Anomaly

2018-02-20 Thread Thakrar, Jayesh
Hi All,

I am trying to "test" a very simple custom receiver and am a little puzzled.

Using Spark 2.2.0 shell on my laptop, I am running the code below.
I was expecting the code to timeout since my timeout wait period is 1 ms and I 
have a sleep in the class that is much more (1200 ms).

Is this normal? Or am I interpreting something incorrectly?

import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming._

class CustomReceiver extends 
org.apache.spark.streaming.receiver.Receiver[String](org.apache.spark.storage.StorageLevel.MEMORY_ONLY)
 {
  def onStart() {
new Thread("CustomReceiver") {
  override def run() { receive() }
}.start()
  }
  def onStop() {}
  private def receive() {
val hostname = java.net.InetAddress.getLocalHost()
val time = java.util.Calendar.getInstance.getTime
var counter = 0
while (isStarted && !isStopped) {
  counter += 1
  store(s"host = ${hostname} time = ${time} counter = ${counter}")
  Thread.sleep(1200)
}
  }
}

val ssc = new StreamingContext(sc, Seconds(1))
val words = ssc.receiverStream(new CustomReceiver())

words.print()
ssc.start()
ssc.awaitTerminationOrTimeout(1)




Re: Expand the Spark SQL programming guide?

2016-12-16 Thread Thakrar, Jayesh
Yes - that sounds good Anton, I can work on documenting the window functions.
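
For instance, a minimal sketch of the kind of window-function example that section 
could show, given some DataFrame df with department and salary columns (names are 
illustrative):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, rank}

// Rank rows within each department by descending salary.
val byDept = Window.partitionBy("department").orderBy(col("salary").desc)
val ranked = df.withColumn("salary_rank", rank().over(byDept))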

From: Anton Okolnychyi <anton.okolnyc...@gmail.com>
Date: Thursday, December 15, 2016 at 4:34 PM
To: Conversant <jthak...@conversantmedia.com>
Cc: Michael Armbrust <mich...@databricks.com>, Jim Hughes <jn...@ccri.com>, 
"dev@spark.apache.org" <dev@spark.apache.org>
Subject: Re: Expand the Spark SQL programming guide?

I think it will make sense to show a sample implementation of 
UserDefinedAggregateFunction for DataFrames, and an example of the Aggregator 
API for typed Datasets.
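
For what it's worth, a minimal sketch of the typed Aggregator shape (the Purchase 
case class and names are made up for illustration):

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

case class Purchase(user: String, amount: Double)

// Sums Purchase.amount; the buffer and the output are both Double.
object SumAmount extends Aggregator[Purchase, Double, Double] {
  def zero: Double = 0.0
  def reduce(buffer: Double, p: Purchase): Double = buffer + p.amount
  def merge(b1: Double, b2: Double): Double = b1 + b2
  def finish(buffer: Double): Double = buffer
  def bufferEncoder: Encoder[Double] = Encoders.scalaDouble
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// Usage on a typed Dataset[Purchase]:
// purchases.select(SumAmount.toColumn.name("total_amount"))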

Jim, what if I submit a PR and you join the review process? I also don't mind 
splitting this if you want, but it seems to be overkill for this part.

Jayesh, shall I skip the window functions part since you are going to work on 
that?

2016-12-15 22:48 GMT+01:00 Thakrar, Jayesh 
<jthak...@conversantmedia.com<mailto:jthak...@conversantmedia.com>>:
I too am interested in expanding the documentation for Spark SQL.
For my work I needed to get some info/examples/guidance on window functions and 
have been using 
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
 .
How about divide and conquer?


From: Michael Armbrust <mich...@databricks.com<mailto:mich...@databricks.com>>
Date: Thursday, December 15, 2016 at 3:21 PM
To: Jim Hughes <jn...@ccri.com<mailto:jn...@ccri.com>>
Cc: "dev@spark.apache.org<mailto:dev@spark.apache.org>" 
<dev@spark.apache.org<mailto:dev@spark.apache.org>>
Subject: Re: Expand the Spark SQL programming guide?

Pull requests would be welcome for any major missing features in the guide: 
https://github.com/apache/spark/blob/master/docs/sql-programming-guide.md

On Thu, Dec 15, 2016 at 11:48 AM, Jim Hughes 
<jn...@ccri.com<mailto:jn...@ccri.com>> wrote:
Hi Anton,

I'd like to see this as well.  I've been working on implementing geospatial 
user-defined types and functions.  Having examples of aggregations and window 
functions would be awesome!

I did test out implementing a distributed convex hull as a 
UserDefinedAggregateFunction, and that seemed to work sensibly.

Cheers,

Jim

On 12/15/2016 03:28 AM, Anton Okolnychyi wrote:
Hi,

I am wondering whether it makes sense to expand the Spark SQL programming guide 
with examples of aggregations (including user-defined via the Aggregator API) 
and window functions.  For instance, there might be a separate subsection under 
"Getting Started" for each functionality.

SPARK-16046 seems to be related but there is no activity for more than 4 months.

Best regards,
Anton






Re: Can I add a new method to RDD class?

2016-12-05 Thread Thakrar, Jayesh
Teng,

Before you go down the path of creating your own custom Spark build, do give some 
thought to what Holden and others are suggesting, viz. using implicit methods.

If you want real concrete examples, have a look at the Spark Cassandra 
Connector -

Here you will see an example of "extending" SparkContext - 
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md

// validation is deferred, so it is not triggered during rdd creation
val rdd = sc.cassandraTable[SomeType]("ks", "not_existing_table")
val emptyRDD = rdd.toEmptyCassandraRDD

val emptyRDD2 = sc.emptyCassandraTable[SomeType]("ks", "not_existing_table"))


And here you will se an example of "extending" RDD - 
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md

case class WordCount(word: String, count: Long)
val collection = sc.parallelize(Seq(WordCount("dog", 50), WordCount("cow", 60)))
collection.saveToCassandra("test", "words", SomeColumns("word", "count"))

Hope that helps…
Jayesh


From: Teng Long 
Date: Monday, December 5, 2016 at 3:04 PM
To: Holden Karau , 
Subject: Re: Can I add a new method to RDD class?

Thank you for providing another answer, Holden.

So I did what Tarun and Michal suggested, and it didn’t work out, as I want to 
have a new transformation method in the RDD class and need to use that RDD’s 
SparkContext, which is private. So I guess the only thing I can do now is sbt 
publishLocal?
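
In case it helps, a minimal sketch of the helper/implicit-class route that still 
reaches the context, assuming the public rdd.sparkContext accessor is enough for 
the transformation (the method, object, and config key are made up):

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

object MyRddExtensions {
  implicit class RichRdd[T: ClassTag](rdd: RDD[T]) {
    // Reaches the context through the public accessor rather than the private `sc` field.
    def fooWithConf(): RDD[T] = {
      val numPartitionsHint =
        rdd.sparkContext.getConf.getInt("spark.mydata.numpartitions", rdd.getNumPartitions)
      rdd.coalesce(numPartitionsHint)  // placeholder transformation
    }
  }
}

// Usage:
// import MyRddExtensions._
// val transformed = someRdd.fooWithConf()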

On Dec 5, 2016, at 9:19 AM, Holden Karau 
> wrote:

Doing that requires publishing a custom version of Spark; you can edit the 
version number to do a publishLocal - but maintaining that change is going to 
be difficult. The other approaches suggested are probably better, but also does 
your method need to be defined on the RDD class? Could you instead make a 
helper object or class to expose whatever functionality you need?

On Mon, Dec 5, 2016 at 6:06 PM long 
> wrote:
Thank you very much! But why can’t I just add new methods into the source code 
of RDD?

On Dec 5, 2016, at 3:15 AM, Michal Šenkýř [via Apache Spark Developers List] 
<[hidden email]> wrote:


A simple Scala example of implicit classes:

implicit class EnhancedString(str: String) {

  def prefix(prefix: String) = prefix + str

}



println("World".prefix("Hello "))

As Tarun said, you have to import it if it's not in the same class where you 
use it.

Hope this makes it clearer,

Michal Senkyr

On 5.12.2016 07:43, Tarun Kumar wrote:
Not sure if that's documented in terms of Spark, but this is a fairly common 
pattern in Scala known as the "pimp my library" pattern; you can easily find many 
generic examples of using this pattern. If you want, I can quickly cook up a 
short complete example with an RDD (although there is nothing really more to it 
than my example in the earlier mail)? Thanks Tarun Kumar

On Mon, 5 Dec 2016 at 7:15 AM, long <[hidden email]> wrote:
So is there documentation of this I can refer to?

On Dec 5, 2016, at 1:07 AM, Tarun Kumar [via Apache Spark Developers List] 
<[hidden email]> wrote:

Hi Tenglong, In addition to trsell's reply, you can add any method to an RDD 
without making changes to Spark code. This can be achieved by using an implicit 
class in your own client code: implicit class extendRDD[T](rdd: RDD[T]) { def 
foo() } Then you basically need to import this implicit class in the scope where 
you want to use the new foo method. Thanks Tarun Kumar

On Mon, 5 Dec 2016 at 6:59 AM, [hidden email]> wrote:

How does your application fetch the spark dependency? Perhaps list your project 
dependencies and check it's using your dev build.

On Mon, 5 Dec 2016, 08:47 tenglong, [hidden email]> wrote:
Hi,

Apparently, I've already tried adding a new method to RDD,

for example,

class RDD {
  def foo() // this is the one I added

  def map()

  def collect()
}

I can build Spark successfully, but I can't compile my application code
which calls rdd.foo(), and the error message says

value foo is not a member of org.apache.spark.rdd.RDD[String]

So I am wondering if there is any mechanism prevents me from doing this or
something I'm doing wrong?




--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/Can-I-add-a-new-method-to-RDD-class-tp20100.html
Sent from the Apache Spark Developers List mailing list archive at 
Nabble.com.
