Re: SQL logical plans and DataSourceV2 (was: data source v2 online meetup)

2018-02-05 Thread Wenchen Fan
I think many advanced Spark users already have custom Catalyst rules to
deal with the query plan directly, so it makes a lot of sense to
standardize the logical plan. However, instead of exploring possible
operations ourselves, I think we should follow the SQL standard.

ReplaceTable, RTAS:
Most of the mainstream databases don't support these two. I think a
drop-all-data operation is dangerous and we should only allow users to do
it with DROP TABLE.

DeleteFrom, ReplaceData:
These two are in the SQL standard, but in a more general form. DELETE, UPDATE,
and MERGE are the most common SQL statements for changing data.

There is more we need to take care of, like ALTER TABLE. I'm looking
forward to a holistic SPIP about this. Thanks for your contribution!

Wenchen



On Tue, Feb 6, 2018 at 8:32 AM, Ryan Blue  wrote:

> Thanks for responding!
>
> I’ve been coming up with a list of the high-level operations that are
> needed. I think all of them come down to 5 questions about what’s happening:
>
>- Does the target table exist?
>- If it does exist, should it be dropped?
>- If not, should it get created?
>- Should data get written to the table?
>- Should data get deleted from the table?
>
> Using those, you can list out all the potential operations. Here’s a flow
> chart that makes it easier to think about:
>
> Table exists? No (Drop table? N/A):
>     Create table? No  -> Noop
>     Create table? Yes:
>         Write data? Yes -> CTAS
>         Write data? No  -> CreateTable
> Table exists? Yes:
>     Drop table? Yes:
>         Create table? No  -> DropTable
>         Create table? Yes:
>             Write data? Yes -> RTAS
>             Write data? No  -> ReplaceTable
>     Drop table? No (table already exists):
>         Write data? Yes:
>             Delete data? Yes -> ReplaceData
>             Delete data? No  -> InsertInto
>         Write data? No:
>             Delete data? Yes -> DeleteFrom
>             Delete data? No  -> Noop
>
> Some of these can be broken down into other operations (replace table =
> drop & create), but I think it is valuable to consider each one and think
> about whether it should be atomic. CTAS is a create and an insert that
> guarantees the table exists only if the insert succeeded. Should we also
> support RTAS=ReplaceTableAsSelect (drop, create, insert) and make a similar
> guarantee that the original table will be dropped if and only if the write
> succeeds?
>
> As a sanity check, most of these operations correspond to SQL statements
> for tables
>
>- CreateTable = CREATE TABLE t
>- DropTable = DROP TABLE t
>- ReplaceTable = DROP TABLE t; CREATE TABLE t (no transaction needed?)
>- CTAS = CREATE TABLE t AS SELECT ...
>- RTAS = ??? (we could add REPLACE TABLE t AS ...)
>
> Or for data:
>
>- DeleteFrom = DELETE FROM t WHERE ...
>- InsertInto = INSERT INTO t SELECT ...
>- ReplaceData = INSERT OVERWRITE t PARTITION (p) SELECT ... or BEGIN;
>DELETE FROM t; INSERT INTO t SELECT ...; COMMIT;
>
> The last one, ReplaceData, is interesting because only one specific case
> is currently supported and requires partitioning.
>
> I think we need to consider all of these operations while building
> DataSourceV2. We still need to define what v2 sources should do.
>
> Also, I would like to see a way to provide weak guarantees easily and
> another way for v2 sources to implement stronger guarantees. For example,
> CTAS can be implemented as a create, then an insert, with a drop if the
> insert fails. That covers most cases and is easy to implement. But some
> table formats can provide stronger guarantees. Iceberg supports atomic
> create-and-insert, so that a table never exists unless its write succeeds,
> rather than being rolled back only if the driver is still alive after a failure.
> If we implement the basics (create, insert, drop-on-failure) in Spark, I
> think we will end up with more data sources that have reliable behavior.
>
> Would anyone be interested in an improvement proposal for this? It would
> be great to document this and build consensus around Spark’s expected
> behavior. I can write it up.
>
> rb
> ​
>
> On Fri, Feb 2, 2018 at 3:23 PM, Michael Armbrust 
> wrote:
>
>> So here are my recommendations for moving forward, with DataSourceV2 as a
>>> starting point:
>>>
>>>1. Use well-defined logical plan nodes for all high-level
>>>operations: insert, create, CTAS, overwrite table, etc.
>>>2. Use rules that match on these high-level plan nodes, so that it
>>>isn’t necessary to create rules to match each eventual code path
>>>individually
>>>3. Define Spark’s behavior for these 

Re: SQL logical plans and DataSourceV2 (was: data source v2 online meetup)

2018-02-05 Thread Ryan Blue
Thanks for responding!

I’ve been coming up with a list of the high-level operations that are
needed. I think all of them come down to 5 questions about what’s happening:

   - Does the target table exist?
   - If it does exist, should it be dropped?
   - If not, should it get created?
   - Should data get written to the table?
   - Should data get deleted from the table?

Using those, you can list out all the potential operations. Here’s a flow
chart that makes it easier to think about:

Table exists? No (Drop table? N/A):
    Create table? No  -> Noop
    Create table? Yes:
        Write data? Yes -> CTAS
        Write data? No  -> CreateTable
Table exists? Yes:
    Drop table? Yes:
        Create table? No  -> DropTable
        Create table? Yes:
            Write data? Yes -> RTAS
            Write data? No  -> ReplaceTable
    Drop table? No (table already exists):
        Write data? Yes:
            Delete data? Yes -> ReplaceData
            Delete data? No  -> InsertInto
        Write data? No:
            Delete data? Yes -> DeleteFrom
            Delete data? No  -> Noop
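The decision flow above can be expressed as a small classifier over the five questions. This is an illustrative sketch only, not Spark code; the `classify` function and its return values are simply names for the operations discussed in this thread:

```python
def classify(exists, drop, create, write, delete):
    """Map the five questions to a logical operation name.

    Purely illustrative: mirrors the decision flow chart above,
    not any actual Spark or DataSourceV2 API.
    """
    if not exists:
        if not create:
            return "Noop"
        return "CTAS" if write else "CreateTable"
    if drop:
        if not create:
            return "DropTable"
        return "RTAS" if write else "ReplaceTable"
    # Table exists and is kept; only data operations remain.
    if write:
        return "ReplaceData" if delete else "InsertInto"
    return "DeleteFrom" if delete else "Noop"

print(classify(exists=False, drop=False, create=True, write=True, delete=False))  # CTAS
print(classify(exists=True, drop=True, create=True, write=True, delete=False))    # RTAS
print(classify(exists=True, drop=False, create=False, write=True, delete=True))   # ReplaceData
```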

Some of these can be broken down into other operations (replace table =
drop & create), but I think it is valuable to consider each one and think
about whether it should be atomic. CTAS is a create and an insert that
guarantees the table exists only if the insert succeeded. Should we also
support RTAS=ReplaceTableAsSelect (drop, create, insert) and make a similar
guarantee that the original table will be dropped if and only if the write
succeeds?

As a sanity check, most of these operations correspond to SQL statements
for tables

   - CreateTable = CREATE TABLE t
   - DropTable = DROP TABLE t
   - ReplaceTable = DROP TABLE t; CREATE TABLE t (no transaction needed?)
   - CTAS = CREATE TABLE t AS SELECT ...
   - RTAS = ??? (we could add REPLACE TABLE t AS ...)

Or for data:

   - DeleteFrom = DELETE FROM t WHERE ...
   - InsertInto = INSERT INTO t SELECT ...
   - ReplaceData = INSERT OVERWRITE t PARTITION (p) SELECT ... or BEGIN;
   DELETE FROM t; INSERT INTO t SELECT ...; COMMIT;

The last one, ReplaceData, is interesting because only one specific case is
currently supported and requires partitioning.
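The ReplaceData equivalence above (delete the matching rows and insert the new ones, together or not at all) can be sketched with a toy in-memory table. This is illustrative only; `Table` and `replace_data` are made-up names, not a Spark API:

```python
class Table:
    """Toy in-memory table illustrating ReplaceData semantics:
    the delete and the insert take effect together or not at all."""
    def __init__(self, rows=None):
        self.rows = list(rows or [])

    def replace_data(self, predicate, new_rows):
        # Stage the full result first, then swap it in as one step,
        # mimicking BEGIN; DELETE ...; INSERT ...; COMMIT;
        staged = [r for r in self.rows if not predicate(r)]
        staged.extend(new_rows)
        self.rows = staged  # the "commit": a single atomic swap

t = Table([("p1", 1), ("p1", 2), ("p2", 3)])
# Overwrite partition p1, like INSERT OVERWRITE t PARTITION (p='p1')
t.replace_data(lambda r: r[0] == "p1", [("p1", 10)])
print(sorted(t.rows))  # [('p1', 10), ('p2', 3)]
```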

I think we need to consider all of these operations while building
DataSourceV2. We still need to define what v2 sources should do.

Also, I would like to see a way to provide weak guarantees easily and
another way for v2 sources to implement stronger guarantees. For example,
CTAS can be implemented as a create, then an insert, with a drop if the
insert fails. That covers most cases and is easy to implement. But some
table formats can provide stronger guarantees. Iceberg supports atomic
create-and-insert, so that a table never exists unless its write succeeds,
rather than being rolled back only if the driver is still alive after a failure.
If we implement the basics (create, insert, drop-on-failure) in Spark, I
think we will end up with more data sources that have reliable behavior.
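The weak-guarantee CTAS described here (create, insert, drop on failure) could be sketched as follows. The `create_table`/`insert_into`/`drop_table` callbacks are hypothetical stand-ins for whatever a v2 source would provide:

```python
def ctas_weak(create_table, insert_into, drop_table):
    """Create-table-as-select with only a weak guarantee:
    if the insert fails, try to drop the table just created.
    The cleanup is best-effort -- it cannot run if the driver dies,
    which is why some formats offer atomic create-and-insert instead."""
    create_table()
    try:
        insert_into()
    except Exception:
        drop_table()  # best-effort rollback
        raise

# Simulated run: the insert fails, so the table should be dropped again.
state = {"exists": False}
def create(): state["exists"] = True
def insert(): raise RuntimeError("write failed")
def drop(): state["exists"] = False

try:
    ctas_weak(create, insert, drop)
except RuntimeError:
    pass
print(state["exists"])  # False
```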

Would anyone be interested in an improvement proposal for this? It would be
great to document this and build consensus around Spark’s expected
behavior. I can write it up.

rb
​

On Fri, Feb 2, 2018 at 3:23 PM, Michael Armbrust 
wrote:

> So here are my recommendations for moving forward, with DataSourceV2 as a
>> starting point:
>>
>>1. Use well-defined logical plan nodes for all high-level operations:
>>insert, create, CTAS, overwrite table, etc.
>>2. Use rules that match on these high-level plan nodes, so that it
>>isn’t necessary to create rules to match each eventual code path
>>individually
>>3. Define Spark’s behavior for these logical plan nodes. Physical
>>nodes should implement that behavior, but all CREATE TABLE OVERWRITE 
>> should
>>(eventually) make the same guarantees.
>>4. Specialize implementation when creating a physical plan, not
>>logical plans.
>>
>> I realize this is really long, but I’d like to hear thoughts about this.
>> I’m sure I’ve left out some additional context, but I think the main idea
>> here is solid: let’s standardize logical plans for more consistent behavior
>> and easier maintenance.
>>
> Context aside, I really like these rules! I think having query planning be
> the boundary for specialization makes a lot of sense.
>
> (RunnableCommand might also be my fault though sorry! :P)
>



-- 
Ryan Blue
Software Engineer
Netflix


Re: Spark on Kubernetes Builder Pattern Design Document

2018-02-05 Thread Mark Hamstra
Sure. Obviously, there is going to be some overlap as the project
transitions to being part of mainline Spark development. As long as you are
consciously working toward moving discussions into this dev list, then all
is good.

On Mon, Feb 5, 2018 at 1:56 PM, Matt Cheah  wrote:

> I think in this case, the original design that predates this document was
> implemented on the Spark on K8s fork, which we took some time to build
> separately before proposing that the fork be merged into the main line.
>
>
>
> Specifically, the timeline of events was:
>
>
>
>1. We started building Spark on Kubernetes on a fork and were prepared
>to merge our work directly into master.
>2. Discussion on https://issues.apache.org/jira/browse/SPARK-18278 led
>us to move down the path of working on a fork first. We would harden the
>fork and have it become more widely used to prove its value and
>robustness in practice. See https://github.com/apache-spark-on-k8s/spark
>3. On said fork, we made the original design decisions to use a
>step-based builder pattern for the driver but not the same design for the
>executors. This original discussion took place among the collaborators of the
>fork, as much of the work on the fork in general was not done on the
>mailing list.
>4. We eventually decided to merge the fork into the main line, and got
>the feedback in the corresponding PRs.
>
>
>
> Therefore the question may be less about this specific design and more about
> whether the overarching approach we took - building Spark on K8s on
> a fork first before merging into mainline - was the correct one in the
> first place. There’s also the issue that the work done on the fork was
> isolated from the dev mailing list. Moving forward as we push our work into
> mainline Spark, we aim to be transparent with the Spark community via the
> Spark mailing list and Spark JIRA tickets. We’re specifically aiming to
> deprecate the fork and migrate all the work done on the fork into the main
> line.
>
>
>
> -Matt Cheah
>
>
>
> *From: *Mark Hamstra 
> *Date: *Monday, February 5, 2018 at 1:44 PM
> *To: *Matt Cheah 
> *Cc: *"dev@spark.apache.org" , "
> ramanath...@google.com" , Ilan Filonenko <
> i...@cornell.edu>, Erik , Marcelo Vanzin <
> van...@cloudera.com>
> *Subject: *Re: Spark on Kubernetes Builder Pattern Design Document
>
>
>
> That's good, but you should probably stop and consider whether the
> discussions that led up to this document's creation could have taken place
> on this dev list -- because if they could have, then they probably should
> have as part of the whole spark-on-k8s project becoming part of mainline
> spark development, not a separate fork.
>
>
>
> On Mon, Feb 5, 2018 at 1:17 PM, Matt Cheah  wrote:
>
> Hi everyone,
>
>
>
> While we were building the Spark on Kubernetes integration, we realized
> that some of the abstractions we introduced for building the driver
> application in spark-submit, and building executor pods in the scheduler
> backend, could be improved for better readability and clarity. We received
> feedback in this pull request
> in particular. In response to this feedback, we’ve put together a design
> document that proposes a possible refactor to address the given feedback.
>
>
>
> You may comment on the proposed design at this link:
> https://docs.google.com/document/d/1XPLh3E2JJ7yeJSDLZWXh_lUcjZ1P0dy9QeUEyxIlfak/edit#
>
>
>
> I hope that we can have a productive discussion and continue improving the
> Kubernetes integration further.
>
>
>
> Thanks,
>
>
>
> -Matt Cheah
>
>
>


Re: Spark on Kubernetes Builder Pattern Design Document

2018-02-05 Thread Matt Cheah
I think in this case, the original design that predates this document was
implemented on the Spark on K8s fork, which we took some time to build
separately before proposing that the fork be merged into the main line.

 

Specifically, the timeline of events was:

 
1. We started building Spark on Kubernetes on a fork and were prepared to merge
our work directly into master.
2. Discussion on https://issues.apache.org/jira/browse/SPARK-18278 led us to move
down the path of working on a fork first. We would harden the fork and have it
become more widely used to prove its value and robustness in practice. See
https://github.com/apache-spark-on-k8s/spark
3. On said fork, we made the original design decisions to use a step-based
builder pattern for the driver but not the same design for the executors. This
original discussion took place among the collaborators of the fork, as much of
the work on the fork in general was not done on the mailing list.
4. We eventually decided to merge the fork into the main line, and got the
feedback in the corresponding PRs.
 

Therefore the question may be less about this specific design and more about
whether the overarching approach we took - building Spark on K8s on a fork first
before merging into mainline - was the correct one in the first place. There’s
also the issue that the work done on the fork was isolated from the dev mailing 
list. Moving forward as we push our work into mainline Spark, we aim to be 
transparent with the Spark community via the Spark mailing list and Spark JIRA 
tickets. We’re specifically aiming to deprecate the fork and migrate all the 
work done on the fork into the main line.
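The step-based builder pattern mentioned in point 3 can be illustrated roughly like this. It is a generic sketch with made-up names (`PodSpec`, `basic_labels_step`, and so on), not the fork's actual classes:

```python
from dataclasses import dataclass, field, replace

@dataclass(frozen=True)
class PodSpec:
    # Minimal stand-in for a driver pod specification.
    labels: dict = field(default_factory=dict)
    env: dict = field(default_factory=dict)

# Each "step" takes the current spec and returns an updated copy;
# the final spec is the left fold of all configured steps.
def basic_labels_step(spec):
    return replace(spec, labels={**spec.labels, "spark-role": "driver"})

def credentials_step(spec):
    return replace(spec, env={**spec.env, "SPARK_AUTH": "token"})

def build_pod(steps, spec=PodSpec()):
    for step in steps:
        spec = step(spec)
    return spec

pod = build_pod([basic_labels_step, credentials_step])
print(pod.labels["spark-role"])  # driver
```

The appeal of the pattern is that each concern (labels, credentials, volumes, ...) becomes a small, independently testable step rather than one monolithic builder.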

 

-Matt Cheah

 

From: Mark Hamstra 
Date: Monday, February 5, 2018 at 1:44 PM
To: Matt Cheah 
Cc: "dev@spark.apache.org" , "ramanath...@google.com" 
, Ilan Filonenko , Erik 
, Marcelo Vanzin 
Subject: Re: Spark on Kubernetes Builder Pattern Design Document

 

That's good, but you should probably stop and consider whether the discussions 
that led up to this document's creation could have taken place on this dev list 
-- because if they could have, then they probably should have as part of the 
whole spark-on-k8s project becoming part of mainline spark development, not a 
separate fork. 

 

On Mon, Feb 5, 2018 at 1:17 PM, Matt Cheah  wrote:

Hi everyone,

 

While we were building the Spark on Kubernetes integration, we realized that 
some of the abstractions we introduced for building the driver application in 
spark-submit, and building executor pods in the scheduler backend, could be 
improved for better readability and clarity. We received feedback in this pull 
request in particular. In response to this feedback, we’ve put 
together a design document that proposes a possible refactor to address the 
given feedback.

 

You may comment on the proposed design at this link: 
https://docs.google.com/document/d/1XPLh3E2JJ7yeJSDLZWXh_lUcjZ1P0dy9QeUEyxIlfak/edit#

 

I hope that we can have a productive discussion and continue improving the 
Kubernetes integration further.

 

Thanks,

 

-Matt Cheah

 





Re: Spark on Kubernetes Builder Pattern Design Document

2018-02-05 Thread Mark Hamstra
That's good, but you should probably stop and consider whether the
discussions that led up to this document's creation could have taken place
on this dev list -- because if they could have, then they probably should
have as part of the whole spark-on-k8s project becoming part of mainline
spark development, not a separate fork.

On Mon, Feb 5, 2018 at 1:17 PM, Matt Cheah  wrote:

> Hi everyone,
>
>
>
> While we were building the Spark on Kubernetes integration, we realized
> that some of the abstractions we introduced for building the driver
> application in spark-submit, and building executor pods in the scheduler
> backend, could be improved for better readability and clarity. We received
> feedback in this pull request 
> in particular. In response to this feedback, we’ve put together a design
> document that proposes a possible refactor to address the given feedback.
>
>
>
> You may comment on the proposed design at this link:
> https://docs.google.com/document/d/1XPLh3E2JJ7yeJSDLZWXh_lUcjZ1P0dy9QeUEyxIlfak/edit#
>
>
>
> I hope that we can have a productive discussion and continue improving the
> Kubernetes integration further.
>
>
>
> Thanks,
>
>
>
> -Matt Cheah
>


Spark on Kubernetes Builder Pattern Design Document

2018-02-05 Thread Matt Cheah
Hi everyone,

 

While we were building the Spark on Kubernetes integration, we realized that 
some of the abstractions we introduced for building the driver application in 
spark-submit, and building executor pods in the scheduler backend, could be 
improved for better readability and clarity. We received feedback in this pull 
request in particular. In response to this feedback, we’ve put together a 
design document that proposes a possible refactor to address the given feedback.

 

You may comment on the proposed design at this link: 
https://docs.google.com/document/d/1XPLh3E2JJ7yeJSDLZWXh_lUcjZ1P0dy9QeUEyxIlfak/edit#

 

I hope that we can have a productive discussion and continue improving the 
Kubernetes integration further.

 

Thanks,

 

-Matt Cheah





Re: Corrupt parquet file

2018-02-05 Thread Ryan Blue
In that case, I'd recommend tracking down the node where the files were
created and reporting it to EMR.

On Mon, Feb 5, 2018 at 10:38 AM, Dong Jiang  wrote:

> Thanks for the response, Ryan.
>
> We have a transient EMR cluster, and we do rerun the cluster whenever it
> fails. However, in this particular case, the cluster succeeded,
> not reporting any errors. I was able to null out the corrupted column
> and recover the rest of the 133 columns. I do feel the issue is more than
> 1-2 occurrences a year. This is the second time I am aware of the issue
> within a month, and we certainly don’t run as large a data infrastructure
> as Netflix.
>
>
>
> I will keep an eye on this issue.
>
>
>
> Thanks,
>
>
> Dong
>
>
>
> *From: *Ryan Blue 
> *Reply-To: *"rb...@netflix.com" 
> *Date: *Monday, February 5, 2018 at 1:34 PM
>
> *To: *Dong Jiang 
> *Cc: *Spark Dev List 
> *Subject: *Re: Corrupt parquet file
>
>
>
> We ensure the bad node is removed from our cluster and reprocess to
> replace the data. We only see this once or twice a year, so it isn't a
> significant problem.
>
>
>
> We've discussed options for adding write-side validation, but it is
> expensive and still unreliable if you don't trust the hardware.
>
>
>
> rb
>
>
>
> On Mon, Feb 5, 2018 at 10:28 AM, Dong Jiang  wrote:
>
> Hi, Ryan,
>
>
> Do you have any suggestions on how we could detect and prevent this issue?
>
> This is the second time we encountered this issue. We have a wide table,
> with 134 columns in the file. The issue seems to only impact one column, and
> is very hard to detect. It seems you have encountered this issue before; what
> do you do to prevent a recurrence?
>
>
>
> Thanks,
>
>
>
> Dong
>
>
>
> *From: *Ryan Blue 
> *Reply-To: *"rb...@netflix.com" 
> *Date: *Monday, February 5, 2018 at 12:46 PM
>
>
> *To: *Dong Jiang 
> *Cc: *Spark Dev List 
> *Subject: *Re: Corrupt parquet file
>
>
>
> If you can still access the logs, then you should be able to find where
> the write task ran. Maybe you can get an instance ID and open a ticket with
> Amazon. Otherwise, it will probably start failing the HW checks when the
> instance hardware is reused, so I wouldn't worry about it.
>
>
>
> The _SUCCESS file convention means that the job ran successfully, at least
> to the point where _SUCCESS is created. I wouldn't rely on _SUCCESS to
> indicate actual job success (you could do other tasks after that fail) and
> it carries no guarantee about the data that was written.
>
>
>
> rb
>
>
>
> On Mon, Feb 5, 2018 at 9:41 AM, Dong Jiang  wrote:
>
> Hi, Ryan,
>
>
>
> Many thanks for your quick response.
>
> We ran Spark on transient EMR clusters. Nothing in the log or EMR events
> suggests any issues with the cluster or the nodes. We also see the _SUCCESS
> file on the S3. If we see the _SUCCESS file, does that suggest all data is
> good?
>
> How can we prevent a recurrence? Can you share your experience?
>
>
>
> Thanks,
>
>
> Dong
>
>
>
> *From: *Ryan Blue 
> *Reply-To: *"rb...@netflix.com" 
> *Date: *Monday, February 5, 2018 at 12:38 PM
> *To: *Dong Jiang 
> *Cc: *Spark Dev List 
> *Subject: *Re: Corrupt parquet file
>
>
>
> Dong,
>
>
>
> We see this from time to time as well. In my experience, it is almost
> always caused by a bad node. You should try to find out where the file was
> written and remove that node as soon as possible.
>
>
>
> As far as finding out what is wrong with the file, that's a difficult
> task. Parquet's encoding is very dense and corruption in encoded values
> often looks like different data. When you see a decoding exception like
> this, we find it is usually that the compressed data was corrupted and is
> no longer valid. You can look for the page of data based on the value
> counter, but that's about it.
>
>
>
> Even if you could find a single record that was affected, that's not
> valuable because you don't know whether there is other corruption that is
> undetectable. There's nothing to reliably recover here. What we do in this
> case is find and remove the bad node, then reprocess data so we know
> everything is correct from the upstream source.
>
>
>
> rb
>
>
>
> On Mon, Feb 5, 2018 at 9:01 AM, Dong Jiang  wrote:
>
> Hi,
>
> We are running on Spark 2.2.1, generating parquet files, like the following
> pseudo code
> df.write.parquet(...)
> We have recently noticed parquet file corruptions, when reading the parquet
> in Spark or Presto, as the following:
>
> Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read
> value at 40870 in block 0 in file
> file:/Users/djiang/part-00122-80f4886a-75ce-42fa-b78f-4af35426f434.c000.snappy.parquet
>
> Caused by: 

Re: Corrupt parquet file

2018-02-05 Thread Dong Jiang
Hi, Ryan,

Do you have any suggestions on how we could detect and prevent this issue?
This is the second time we encountered this issue. We have a wide table, with
134 columns in the file. The issue seems to only impact one column, and is very
hard to detect. It seems you have encountered this issue before; what do you do
to prevent a recurrence?

Thanks,

Dong

From: Ryan Blue 
Reply-To: "rb...@netflix.com" 
Date: Monday, February 5, 2018 at 12:46 PM
To: Dong Jiang 
Cc: Spark Dev List 
Subject: Re: Corrupt parquet file

If you can still access the logs, then you should be able to find where the 
write task ran. Maybe you can get an instance ID and open a ticket with Amazon. 
Otherwise, it will probably start failing the HW checks when the instance 
hardware is reused, so I wouldn't worry about it.

The _SUCCESS file convention means that the job ran successfully, at least to 
the point where _SUCCESS is created. I wouldn't rely on _SUCCESS to indicate 
actual job success (you could do other tasks after that fail) and it carries no 
guarantee about the data that was written.

rb

On Mon, Feb 5, 2018 at 9:41 AM, Dong Jiang 
> wrote:
Hi, Ryan,

Many thanks for your quick response.
We ran Spark on transient EMR clusters. Nothing in the log or EMR events 
suggests any issues with the cluster or the nodes. We also see the _SUCCESS 
file on the S3. If we see the _SUCCESS file, does that suggest all data is good?
How can we prevent a recurrence? Can you share your experience?

Thanks,

Dong

From: Ryan Blue >
Reply-To: "rb...@netflix.com" 
>
Date: Monday, February 5, 2018 at 12:38 PM
To: Dong Jiang >
Cc: Spark Dev List >
Subject: Re: Corrupt parquet file

Dong,

We see this from time to time as well. In my experience, it is almost always 
caused by a bad node. You should try to find out where the file was written and 
remove that node as soon as possible.

As far as finding out what is wrong with the file, that's a difficult task. 
Parquet's encoding is very dense and corruption in encoded values often looks 
like different data. When you see a decoding exception like this, we find it is 
usually that the compressed data was corrupted and is no longer valid. You can 
look for the page of data based on the value counter, but that's about it.
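Locating the page from the value counter, as described here, amounts to a cumulative scan over the pages' value counts. A sketch, assuming you can obtain each page's valueCount from the file metadata (e.g. with parquet-tools); the function name is made up:

```python
def page_for_value(page_value_counts, value_index):
    """Return (page_index, offset_within_page) for a global value index,
    given the valueCount of each page in order."""
    seen = 0
    for page, count in enumerate(page_value_counts):
        if value_index < seen + count:
            return page, value_index - seen
        seen += count
    raise IndexError("value index beyond last page")

# The failing read was "value at 40870" and the bad page reported
# valueCount=43663; with that as the first page the value lands in page 0.
print(page_for_value([43663, 43663, 23715], 40870))  # (0, 40870)
```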

Even if you could find a single record that was affected, that's not valuable 
because you don't know whether there is other corruption that is undetectable. 
There's nothing to reliably recover here. What we do in this case is find and 
remove the bad node, then reprocess data so we know everything is correct from 
the upstream source.

rb

On Mon, Feb 5, 2018 at 9:01 AM, Dong Jiang 
> wrote:
Hi,

We are running on Spark 2.2.1, generating parquet files, like the following
pseudo code
df.write.parquet(...)
We have recently noticed parquet file corruptions, when reading the parquet
in Spark or Presto, as the following:

Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read
value at 40870 in block 0 in file
file:/Users/djiang/part-00122-80f4886a-75ce-42fa-b78f-4af35426f434.c000.snappy.parquet

Caused by: org.apache.parquet.io.ParquetDecodingException: could not read
page Page [bytes.size=1048594, valueCount=43663, uncompressedSize=1048594]
in col [incoming_aliases_array, list, element, key_value, value] BINARY

It appears only one column in one of the rows in the file is corrupt, the
file has 111041 rows.

My questions are
1) How can I identify the corrupted row?
2) What could cause the corruption? Spark issue or Parquet issue?

Any help is greatly appreciated.

Thanks,

Dong
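One way to narrow down where a decoding failure sits is to bisect over row ranges with a reader that raises on the corrupt data. This is a generic sketch: `read_rows` is a hypothetical callback, and since Parquet decodes whole pages, in practice this narrows you to a page boundary rather than a single trustworthy row:

```python
def find_bad_row(read_rows, total_rows):
    """Bisect [0, total_rows) to the first index whose range cannot be read.

    `read_rows(start, end)` is a hypothetical callback that reads the
    half-open row range and raises on a decoding error. Assumes a single
    contiguous corrupt region."""
    lo, hi = 0, total_rows
    while hi - lo > 1:
        mid = (lo + hi) // 2
        try:
            read_rows(lo, mid)
        except Exception:
            hi = mid  # failure is in the lower half
            continue
        lo = mid      # lower half reads fine; look in the upper half
    return lo

# Simulated reader: raises whenever the range touches the bad value index.
BAD = 40870
def fake_read(start, end):
    if start <= BAD < end:
        raise ValueError("Can not read value")

print(find_bad_row(fake_read, 111041))  # 40870
```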



--
Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/




--
Ryan Blue
Software Engineer
Netflix



--
Ryan Blue
Software Engineer
Netflix


Re: Corrupt parquet file

2018-02-05 Thread Dong Jiang
Thanks for the response, Ryan.
We have a transient EMR cluster, and we do rerun the cluster whenever it fails.
However, in this particular case, the cluster succeeded, not reporting any
errors. I was able to null out the corrupted column and recover the rest of the
133 columns. I do feel the issue is more than 1-2 occurrences a year. This is
the second time I am aware of the issue within a month, and we certainly don’t
run as large a data infrastructure as Netflix.

I will keep an eye on this issue.

Thanks,

Dong

From: Ryan Blue 
Reply-To: "rb...@netflix.com" 
Date: Monday, February 5, 2018 at 1:34 PM
To: Dong Jiang 
Cc: Spark Dev List 
Subject: Re: Corrupt parquet file

We ensure the bad node is removed from our cluster and reprocess to replace the 
data. We only see this once or twice a year, so it isn't a significant problem.

We've discussed options for adding write-side validation, but it is expensive 
and still unreliable if you don't trust the hardware.
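A minimal read-back validation of the sort discussed here (expensive, and still not airtight on bad hardware) would re-read what was written and compare row counts plus a content digest. A stdlib-only sketch with hypothetical helper names:

```python
import hashlib

def digest(rows):
    """Order-insensitive digest of rows: XOR of per-row hashes."""
    acc = 0
    for row in rows:
        h = hashlib.sha256(repr(row).encode()).digest()
        acc ^= int.from_bytes(h, "big")
    return acc

def validate_write(written_rows, read_back_rows):
    """Cheap post-write check: row count plus content digest.
    Catches many corruptions, but cannot prove the absence of
    undetectable ones if the hardware itself is untrusted."""
    if len(written_rows) != len(read_back_rows):
        return False
    return digest(written_rows) == digest(read_back_rows)

rows = [("a", 1), ("b", 2)]
print(validate_write(rows, rows))                   # True
print(validate_write(rows, [("a", 1), ("b", 3)]))   # False
```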

rb

On Mon, Feb 5, 2018 at 10:28 AM, Dong Jiang 
> wrote:
Hi, Ryan,

Do you have any suggestions on how we could detect and prevent this issue?
This is the second time we encountered this issue. We have a wide table, with
134 columns in the file. The issue seems to only impact one column, and is very
hard to detect. It seems you have encountered this issue before; what do you do
to prevent a recurrence?

Thanks,

Dong

From: Ryan Blue >
Reply-To: "rb...@netflix.com" 
>
Date: Monday, February 5, 2018 at 12:46 PM

To: Dong Jiang >
Cc: Spark Dev List >
Subject: Re: Corrupt parquet file

If you can still access the logs, then you should be able to find where the 
write task ran. Maybe you can get an instance ID and open a ticket with Amazon. 
Otherwise, it will probably start failing the HW checks when the instance 
hardware is reused, so I wouldn't worry about it.

The _SUCCESS file convention means that the job ran successfully, at least to 
the point where _SUCCESS is created. I wouldn't rely on _SUCCESS to indicate 
actual job success (you could do other tasks after that fail) and it carries no 
guarantee about the data that was written.

rb

On Mon, Feb 5, 2018 at 9:41 AM, Dong Jiang 
> wrote:
Hi, Ryan,

Many thanks for your quick response.
We ran Spark on transient EMR clusters. Nothing in the log or EMR events 
suggests any issues with the cluster or the nodes. We also see the _SUCCESS 
file on the S3. If we see the _SUCCESS file, does that suggest all data is good?
How can we prevent a recurrence? Can you share your experience?

Thanks,

Dong

From: Ryan Blue >
Reply-To: "rb...@netflix.com" 
>
Date: Monday, February 5, 2018 at 12:38 PM
To: Dong Jiang >
Cc: Spark Dev List >
Subject: Re: Corrupt parquet file

Dong,

We see this from time to time as well. In my experience, it is almost always 
caused by a bad node. You should try to find out where the file was written and 
remove that node as soon as possible.

As far as finding out what is wrong with the file, that's a difficult task. 
Parquet's encoding is very dense and corruption in encoded values often looks 
like different data. When you see a decoding exception like this, we find it is 
usually that the compressed data was corrupted and is no longer valid. You can 
look for the page of data based on the value counter, but that's about it.

Even if you could find a single record that was affected, that's not valuable 
because you don't know whether there is other corruption that is undetectable. 
There's nothing to reliably recover here. What we do in this case is find and 
remove the bad node, then reprocess data so we know everything is correct from 
the upstream source.

rb

On Mon, Feb 5, 2018 at 9:01 AM, Dong Jiang 
> wrote:
Hi,

We are running on Spark 2.2.1, generating parquet files, like the following
pseudo code
df.write.parquet(...)
We have recently noticed parquet file corruptions, when reading the parquet
in Spark or Presto, as the following:

Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read
value at 40870 in block 0 in file
file:/Users/djiang/part-00122-80f4886a-75ce-42fa-b78f-4af35426f434.c000.snappy.parquet

Caused by: org.apache.parquet.io.ParquetDecodingException: could not read
page Page [bytes.size=1048594, valueCount=43663,

Re: Corrupt parquet file

2018-02-05 Thread Ryan Blue
We ensure the bad node is removed from our cluster and reprocess to replace
the data. We only see this once or twice a year, so it isn't a significant
problem.

We've discussed options for adding write-side validation, but it is
expensive and still unreliable if you don't trust the hardware.

rb

On Mon, Feb 5, 2018 at 10:28 AM, Dong Jiang  wrote:

> Hi, Ryan,
>
>
> Do you have any suggestions on how we could detect and prevent this issue?
>
> This is the second time we encountered this issue. We have a wide table,
> with 134 columns in the file. The issue seems to only impact one column, and
> is very hard to detect. It seems you have encountered this issue before;
> what do you do to prevent a recurrence?
>
>
>
> Thanks,
>
>
>
> Dong
>
>
>
> *From: *Ryan Blue 
> *Reply-To: *"rb...@netflix.com" 
> *Date: *Monday, February 5, 2018 at 12:46 PM
>
> *To: *Dong Jiang 
> *Cc: *Spark Dev List 
> *Subject: *Re: Corrupt parquet file
>
>
>
> If you can still access the logs, then you should be able to find where
> the write task ran. Maybe you can get an instance ID and open a ticket with
> Amazon. Otherwise, it will probably start failing the HW checks when the
> instance hardware is reused, so I wouldn't worry about it.
>
>
>
> The _SUCCESS file convention means that the job ran successfully, at least
> to the point where _SUCCESS is created. I wouldn't rely on _SUCCESS to
> indicate actual job success (you could do other tasks after that fail) and
> it carries no guarantee about the data that was written.
>
>
>
> rb
>
>
>
> On Mon, Feb 5, 2018 at 9:41 AM, Dong Jiang  wrote:
>
> Hi, Ryan,
>
>
>
> Many thanks for your quick response.
>
> We ran Spark on transient EMR clusters. Nothing in the log or EMR events
> suggests any issues with the cluster or the nodes. We also see the _SUCCESS
> file on the S3. If we see the _SUCCESS file, does that suggest all data is
> good?
>
> How can we prevent a recurrence? Can you share your experience?
>
>
>
> Thanks,
>
>
> Dong
>
>
>
> *From: *Ryan Blue 
> *Reply-To: *"rb...@netflix.com" 
> *Date: *Monday, February 5, 2018 at 12:38 PM
> *To: *Dong Jiang 
> *Cc: *Spark Dev List 
> *Subject: *Re: Corrupt parquet file
>
>
>
> Dong,
>
>
>
> We see this from time to time as well. In my experience, it is almost
> always caused by a bad node. You should try to find out where the file was
> written and remove that node as soon as possible.
>
>
>
> As far as finding out what is wrong with the file, that's a difficult
> task. Parquet's encoding is very dense and corruption in encoded values
> often looks like different data. When you see a decoding exception like
> this, we find it is usually that the compressed data was corrupted and is
> no longer valid. You can look for the page of data based on the value
> counter, but that's about it.
>
>
>
> Even if you could find a single record that was affected, that's not
> valuable because you don't know whether there is other corruption that is
> undetectable. There's nothing to reliably recover here. What we do in this
> case is find and remove the bad node, then reprocess data so we know
> everything is correct from the upstream source.
>
>
>
> rb
>
>
>
> On Mon, Feb 5, 2018 at 9:01 AM, Dong Jiang  wrote:
>
> Hi,
>
> We are running on Spark 2.2.1, generating parquet files, like the following
> pseudo code
> df.write.parquet(...)
> We have recently noticed parquet file corruptions, when reading the parquet
> in Spark or Presto, as the following:
>
> Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read
> value at 40870 in block 0 in file
> file:/Users/djiang/part-00122-80f4886a-75ce-42fa-b78f-4af35426f434.c000.snappy.parquet
>
> Caused by: org.apache.parquet.io.ParquetDecodingException: could not read
> page Page [bytes.size=1048594, valueCount=43663, uncompressedSize=1048594]
> in col [incoming_aliases_array, list, element, key_value, value] BINARY
>
> It appears only one column in one of the rows in the file is corrupt, the
> file has 111041 rows.
>
> My questions are
> 1) How can I identify the corrupted row?
> 2) What could cause the corruption? Spark issue or Parquet issue?
>
> Any help is greatly appreciated.
>
> Thanks,
>
> Dong
>
>
>
> --
> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>
>
>
>
> --
>
> Ryan Blue
>
> Software Engineer
>
> Netflix
>
>
>
>
>
> --
>
> Ryan Blue
>
> Software Engineer
>
> Netflix
>



-- 
Ryan Blue
Software Engineer
Netflix


Re: Corrupt parquet file

2018-02-05 Thread Ryan Blue
If you can still access the logs, then you should be able to find where the
write task ran. Maybe you can get an instance ID and open a ticket with
Amazon. Otherwise, it will probably start failing the HW checks when the
instance hardware is reused, so I wouldn't worry about it.

The _SUCCESS file convention means that the job ran successfully, at least
to the point where _SUCCESS is created. I wouldn't rely on _SUCCESS to
indicate actual job success (you could do other tasks after that fail) and
it carries no guarantee about the data that was written.

rb

On Mon, Feb 5, 2018 at 9:41 AM, Dong Jiang  wrote:

> Hi, Ryan,
>
>
>
> Many thanks for your quick response.
>
> We ran Spark on transient EMR clusters. Nothing in the log or EMR events
> suggests any issues with the cluster or the nodes. We also see the _SUCCESS
> file on the S3. If we see the _SUCCESS file, does that suggest all data is
> good?
>
> How can we prevent a recurrence? Can you share your experience?
>
>
>
> Thanks,
>
>
> Dong
>
>
>
> *From: *Ryan Blue 
> *Reply-To: *"rb...@netflix.com" 
> *Date: *Monday, February 5, 2018 at 12:38 PM
> *To: *Dong Jiang 
> *Cc: *Spark Dev List 
> *Subject: *Re: Corrupt parquet file
>
>
>
> Dong,
>
>
>
> We see this from time to time as well. In my experience, it is almost
> always caused by a bad node. You should try to find out where the file was
> written and remove that node as soon as possible.
>
>
>
> As far as finding out what is wrong with the file, that's a difficult
> task. Parquet's encoding is very dense and corruption in encoded values
> often looks like different data. When you see a decoding exception like
> this, we find it is usually that the compressed data was corrupted and is
> no longer valid. You can look for the page of data based on the value
> counter, but that's about it.
>
>
>
> Even if you could find a single record that was affected, that's not
> valuable because you don't know whether there is other corruption that is
> undetectable. There's nothing to reliably recover here. What we do in this
> case is find and remove the bad node, then reprocess data so we know
> everything is correct from the upstream source.
>
>
>
> rb
>
>
>
> On Mon, Feb 5, 2018 at 9:01 AM, Dong Jiang  wrote:
>
> Hi,
>
> We are running on Spark 2.2.1, generating parquet files, like the following
> pseudo code
> df.write.parquet(...)
> We have recently noticed parquet file corruptions, when reading the parquet
> in Spark or Presto, as the following:
>
> Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read
> value at 40870 in block 0 in file
> file:/Users/djiang/part-00122-80f4886a-75ce-42fa-b78f-4af35426f434.c000.snappy.parquet
>
> Caused by: org.apache.parquet.io.ParquetDecodingException: could not read
> page Page [bytes.size=1048594, valueCount=43663, uncompressedSize=1048594]
> in col [incoming_aliases_array, list, element, key_value, value] BINARY
>
> It appears only one column in one of the rows in the file is corrupt, the
> file has 111041 rows.
>
> My questions are
> 1) How can I identify the corrupted row?
> 2) What could cause the corruption? Spark issue or Parquet issue?
>
> Any help is greatly appreciated.
>
> Thanks,
>
> Dong
>
>
>
> --
> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>
>
>
>
> --
>
> Ryan Blue
>
> Software Engineer
>
> Netflix
>



-- 
Ryan Blue
Software Engineer
Netflix


Re: Corrupt parquet file

2018-02-05 Thread Dong Jiang
Hi, Ryan,

Many thanks for your quick response.
We ran Spark on transient EMR clusters. Nothing in the log or EMR events 
suggests any issues with the cluster or the nodes. We also see the _SUCCESS 
file on the S3. If we see the _SUCCESS file, does that suggest all data is good?
How can we prevent a recurrence? Can you share your experience?

Thanks,

Dong

From: Ryan Blue 
Reply-To: "rb...@netflix.com" 
Date: Monday, February 5, 2018 at 12:38 PM
To: Dong Jiang 
Cc: Spark Dev List 
Subject: Re: Corrupt parquet file

Dong,

We see this from time to time as well. In my experience, it is almost always 
caused by a bad node. You should try to find out where the file was written and 
remove that node as soon as possible.

As far as finding out what is wrong with the file, that's a difficult task. 
Parquet's encoding is very dense and corruption in encoded values often looks 
like different data. When you see a decoding exception like this, we find it is 
usually that the compressed data was corrupted and is no longer valid. You can 
look for the page of data based on the value counter, but that's about it.

Even if you could find a single record that was affected, that's not valuable 
because you don't know whether there is other corruption that is undetectable. 
There's nothing to reliably recover here. What we do in this case is find and 
remove the bad node, then reprocess data so we know everything is correct from 
the upstream source.

rb

On Mon, Feb 5, 2018 at 9:01 AM, Dong Jiang wrote:
Hi,

We are running on Spark 2.2.1, generating parquet files, like the following
pseudo code
df.write.parquet(...)
We have recently noticed parquet file corruptions, when reading the parquet
in Spark or Presto, as the following:

Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read
value at 40870 in block 0 in file
file:/Users/djiang/part-00122-80f4886a-75ce-42fa-b78f-4af35426f434.c000.snappy.parquet

Caused by: org.apache.parquet.io.ParquetDecodingException: could not read
page Page [bytes.size=1048594, valueCount=43663, uncompressedSize=1048594]
in col [incoming_aliases_array, list, element, key_value, value] BINARY

It appears only one column in one of the rows in the file is corrupt, the
file has 111041 rows.

My questions are
1) How can I identify the corrupted row?
2) What could cause the corruption? Spark issue or Parquet issue?

Any help is greatly appreciated.

Thanks,

Dong



--
Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



--
Ryan Blue
Software Engineer
Netflix


Re: Corrupt parquet file

2018-02-05 Thread Ryan Blue
Dong,

We see this from time to time as well. In my experience, it is almost
always caused by a bad node. You should try to find out where the file was
written and remove that node as soon as possible.

As far as finding out what is wrong with the file, that's a difficult task.
Parquet's encoding is very dense and corruption in encoded values often
looks like different data. When you see a decoding exception like this, we
find it is usually that the compressed data was corrupted and is no longer
valid. You can look for the page of data based on the value counter, but
that's about it.
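[Editor's note: the "value counter" arithmetic can be sketched as below. The page counts are hypothetical; in practice they come from the Parquet page headers, like the `valueCount=43663` in the error message.]

```python
def page_for_value(value_index, page_value_counts):
    """Map a value ordinal (e.g. the 40870 in the error) to its page.

    `page_value_counts` lists the valueCount of each page, in file order.
    Returns (page_number, offset_within_page).
    """
    start = 0
    for page_no, count in enumerate(page_value_counts):
        if start <= value_index < start + count:
            return page_no, value_index - start
        start += count
    raise IndexError(f"value {value_index} beyond the {start} values present")
```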

Even if you could find a single record that was affected, that's not
valuable because you don't know whether there is other corruption that is
undetectable. There's nothing to reliably recover here. What we do in this
case is find and remove the bad node, then reprocess data so we know
everything is correct from the upstream source.

rb

On Mon, Feb 5, 2018 at 9:01 AM, Dong Jiang  wrote:

> Hi,
>
> We are running on Spark 2.2.1, generating parquet files, like the following
> pseudo code
> df.write.parquet(...)
> We have recently noticed parquet file corruptions, when reading the parquet
> in Spark or Presto, as the following:
>
> Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read
> value at 40870 in block 0 in file
> file:/Users/djiang/part-00122-80f4886a-75ce-42fa-b78f-4af35426f434.c000.snappy.parquet
>
> Caused by: org.apache.parquet.io.ParquetDecodingException: could not read
> page Page [bytes.size=1048594, valueCount=43663, uncompressedSize=1048594]
> in col [incoming_aliases_array, list, element, key_value, value] BINARY
>
> It appears only one column in one of the rows in the file is corrupt, the
> file has 111041 rows.
>
> My questions are
> 1) How can I identify the corrupted row?
> 2) What could cause the corruption? Spark issue or Parquet issue?
>
> Any help is greatly appreciated.
>
> Thanks,
>
> Dong
>
>
>
> --
> Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>


-- 
Ryan Blue
Software Engineer
Netflix


Re: Union in Spark context

2018-02-05 Thread Suchith J N
Thank you very much. I had overlooked the differences between the two.

The public API part is understandable.

Coming to the second part: I see that it creates a single UnionRDD with
all the RDDs as direct parents, thereby preventing a long lineage chain.
Is my understanding correct?

On 5 February 2018 at 22:17, Mark Hamstra  wrote:

> First, the public API cannot be changed except when there is a major
> version change, and there is no way that we are going to do Spark 3.0.0
> just for this change.
>
> Second, the change would be a mistake since the two different union
> methods are quite different. The method in RDD only ever works on two RDDs
> at a time, whereas the method in SparkContext can work on many RDDs in a
> single call. That means that the method in SparkContext is much preferred
> when unioning many RDDs to prevent a lengthy lineage chain.
>
> On Mon, Feb 5, 2018 at 8:04 AM, Suchith J N  wrote:
>
>> Hi,
>>
>> Seems like simple clean up - Why do we have union() on RDDs in
>> SparkContext? Shouldn't it reside in RDD? There is one in RDD, but it seems
>> like a wrapper around this.
>>
>> Regards,
>> Suchith
>>
>
>


Corrupt parquet file

2018-02-05 Thread Dong Jiang
Hi, 

We are running on Spark 2.2.1, generating parquet files, like the following
pseudo code
df.write.parquet(...)
We have recently noticed parquet file corruptions, when reading the parquet
in Spark or Presto, as the following:

Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read
value at 40870 in block 0 in file
file:/Users/djiang/part-00122-80f4886a-75ce-42fa-b78f-4af35426f434.c000.snappy.parquet

Caused by: org.apache.parquet.io.ParquetDecodingException: could not read
page Page [bytes.size=1048594, valueCount=43663, uncompressedSize=1048594]
in col [incoming_aliases_array, list, element, key_value, value] BINARY

It appears only one column in one of the rows in the file is corrupt, the
file has 111041 rows.

My questions are
1) How can I identify the corrupted row?
2) What could cause the corruption? Spark issue or Parquet issue?

Any help is greatly appreciated.

Thanks,

Dong
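[Editor's note: on question 1, if partial reads are possible at all (a big if, since decoding failures often surface at page granularity), bisecting the row range narrows the failure. `can_read` is a hypothetical helper that tries to decode rows `[start, end)` of the suspect column and returns False when the decoding exception reproduces.]

```python
def find_bad_range(total_rows, can_read):
    """Bisect [0, total_rows) down to the smallest row range whose read fails.

    `can_read(start, end)` should attempt to decode rows [start, end) and
    return False when the decoding exception reproduces.
    """
    lo, hi = 0, total_rows
    while hi - lo > 1:
        mid = (lo + hi) // 2
        if not can_read(lo, mid):
            hi = mid          # failure reproduces in the left half
        elif not can_read(mid, hi):
            lo = mid          # failure reproduces in the right half
        else:
            return None       # failure does not reproduce on either half
    return (lo, hi)
```

In practice this will bottom out at the corrupt page rather than a single row, and as noted later in the thread, a single identified row is of limited value when other corruption may be undetectable.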



--
Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/

-
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org



Re: Union in Spark context

2018-02-05 Thread Mark Hamstra
First, the public API cannot be changed except when there is a major
version change, and there is no way that we are going to do Spark 3.0.0
just for this change.

Second, the change would be a mistake since the two different union methods
are quite different. The method in RDD only ever works on two RDDs at a
time, whereas the method in SparkContext can work on many RDDs in a single
call. That means that the method in SparkContext is much preferred when
unioning many RDDs to prevent a lengthy lineage chain.
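[Editor's note: the lineage point can be illustrated with a toy model. `ToyRDD` is illustrative only, not Spark's API.]

```python
# Toy model of RDD lineage depth: pairwise union chains parents linearly,
# while a single n-ary union keeps every input as a direct parent.
class ToyRDD:
    def __init__(self, parents=()):
        self.parents = list(parents)

    @property
    def depth(self):
        # Depth of the lineage DAG rooted at this RDD.
        return 1 + max((p.depth for p in self.parents), default=0)

def pairwise_union(rdds):
    # Like folding rdd1.union(rdd2).union(rdd3)...: each step adds a level.
    acc = rdds[0]
    for r in rdds[1:]:
        acc = ToyRDD([acc, r])
    return acc

def nary_union(rdds):
    # Like sc.union(rdds): one UnionRDD with all inputs as direct parents.
    return ToyRDD(rdds)

leaves = [ToyRDD() for _ in range(100)]
print(pairwise_union(leaves).depth)  # lineage depth grows with each union
print(nary_union(leaves).depth)      # stays flat regardless of input count
```

This is why `sc.union(rdds)` is preferred over folding with `RDD.union` when combining many RDDs.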

On Mon, Feb 5, 2018 at 8:04 AM, Suchith J N  wrote:

> Hi,
>
> Seems like simple clean up - Why do we have union() on RDDs in
> SparkContext? Shouldn't it reside in RDD? There is one in RDD, but it seems
> like a wrapper around this.
>
> Regards,
> Suchith
>


Re: Union in Spark context

2018-02-05 Thread 0xF0F0F0
There is one on RDD, but `SparkContext.union` prevents the lineage from growing. 
Check  https://stackoverflow.com/q/34461804

Sent with [ProtonMail](https://protonmail.com) Secure Email.

 Original Message 
On February 5, 2018 5:04 PM, Suchith J N  wrote:

> Hi,
>
> Seems like simple clean up - Why do we have union() on RDDs in SparkContext? 
> Shouldn't it reside in RDD? There is one in RDD, but it seems like a wrapper 
> around this.
>
> Regards,
> Suchith

Union in Spark context

2018-02-05 Thread Suchith J N
Hi,

Seems like simple clean up - Why do we have union() on RDDs in
SparkContext? Shouldn't it reside in RDD? There is one in RDD, but it seems
like a wrapper around this.

Regards,
Suchith