Re: Spark Dataframe returning null columns when schema is specified

2017-09-07 Thread Praneeth Gayam
What is the desired behaviour when a field is null for only a few records?
You cannot avoid nulls in this case.
But if all rows are guaranteed to be uniform (either all-null or
all-non-null), you can *take* the first row of the DF and *drop* the
columns with null fields.
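A minimal sketch of that idea in the Java DataFrame API (assuming a DataFrame df whose rows are uniformly null or non-null per column; df and its columns are placeholders):

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;

// Take the first row and drop every column that is null in it.
Row first = df.first();
String[] cols = df.columns();
DataFrame pruned = df;
for (int i = 0; i < cols.length; i++) {
    if (first.isNullAt(i)) {
        pruned = pruned.drop(cols[i]);
    }
}
pruned.show();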

On Fri, Sep 8, 2017 at 12:14 AM, ravi6c2  wrote:

> Hi All, I have a problem where the Spark DataFrame ends up with null columns
> for attributes that are not present in the JSON. A clear explanation is
> provided below:
>
> *Use case:* Convert the JSON object into dataframe for further usage.
>
> *Case - 1:* Without specifying the schema for JSON:
>
> records.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
>     private static final long serialVersionUID = 1L;
>     @Override
>     public void call(JavaRDD<String> rdd, Time time) throws Exception {
>         if (rdd.count() > 0) {
>             JavaRDD<String> filteredRDD = rdd.filter(x -> x.length() > 0);
>             sqlContext = SQLContextSingleton.getInstance(filteredRDD.context());
>             DataFrame df = sqlContext.read().json(filteredRDD);
>             df.show();
>         }
>     }
> });
>
> In the above code sample, filteredRDD contains the JSON objects as Strings.
>
> *Sample JSON Record: *
> {"request_id":"f791e831f71e4918b2fcaebfdf6fe2c2","org_id":"y08e7p9g","
> queue_id":1234,"disposition":"O","created":"2017-06-02
> 23:49:10.410","assigned":"2017-06-02
> 23:49:10.410","final_review_status":"cancel","datetime":"2017-06-02
> 23:49:10.410"}
>
> *Dataframe Output:*
>
> (screenshot: Screenshot_at_Sep_07_11-36-27.png)
>
> *Case - 2:* With specifying the schema for JSON:
>
> records.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
>     private static final long serialVersionUID = 1L;
>     @Override
>     public void call(JavaRDD<String> rdd, Time time) throws Exception {
>         if (rdd.count() > 0) {
>             JavaRDD<String> filteredRDD = rdd.filter(x -> x.length() > 0);
>             sqlContext = SQLContextSingleton.getInstance(filteredRDD.context());
>             DataFrame df = sqlContext.read().schema(SchemaBuilder.buildSchema()).json(filteredRDD);
>             df.show();
>         }
>     }
> });
>
> In the above code sample, filteredRDD contains the JSON objects as Strings.
>
> *Schema Definition:*
> public static StructType buildSchema() {
>     StructType schema = new StructType(new StructField[] {
>         DataTypes.createStructField("request_id", DataTypes.StringType, false),
>         DataTypes.createStructField("org_id", DataTypes.StringType, false),
>         DataTypes.createStructField("queue_id", DataTypes.IntegerType, true),
>         DataTypes.createStructField("owner", DataTypes.StringType, true),
>         DataTypes.createStructField("disposition", DataTypes.StringType, true),
>         DataTypes.createStructField("created", DataTypes.TimestampType, true),
>         DataTypes.createStructField("created_user", DataTypes.StringType, true),
>         DataTypes.createStructField("assigned", DataTypes.TimestampType, true),
>         DataTypes.createStructField("assigned_user", DataTypes.StringType, true),
>         DataTypes.createStructField("notes", DataTypes.StringType, true),
>         DataTypes.createStructField("final_review_status", DataTypes.StringType, true),
>         DataTypes.createStructField("event_tag", DataTypes.StringType, true),
>         DataTypes.createStructField("additional_data", DataTypes.StringType, true),
>         DataTypes.createStructField("datetime", DataTypes.TimestampType, true),
>         DataTypes.createStructField("dc", DataTypes.StringType, true),
>         DataTypes.createStructField("case_id", DataTypes.StringType, true),
>         DataTypes.createStructField("case_status", DataTypes.StringType, true)
>     });
>     return schema;
> }
>
> *Sample JSON Record: *
> {"request_id":"f791e831f71e4918b2fcaebfdf6fe2c2","org_id":"y08e7p9g","
> queue_id":1234,"disposition":"O","created":"2017-06-02
> 23:49:10.410","assigned":"2017-06-02
> 23:49:10.410","final_review_status":"cancel","datetime":"2017-06-02
> 23:49:10.410"}
>
> *Dataframe Output:*
> (screenshot of the Dataframe output omitted)
>
> As you can see in the above case, when the schema is defined, the columns
> that are not present in the JSON come back as null. Is there any workaround
> to get the result shown in the first image (without nulls) while still using
> a schema? I need this to perform updates into a Kudu table. As the other
> columns are assigned NULL,

Re: Chaining Spark Streaming Jobs

2017-09-07 Thread Praneeth Gayam
With a file stream you will have to deal with the following:

   1. The file(s) must not be changed once created. So if the files are
   being continuously appended to, the new data will not be read (see the
   Spark Streaming documentation on basic file sources).

   2. The files must be created in the dataDirectory by atomically *moving*
   or *renaming* them into the data directory.

Since the latency requirement for the second job in the chain is only a few
minutes, you may end up having to create a new file every few minutes.

You may want to consider Kafka as your intermediary store for building a
chain/DAG of streaming jobs
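
For reference, a minimal sketch (Java structured-streaming API) of the file-sink/file-source chaining Michael suggests in the quoted thread below; the source, paths, schema, and enrichment step are hypothetical placeholders:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;

SparkSession spark = SparkSession.builder().appName("chained-streams").getOrCreate();

// Job 1: persist the raw records with the file sink (exactly-once semantics).
Dataset<Row> raw = spark.readStream()
        .format("kafka")                                  // wherever the raw logs come from
        .option("kafka.bootstrap.servers", "broker:9092") // hypothetical brokers
        .option("subscribe", "raw-logs")                  // hypothetical topic
        .load();
raw.writeStream()
        .format("parquet")
        .option("path", "s3://bucket/raw/")               // hypothetical location
        .option("checkpointLocation", "s3://bucket/checkpoints/raw/")
        .start();

// Job 2 (a separate application): read the same directory as a streaming file source.
StructType rawSchema = new StructType();                  // placeholder: file sources need an explicit schema
Dataset<Row> rawStream = spark.readStream()
        .schema(rawSchema)
        .parquet("s3://bucket/raw/");
rawStream /* ...enrichment transformations... */
        .writeStream()
        .format("parquet")
        .option("path", "s3://bucket/enriched/")
        .option("checkpointLocation", "s3://bucket/checkpoints/enriched/")
        .start();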

On Fri, Sep 8, 2017 at 9:45 AM, Sunita Arvind  wrote:

> Thanks for your response Michael
> Will try it out.
>
> Regards
> Sunita
>
> On Wed, Aug 23, 2017 at 2:30 PM Michael Armbrust 
> wrote:
>
>> If you use structured streaming and the file sink, you can have a
>> subsequent stream read using the file source.  This will maintain exactly
>> once processing even if there are hiccups or failures.
>>
>> On Mon, Aug 21, 2017 at 2:02 PM, Sunita Arvind 
>> wrote:
>>
>>> Hello Spark Experts,
>>>
>>> I have a design question w.r.t Spark Streaming. I have a streaming job
>>> that consumes protocol buffer encoded real time logs from a Kafka cluster
>>> on premise. My spark application runs on EMR (aws) and persists data onto
>>> s3. Before I persist, I need to strip header and convert protobuffer to
>>> parquet (I use sparksql-scalapb to convert from Protobuff to
>>> Spark.sql.Row). I need to persist Raw logs as is. I can continue the
>>> enrichment on the same dataframe after persisting the raw data, however, in
>>> order to modularize I am planning to have a separate job which picks up the
>>> raw data and performs enrichment on it. Also, I am trying to avoid an
>>> all-in-one job, as the enrichments could get project-specific while raw data
>>> persistence stays customer/project agnostic. The enriched data is allowed to
>>> have some latency (few minutes)
>>>
>>> My challenge is, after persisting the raw data, how do I chain the next
>>> streaming job. The only way I can think of is -  job 1 (raw data)
>>> partitions on current date (MMDD) and within current date, the job 2
>>> (enrichment job) filters for records within 60s of current time and
>>> performs enrichment on it in 60s batches.
>>> Is this a good option? It seems to be error prone. When either of the
>>> jobs get delayed due to bursts or any error/exception this could lead to
>>> huge data losses and non-deterministic behavior . What are other
>>> alternatives to this?
>>>
>>> Appreciate any guidance in this regard.
>>>
>>> regards
>>> Sunita Koppar
>>>
>>
>>


Spark ML DAG Pipelines

2017-09-07 Thread Srikanth Sampath
Hi Spark Experts,

Can someone point me to some examples of non-linear (DAG) ML pipelines?
That would be of great help.
Thanks much in advance
-Srikanth
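
For what it is worth, the stages of a Spark ML Pipeline are declared as a flat list, but the data flow between them can form a DAG as long as the stages are listed in topological order of their input/output columns. A small hypothetical sketch in Java (column names are made up):

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.ml.feature.VectorAssembler;

// Two independent branches (text features and a categorical index) merge in the
// assembler, so the column-level data flow is a DAG even though the stage array is linear.
Tokenizer tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words");
HashingTF hashingTF = new HashingTF().setInputCol("words").setOutputCol("textFeatures");
StringIndexer indexer = new StringIndexer().setInputCol("category").setOutputCol("categoryIndex");
VectorAssembler assembler = new VectorAssembler()
        .setInputCols(new String[] {"textFeatures", "categoryIndex"})
        .setOutputCol("features");
LogisticRegression lr = new LogisticRegression().setFeaturesCol("features").setLabelCol("label");
Pipeline pipeline = new Pipeline()
        .setStages(new PipelineStage[] {tokenizer, hashingTF, indexer, assembler, lr});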


Re: Chaining Spark Streaming Jobs

2017-09-07 Thread Sunita Arvind
Thanks for your response Michael
Will try it out.

Regards
Sunita

On Wed, Aug 23, 2017 at 2:30 PM Michael Armbrust 
wrote:

> If you use structured streaming and the file sink, you can have a
> subsequent stream read using the file source.  This will maintain exactly
> once processing even if there are hiccups or failures.
>
> On Mon, Aug 21, 2017 at 2:02 PM, Sunita Arvind 
> wrote:
>
>> Hello Spark Experts,
>>
>> I have a design question w.r.t Spark Streaming. I have a streaming job
>> that consumes protocol buffer encoded real time logs from a Kafka cluster
>> on premise. My spark application runs on EMR (aws) and persists data onto
>> s3. Before I persist, I need to strip header and convert protobuffer to
>> parquet (I use sparksql-scalapb to convert from Protobuff to
>> Spark.sql.Row). I need to persist Raw logs as is. I can continue the
>> enrichment on the same dataframe after persisting the raw data, however, in
>> order to modularize I am planning to have a separate job which picks up the
>> raw data and performs enrichment on it. Also, I am trying to avoid an
>> all-in-one job, as the enrichments could get project-specific while raw data
>> persistence stays customer/project agnostic. The enriched data is allowed to
>> have some latency (few minutes)
>>
>> My challenge is, after persisting the raw data, how do I chain the next
>> streaming job. The only way I can think of is -  job 1 (raw data)
>> partitions on current date (MMDD) and within current date, the job 2
>> (enrichment job) filters for records within 60s of current time and
>> performs enrichment on it in 60s batches.
>> Is this a good option? It seems to be error prone. When either of the
>> jobs get delayed due to bursts or any error/exception this could lead to
>> huge data losses and non-deterministic behavior . What are other
>> alternatives to this?
>>
>> Appreciate any guidance in this regard.
>>
>> regards
>> Sunita Koppar
>>
>
>


Re: [Meetup] Apache Spark and Ignite for IoT scenarios

2017-09-07 Thread Denis Magda
Hello Anjaneya, Marco,

Honestly, I’m not aware of whether video broadcasting or recording is planned.
Could you go to the meetup page [1] and raise the question there?

Just in case, here you can find a list of all upcoming Ignite-related events
[2]. Perhaps some of them will be in close proximity to you:

[1] https://www.meetup.com/datariders/events/242523245/?a=socialmedia 

[2] https://ignite.apache.org/events.html 


—
Denis

> On Sep 7, 2017, at 12:31 PM, Marco Mistroni  wrote:
> 
> Hi
>  Will there be a podcast to view afterwards for remote EMEA users?
> Kr
> 
> On Sep 7, 2017 12:15 AM, "Denis Magda"  wrote:
> Folks, 
> 
> Those who are craving mind food this weekend, come over to the meetup -
> Santa Clara, Sept 9, 9.30 AM:
> https://www.meetup.com/datariders/events/242523245/?a=socialmedia 
> 
> 
> —
> Denis



Re: [Meetup] Apache Spark and Ignite for IoT scenarios

2017-09-07 Thread Marco Mistroni
Hi
 Will there be a podcast to view afterwards for remote EMEA users?
Kr

On Sep 7, 2017 12:15 AM, "Denis Magda"  wrote:

> Folks,
>
> Those who are craving mind food this weekend, come over to the meetup -
> Santa Clara, Sept 9, 9.30 AM:
> https://www.meetup.com/datariders/events/242523245/?a=socialmedia
>
> —
> Denis
>


Spark Dataframe returning null columns when schema is specified

2017-09-07 Thread ravi6c2
Hi All, I have a problem where the Spark DataFrame ends up with null columns
for attributes that are not present in the JSON. A clear explanation is
provided below:

*Use case:* Convert the JSON object into dataframe for further usage.

*Case - 1:* Without specifying the schema for JSON:

records.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
    private static final long serialVersionUID = 1L;
    @Override
    public void call(JavaRDD<String> rdd, Time time) throws Exception {
        if (rdd.count() > 0) {
            JavaRDD<String> filteredRDD = rdd.filter(x -> x.length() > 0);
            sqlContext = SQLContextSingleton.getInstance(filteredRDD.context());
            DataFrame df = sqlContext.read().json(filteredRDD);
            df.show();
        }
    }
});

In the above code sample, filteredRDD contains the JSON objects as Strings.

*Sample JSON Record: *
{"request_id":"f791e831f71e4918b2fcaebfdf6fe2c2","org_id":"y08e7p9g","queue_id":1234,"disposition":"O","created":"2017-06-02
23:49:10.410","assigned":"2017-06-02
23:49:10.410","final_review_status":"cancel","datetime":"2017-06-02
23:49:10.410"}

*Dataframe Output:*

(screenshot: Screenshot_at_Sep_07_11-36-27.png)

*Case - 2:* With specifying the schema for JSON:

records.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
    private static final long serialVersionUID = 1L;
    @Override
    public void call(JavaRDD<String> rdd, Time time) throws Exception {
        if (rdd.count() > 0) {
            JavaRDD<String> filteredRDD = rdd.filter(x -> x.length() > 0);
            sqlContext = SQLContextSingleton.getInstance(filteredRDD.context());
            DataFrame df = sqlContext.read().schema(SchemaBuilder.buildSchema()).json(filteredRDD);
            df.show();
        }
    }
});

In the above code sample, filteredRDD contains the JSON objects as Strings.

*Schema Definition:*
public static StructType buildSchema() {
    StructType schema = new StructType(new StructField[] {
        DataTypes.createStructField("request_id", DataTypes.StringType, false),
        DataTypes.createStructField("org_id", DataTypes.StringType, false),
        DataTypes.createStructField("queue_id", DataTypes.IntegerType, true),
        DataTypes.createStructField("owner", DataTypes.StringType, true),
        DataTypes.createStructField("disposition", DataTypes.StringType, true),
        DataTypes.createStructField("created", DataTypes.TimestampType, true),
        DataTypes.createStructField("created_user", DataTypes.StringType, true),
        DataTypes.createStructField("assigned", DataTypes.TimestampType, true),
        DataTypes.createStructField("assigned_user", DataTypes.StringType, true),
        DataTypes.createStructField("notes", DataTypes.StringType, true),
        DataTypes.createStructField("final_review_status", DataTypes.StringType, true),
        DataTypes.createStructField("event_tag", DataTypes.StringType, true),
        DataTypes.createStructField("additional_data", DataTypes.StringType, true),
        DataTypes.createStructField("datetime", DataTypes.TimestampType, true),
        DataTypes.createStructField("dc", DataTypes.StringType, true),
        DataTypes.createStructField("case_id", DataTypes.StringType, true),
        DataTypes.createStructField("case_status", DataTypes.StringType, true)
    });
    return schema;
}

*Sample JSON Record: *
{"request_id":"f791e831f71e4918b2fcaebfdf6fe2c2","org_id":"y08e7p9g","queue_id":1234,"disposition":"O","created":"2017-06-02
23:49:10.410","assigned":"2017-06-02
23:49:10.410","final_review_status":"cancel","datetime":"2017-06-02
23:49:10.410"}

*Dataframe Output:*
(screenshot of the Dataframe output omitted)

If you 

Re: CSV write to S3 failing silently with partial completion

2017-09-07 Thread Mcclintic, Abbi
Thanks all – couple notes below.

Generally all our partitions are of equal size (i.e. on a normal day in this
particular case I see 10 equally sized partitions of 2.8 GB). We see the
problem with repartitioning and without – in this example we are repartitioning
to 10, but we also see the problem without any repartitioning when the default
partition count is 200. We know that data loss is occurring because we have a
final quality gate that counts the number of rows and halts the process if we
see too large of a drop.
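
A sketch of that kind of post-write row-count gate in the Java API (df and spark are the objects from the job; the path and tolerance are hypothetical):

// Re-read what actually landed in S3 and compare against what we intended to write.
long expected = df.count();
long written = spark.read().csv("s3://some-bucket/some_location").count();
if (written < 0.99 * expected) {  // hypothetical tolerated drop
    throw new IllegalStateException(
            "Partial write detected: expected " + expected + " rows, found " + written);
}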



We have one use case where the data needs to be read on a local machine after
processing and one use case of copying to Redshift. Regarding the Redshift
copy, it gets a bit complicated owing to VPC and encryption requirements, so we
haven’t looked into using the JDBC driver yet.

My understanding was that Amazon EMR does not support s3a, but it may be worth
looking into. We may also try a combination of writing to HDFS combined with
s3distcp.
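
A rough sketch of that HDFS-then-copy approach (bucket names and paths are hypothetical):

// Write the CSV output to HDFS first...
df.write().csv("hdfs:///staging/some_location");
// ...then copy the completed files to S3 in one step, e.g. with s3-dist-cp on the
// EMR master node (or as an EMR step):
//   s3-dist-cp --src hdfs:///staging/some_location --dest s3://some-bucket/some_location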



Thanks,



Abbi





On 9/7/17, 7:50 AM, "Patrick Alwell"  wrote:



Sounds like an S3 bug. Can you replicate locally with HDFS?



Try using S3a protocol too; there is a jar you can leverage like so: 
spark-submit --packages 
com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3 
my_spark_program.py



EMR can sometimes be buggy. :/



You could also try leveraging EC2 nodes and manually creating a cluster 
with password less SSH.



But I feel your pain man, I’ve had weird issues with Redshift and EMR as 
well.



Let me know if you can or can’t replicate locally; and I can bring it up 
with our S3 team for the next release of HDP and we can file a bug with AWS.



-Pat



On 9/7/17, 2:59 AM, "JG Perrin"  wrote:



Are you assuming that all partitions are of equal size? Did you try 
with more partitions (like repartitioning)? Does the error always happen with 
the last (or smaller) file? If you are sending to redshift, why not use the 
JDBC driver?



-Original Message-

From: abbim [mailto:ab...@amazon.com]

Sent: Thursday, September 07, 2017 1:02 AM

To: user@spark.apache.org

Subject: CSV write to S3 failing silently with partial completion



Hi all,

My team has been experiencing a recurring unpredictable bug where only 
a partial write to CSV in S3 on one partition of our Dataset is performed. For 
example, in a Dataset of 10 partitions written to CSV in S3, we might see 9 of 
the partitions as 2.8 GB in size, but one of them as 1.6 GB. However, the job 
does not exit with an error code.



This becomes problematic in the following ways:

1. When we copy the data to Redshift, we get a bad decrypt error on the 
partial file, suggesting that the failure occurred at a weird byte in the file.

2. We lose data - sometimes as much as 10%.



We don't see this problem with parquet format, which we also use, but 
moving all of our data to parquet is not currently feasible. We're using the 
Java API with Spark 2.2 and Amazon EMR 5.8, code is a simple as this:

df.write().csv("s3://some-bucket/some_location"). We're experiencing 
the issue 1-3x/week on a daily job and are unable to reliably reproduce the 
problem.



Any thoughts on why we might be seeing this and how to resolve?

Thanks in advance.







--

Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/



-

To unsubscribe e-mail: user-unsubscr...@spark.apache.org






-

To unsubscribe e-mail: user-unsubscr...@spark.apache.org










Spark UI to use Marathon assigned port

2017-09-07 Thread Sunil Kalyanpur
Hello all,

I am running PySpark Job (v2.0.2) with checkpoint enabled in Mesos cluster
and am using Marathon for orchestration.

When the job is restarted using Marathon, the Spark UI does not start on the
port assigned by Marathon. Instead, it picks up the port from the checkpoint.

Is there a way to make the Spark job use the port assigned by Marathon instead
of picking the configuration up from the checkpoint?

Please let me know if you need any information.

Thank you,
Sunil


Re: graphframe out of memory

2017-09-07 Thread Lukas Bradley
Did you also increase the size of the heap of the Java app that is starting
Spark?

https://alvinalexander.com/blog/post/java/java-xmx-xms-memory-heap-size-control
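
In local mode the executors live inside the same JVM as the application itself, so spark.executor.memory / spark.driver.memory set on the SparkConf after that JVM has started generally have no effect; the heap has to be sized when the application is launched, roughly like this (jar and main class are hypothetical):

java -Xmx8g -cp my-graph-app.jar com.example.GraphJob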

On Thu, Sep 7, 2017 at 12:16 PM, Imran Rajjad  wrote:

> I am getting Out of Memory error while running connectedComponents job on
> graph with around 12000 vertices and 134600 edges.
> I am running spark in embedded mode in a standalone Java application and
> have tried to increase the memory but it seems that its not taking any
> effect
>
> sparkConf = new SparkConf().setAppName("SOME APP
> NAME").setMaster("local[10]")
> .set("spark.executor.memory","5g")
> .set("spark.driver.memory","8g")
> .set("spark.driver.maxResultSize","1g")
> .set("spark.sql.warehouse.dir", "file:///d:/spark/tmp")
> .set("hadoop.home.dir", "file:///D:/spark-2.1.0-bin-hadoop2.7/bin");
>
>   spark = SparkSession.builder().config(sparkConf).getOrCreate();
>   spark.sparkContext().setLogLevel("ERROR");
>   spark.sparkContext().setCheckpointDir("D:/spark/tmp");
>
> the stack trace
> java.lang.OutOfMemoryError: Java heap space
>  at java.util.Arrays.copyOf(Arrays.java:3332)
>  at java.lang.AbstractStringBuilder.ensureCapacityInternal(
> AbstractStringBuilder.java:124)
>  at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
>  at java.lang.StringBuilder.append(StringBuilder.java:136)
>  at scala.StringContext.standardInterpolator(StringContext.scala:126)
>  at scala.StringContext.s(StringContext.scala:95)
>  at org.apache.spark.sql.execution.QueryExecution.
> toString(QueryExecution.scala:230)
>  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(
> SQLExecution.scala:54)
>  at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2788)
>  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$
> execute$1(Dataset.scala:2385)
>  at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$
> Dataset$$collect$1.apply(Dataset.scala:2390)
>  at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$
> Dataset$$collect$1.apply(Dataset.scala:2390)
>  at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2801)
>  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$
> collect(Dataset.scala:2390)
>  at org.apache.spark.sql.Dataset.collect(Dataset.scala:2366)
>  at org.graphframes.lib.ConnectedComponents$.skewedJoin(
> ConnectedComponents.scala:239)
>  at org.graphframes.lib.ConnectedComponents$.org$graphframes$lib$
> ConnectedComponents$$run(ConnectedComponents.scala:308)
>  at org.graphframes.lib.ConnectedComponents.run(
> ConnectedComponents.scala:139)
>
> GraphFrame version is 0.5.0 and Spark version is 2.1.1
>
> regards,
> Imran
>
> --
> I.R
>


graphframe out of memory

2017-09-07 Thread Imran Rajjad
I am getting Out of Memory error while running connectedComponents job on
graph with around 12000 vertices and 134600 edges.
I am running spark in embedded mode in a standalone Java application and
have tried to increase the memory but it seems that its not taking any
effect

sparkConf = new SparkConf().setAppName("SOME APP
NAME").setMaster("local[10]")
.set("spark.executor.memory","5g")
.set("spark.driver.memory","8g")
.set("spark.driver.maxResultSize","1g")
.set("spark.sql.warehouse.dir", "file:///d:/spark/tmp")
.set("hadoop.home.dir", "file:///D:/spark-2.1.0-bin-hadoop2.7/bin");

  spark = SparkSession.builder().config(sparkConf).getOrCreate();
  spark.sparkContext().setLogLevel("ERROR");
  spark.sparkContext().setCheckpointDir("D:/spark/tmp");

the stack trace
java.lang.OutOfMemoryError: Java heap space
 at java.util.Arrays.copyOf(Arrays.java:3332)
 at
java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
 at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
 at java.lang.StringBuilder.append(StringBuilder.java:136)
 at scala.StringContext.standardInterpolator(StringContext.scala:126)
 at scala.StringContext.s(StringContext.scala:95)
 at
org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:230)
 at
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:54)
 at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2788)
 at org.apache.spark.sql.Dataset.org
$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2385)
 at
org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collect$1.apply(Dataset.scala:2390)
 at
org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$collect$1.apply(Dataset.scala:2390)
 at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2801)
 at org.apache.spark.sql.Dataset.org
$apache$spark$sql$Dataset$$collect(Dataset.scala:2390)
 at org.apache.spark.sql.Dataset.collect(Dataset.scala:2366)
 at
org.graphframes.lib.ConnectedComponents$.skewedJoin(ConnectedComponents.scala:239)
 at
org.graphframes.lib.ConnectedComponents$.org$graphframes$lib$ConnectedComponents$$run(ConnectedComponents.scala:308)
 at
org.graphframes.lib.ConnectedComponents.run(ConnectedComponents.scala:139)

GraphFrame version is 0.5.0 and Spark version is 2.1.1

regards,
Imran

-- 
I.R


Re: CSV write to S3 failing silently with partial completion

2017-09-07 Thread Patrick Alwell
Sounds like an S3 bug. Can you replicate locally with HDFS?

Try using S3a protocol too; there is a jar you can leverage like so: 
spark-submit --packages 
com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.3 
my_spark_program.py
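
If you go the s3a route, the credentials can usually be supplied through the Hadoop configuration; a hedged sketch in the Java API (the bucket and keys are placeholders, and on EMR an instance profile/IAM role may make the keys unnecessary):

// Assumes the hadoop-aws / aws-java-sdk jars from the --packages line above are on the classpath.
spark.sparkContext().hadoopConfiguration().set("fs.s3a.access.key", "YOUR_ACCESS_KEY");
spark.sparkContext().hadoopConfiguration().set("fs.s3a.secret.key", "YOUR_SECRET_KEY");
df.write().csv("s3a://some-bucket/some_location");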

EMR can sometimes be buggy. :/

You could also try leveraging EC2 nodes and manually creating a cluster with 
password less SSH.

But I feel your pain man, I’ve had weird issues with Redshift and EMR as well.

Let me know if you can or can’t replicate locally; and I can bring it up with 
our S3 team for the next release of HDP and we can file a bug with AWS.

-Pat

On 9/7/17, 2:59 AM, "JG Perrin"  wrote:

Are you assuming that all partitions are of equal size? Did you try with 
more partitions (like repartitioning)? Does the error always happen with the 
last (or smaller) file? If you are sending to redshift, why not use the JDBC 
driver?

-Original Message-
From: abbim [mailto:ab...@amazon.com] 
Sent: Thursday, September 07, 2017 1:02 AM
To: user@spark.apache.org
Subject: CSV write to S3 failing silently with partial completion

Hi all,
My team has been experiencing a recurring unpredictable bug where only a 
partial write to CSV in S3 on one partition of our Dataset is performed. For 
example, in a Dataset of 10 partitions written to CSV in S3, we might see 9 of 
the partitions as 2.8 GB in size, but one of them as 1.6 GB. However, the job 
does not exit with an error code.

This becomes problematic in the following ways:
1. When we copy the data to Redshift, we get a bad decrypt error on the 
partial file, suggesting that the failure occurred at a weird byte in the file. 
2. We lose data - sometimes as much as 10%.

We don't see this problem with parquet format, which we also use, but 
moving all of our data to parquet is not currently feasible. We're using the 
Java API with Spark 2.2 and Amazon EMR 5.8, code is a simple as this:
df.write().csv("s3://some-bucket/some_location"). We're experiencing the 
issue 1-3x/week on a daily job and are unable to reliably reproduce the 
problem. 

Any thoughts on why we might be seeing this and how to resolve?
Thanks in advance.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org





RE: CSV write to S3 failing silently with partial completion

2017-09-07 Thread JG Perrin
Are you assuming that all partitions are of equal size? Did you try with more 
partitions (like repartitioning)? Does the error always happen with the last 
(or smaller) file? If you are sending to redshift, why not use the JDBC driver?

-Original Message-
From: abbim [mailto:ab...@amazon.com] 
Sent: Thursday, September 07, 2017 1:02 AM
To: user@spark.apache.org
Subject: CSV write to S3 failing silently with partial completion

Hi all,
My team has been experiencing a recurring unpredictable bug where only a 
partial write to CSV in S3 on one partition of our Dataset is performed. For 
example, in a Dataset of 10 partitions written to CSV in S3, we might see 9 of 
the partitions as 2.8 GB in size, but one of them as 1.6 GB. However, the job 
does not exit with an error code.

This becomes problematic in the following ways:
1. When we copy the data to Redshift, we get a bad decrypt error on the partial 
file, suggesting that the failure occurred at a weird byte in the file. 
2. We lose data - sometimes as much as 10%.

We don't see this problem with parquet format, which we also use, but moving 
all of our data to parquet is not currently feasible. We're using the Java API 
with Spark 2.2 and Amazon EMR 5.8, code is a simple as this:
df.write().csv("s3://some-bucket/some_location"). We're experiencing the issue 
1-3x/week on a daily job and are unable to reliably reproduce the problem. 

Any thoughts on why we might be seeing this and how to resolve?
Thanks in advance.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Pyspark UDF causing ExecutorLostFailure

2017-09-07 Thread nicktgr15
Hi,
I'm using spark 2.1.0 on AWS EMR (Yarn) and trying to use a UDF in python as
follows:

from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

path = 's3://some/parquet/dir/myfile.parquet'
df = spark.read.load(path)
def _test_udf(useragent):
    return useragent.upper()

test_udf = udf(_test_udf, StringType())
df = df.withColumn('test_field', test_udf(col('std_useragent')))
df.write.parquet('/output.parquet')

The following config is used in spark-defaults.conf (using
maximizeResourceAllocation in EMR)

...
spark.executor.instances 4
spark.executor.cores 8
spark.driver.memory  8G
spark.executor.memory9658M
spark.default.parallelism64
spark.driver.maxResultSize   3G
...

The cluster has 4 worker nodes (+1 master) with the following specs: 8 vCPUs,
15 GiB memory, 160 GB SSD storage.
The above example fails every single time with errors like the following:

17/09/06 09:58:08 WARN TaskSetManager: Lost task 26.1 in stage 1.0 (TID 50,
ip-172-31-7-125.eu-west-1.compute.internal, executor 10):
ExecutorLostFailure (executor 10 exited caused by one of the running tasks)
Reason: Container killed by YARN for exceeding memory limits. 10.4 GB of
10.4 GB physical memory used. Consider boosting
spark.yarn.executor.memoryOverhead.

I tried to increase the spark.yarn.executor.memoryOverhead to 3000 which
delays the errors but eventually I get them before the end of the job. The
job eventually fails.

If I run the above job in scala everything works as expected (without having
to adjust the memoryOverhead)

import org.apache.spark.sql.functions.udf

val upper: String => String = _.toUpperCase
val df = spark.read.load("s3://some/parquet/dir/myfile.parquet")
val upperUDF = udf(upper)
val newdf = df.withColumn("test_field", upperUDF(col("std_useragent")))
newdf.write.parquet("/output.parquet")

CPU utilisation is also very bad with pyspark.

Is this a known bug with pyspark UDFs or is it a matter of bad
configuration?

I've also raised a ticket in apache jira
https://issues.apache.org/jira/browse/SPARK-21935 which includes some
screenshots.




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org