RE: OS killing Executor due to high (possibly off heap) memory usage

2016-11-24 Thread Shreya Agarwal
I don’t think it’s just memory overhead. It might be better to use an executor 
with less heap space (30 GB?). 46 GB would mean more data loaded into memory and 
more GC, which can cause issues.

Also, have you tried to persist data in any way? If so, then that might be 
causing an issue.

Lastly, I am not sure if your data has a skew and if that is forcing a lot of 
data to be on one executor node.
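
If it helps, here is a minimal spark-shell sketch for checking skew before the join; "df" and the grouping column "key" are placeholders for the actual dataset and join key:

import org.apache.spark.sql.functions.desc

// Count rows per join key and look at the heaviest keys; a handful of keys
// holding most of the rows usually explains one executor being overloaded.
val keyCounts = df.groupBy("key").count()
keyCounts.orderBy(desc("count")).show(20)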

Sent from my Windows 10 phone

From: Rodrick Brown
Sent: Friday, November 25, 2016 12:25 AM
To: Aniket Bhatnagar
Cc: user
Subject: Re: OS killing Executor due to high (possibly off heap) memory usage

Try setting spark.yarn.executor.memoryOverhead 1
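
For reference, a hedged sketch of how that (plus a smaller heap, as suggested above) could be wired up when building the session. The values are purely illustrative, not the truncated setting above:

import org.apache.spark.sql.SparkSession

// Illustrative values only: shrink the heap and reserve explicit off-heap
// headroom so that heap + overhead stays under the 60 GB of host RAM.
val spark = SparkSession.builder()
  .appName("large-join")
  .config("spark.executor.memory", "40g")
  .config("spark.yarn.executor.memoryOverhead", "8192") // this setting is in MB
  .config("spark.shuffle.io.preferDirectBufs", "false")
  .getOrCreate()

The same values can just as well be passed as --conf flags to spark-submit.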

On Thu, Nov 24, 2016 at 11:16 AM, Aniket Bhatnagar wrote:
Hi Spark users

I am running a job that does a join of a huge dataset (7 TB+), and the executors 
keep crashing randomly, eventually causing the job to fail. There are no 
out-of-memory exceptions in the logs, and looking at the dmesg output it seems 
like the OS killed the JVM because of high memory usage. My suspicion is that 
off-heap usage by the executors is causing this, as I am limiting each executor's 
on-heap usage to 46 GB and each host running an executor has 60 GB of RAM. After 
an executor crashes, I can see that the external shuffle service 
(org.apache.spark.network.server.TransportRequestHandler) logs a lot of 
channel-closed exceptions in the YARN node manager logs. This leads me to believe 
that something triggers an out-of-memory condition during shuffle read. Is there 
a configuration to completely disable the use of off-heap memory? I have tried 
setting spark.shuffle.io.preferDirectBufs=false, but the executor is still 
getting killed by the same error.

Cluster details:
10 AWS c4.8xlarge hosts
RAM on each host - 60 GB
Number of cores on each host - 36
Additional hard disk on each host - 8 TB

Spark configuration:
dynamic allocation enabled
external shuffle service enabled
spark.driver.memory 1024M
spark.executor.memory 47127M
Spark master yarn-cluster

Sample error in yarn node manager:
2016-11-24 10:34:06,507 ERROR 
org.apache.spark.network.server.TransportRequestHandler (shuffle-server-50): 
Error sending result 
ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=919299554123, 
chunkIndex=0}, 
buffer=FileSegmentManagedBuffer{file=/mnt3/yarn/usercache/hadoop/appcache/application_1479898345621_0006/blockmgr-ad5301a9-e1e9-4723-a8c4-9276971b2259/2c/shuffle_3_963_0.data,
 offset=0, length=669014456}} to 
/10.192.108.170:52782; closing connection
java.nio.channels.ClosedChannelException

Error in dmesg:
[799873.309897] Out of memory: Kill process 50001 (java) score 927 or sacrifice 
child
[799873.314439] Killed process 50001 (java) total-vm:65652448kB, 
anon-rss:57246528kB, file-rss:0kB

Thanks,
Aniket



--

Rodrick Brown / DevOPs

9174456839 / rodr...@orchardplatform.com

Orchard Platform
101 5th Avenue, 4th Floor, New York, NY



RE: How to expose Spark-Shell in the production?

2016-11-23 Thread Shreya Agarwal
Use Livy or a job server to execute spark-shell commands remotely.
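
As a rough sketch only (the Livy host, the hard-coded session id, and the submitted snippet below are assumptions/placeholders; Livy exposes its sessions and statements over REST):

import java.io.OutputStreamWriter
import java.net.{HttpURLConnection, URL}

// Tiny helper: POST a JSON body and return the response text.
def postJson(url: String, json: String): String = {
  val conn = new URL(url).openConnection().asInstanceOf[HttpURLConnection]
  conn.setRequestMethod("POST")
  conn.setRequestProperty("Content-Type", "application/json")
  conn.setDoOutput(true)
  val out = new OutputStreamWriter(conn.getOutputStream)
  out.write(json); out.close()
  scala.io.Source.fromInputStream(conn.getInputStream).mkString
}

val livy = "http://livy-host:8998" // placeholder host

// 1) Create an interactive Scala/Spark session.
println(postJson(s"$livy/sessions", """{"kind": "spark"}"""))

// 2) Run a statement in it (use the session id returned above instead of 0).
println(postJson(s"$livy/sessions/0/statements", """{"code": "sc.parallelize(1 to 100).count()"}"""))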

Sent from my Windows 10 phone

From: kant kodali
Sent: Saturday, November 19, 2016 12:57 AM
To: user @spark
Subject: How to expose Spark-Shell in the production?

How to expose Spark-Shell in the production?

1) Should we expose it on master nodes or executor nodes?
2) Should we simply give access to those machines and the Spark-Shell binary? What 
is the recommended way?

Thanks!


RE: Join Query

2016-11-20 Thread Shreya Agarwal

Replication join = broadcast join. Look for that term on Google; there are many examples.

Semi join can be done on DataFrames/Datasets by passing "leftsemi" as the join 
type (the third parameter) of the join/joinWith function.

Not sure about the other two.
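
For completeness, a short Scala sketch of those two; the DataFrames `large` and `small` and the key column `id` are placeholders:

import org.apache.spark.sql.functions.broadcast

// Replication (broadcast) join: the small side is shipped to every executor,
// so the large side does not need to be shuffled.
val replicated = large.join(broadcast(small), Seq("id"))

// Semi join: keep rows of `large` that have a match in `small`,
// without bringing in any columns from `small`.
val semi = large.join(small, large("id") === small("id"), "leftsemi")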

Sent from my Windows 10 phone

From: Aakash Basu
Sent: Thursday, November 17, 2016 3:17 PM
To: user@spark.apache.org
Subject: Join Query

Hi,



Conceptually I can understand the Spark joins below, but when it comes to 
implementation I don't find much information on Google. Please help me with 
code/pseudocode for the joins below using Java-Spark or Scala-Spark.

Replication Join:
Given two datasets, where one is small enough to fit into the 
memory, perform a Replicated join using Spark.
Note: Need a program to justify this fits for Replication Join.

Semi-Join:
Given a huge dataset, do a semi-join using Spark. Note that 
with a semi-join, one dataset needs filtering and projection to fit into the 
cache.
Note: Need a program to justify this fits for Semi-Join.


Composite Join:
Given a dataset that is still too big after filtering and 
cannot fit into memory, perform a composite join on pre-sorted and 
pre-partitioned data using Spark.
Note: Need a program to justify this fits for composite Join.


Repartition join:
Join two datasets by performing Repartition join in spark.
Note: Need a program to justify this fits for repartition Join.





Thanks,
Aakash.


RE: HDPCD SPARK Certification Queries

2016-11-20 Thread Shreya Agarwal
Replication join = broadcast join. Look for that term on Google; there are many examples.

Semi join can be done on DataFrames/Datasets by passing "leftsemi" as the join 
type (the third parameter) of the join/joinWith function.

Not sure about the other two.

Sent from my Windows 10 phone

From: Aakash Basu
Sent: Thursday, November 17, 2016 3:41 PM
To: user@spark.apache.org
Subject: HDPCD SPARK Certification Queries

Hi all,


I want to know more about this examination - 
http://hortonworks.com/training/certification/exam-objectives/#hdpcdspark


If anyone here has appeared for the examination, can you kindly help?

1) What kind of questions come up,

2) Samples,

3) All the other details.

Thanks,
Aakash.


RE: Spark UI shows Jobs are processing, but the files are already written to S3

2016-11-16 Thread Shreya Agarwal
I think that is a bug. I have seen it a lot, especially with long-running jobs 
where Spark skips a lot of stages because it has pre-computed results. Some of 
these are never marked as completed, even though in reality they are. I figured 
this out because I was using the interactive shell (spark-shell) and the shell 
came back to a prompt indicating the job had finished, even though there were a 
lot of active jobs and tasks according to the UI. And my output was correct.

Is there a JIRA item tracking this?

From: Kuchekar [mailto:kuchekar.nil...@gmail.com]
Sent: Wednesday, November 16, 2016 10:00 AM
To: spark users 
Subject: Spark UI shows Jobs are processing, but the files are already written 
to S3

Hi,

 I am running a Spark job which saves the computed data (massive data) to 
S3. On the Spark UI I see that some jobs are active, but there is no activity in 
the logs. Also, on S3 all the data has been written (I verified each bucket; it 
has a _SUCCESS file).

Am I missing something?

Thanks.
Kuchekar, Nilesh


RE: AVRO File size when caching in-memory

2016-11-16 Thread Shreya Agarwal
Ah, yes. Nested schemas should be avoided if you want the best memory usage.
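
As a hedged sketch (column names taken from the schema quoted below, "df" is a placeholder, and only a few fields are shown), the nesting can be flattened before caching:

import org.apache.spark.sql.functions.col

// Pull a few nested fields up to top-level columns, then cache the flat frame.
val flat = df.select(
  col("sentAt"),
  col("ip"),
  col("story.lang").as("story_lang"),
  col("story.myapp.id").as("myapp_id"),
  col("story.loc.country").as("loc_country"))
flat.cache()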

Sent from my Windows 10 phone

From: Prithish<mailto:prith...@gmail.com>
Sent: Wednesday, November 16, 2016 12:48 AM
To: Takeshi Yamamuro<mailto:linguin@gmail.com>
Cc: Shreya Agarwal<mailto:shrey...@microsoft.com>; 
user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: AVRO File size when caching in-memory

It's something like the schema shown below (with several additional 
levels/sublevels)

root
 |-- sentAt: long (nullable = true)
 |-- sharing: string (nullable = true)
 |-- receivedAt: long (nullable = true)
 |-- ip: string (nullable = true)
 |-- story: struct (nullable = true)
 |    |-- super: string (nullable = true)
 |    |-- lang: string (nullable = true)
 |    |-- setting: string (nullable = true)
 |    |-- myapp: struct (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- ver: string (nullable = true)
 |    |    |-- build: string (nullable = true)
 |    |-- comp: struct (nullable = true)
 |    |    |-- notes: string (nullable = true)
 |    |    |-- source: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- content: string (nullable = true)
 |    |    |-- sub: string (nullable = true)
 |    |-- loc: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- lat: double (nullable = true)
 |    |    |-- long: double (nullable = true)

On Wed, Nov 16, 2016 at 2:08 PM, Takeshi Yamamuro 
<linguin@gmail.com<mailto:linguin@gmail.com>> wrote:
Hi,

What's the schema interpreted by spark?
A compression logic of the spark caching depends on column types.

// maropu


On Wed, Nov 16, 2016 at 5:26 PM, Prithish 
<prith...@gmail.com<mailto:prith...@gmail.com>> wrote:
Thanks for your response.

I did some more tests and I am seeing that when I have a flatter structure for 
my AVRO, the cache memory use is close to the CSV. But when I use a few levels 
of nesting, the cache memory usage blows up. This is really critical for 
planning the cluster we will be using. To avoid using a larger cluster, it looks 
like we will have to consider keeping the structure as flat as possible.

On Wed, Nov 16, 2016 at 1:18 PM, Shreya Agarwal 
<shrey...@microsoft.com<mailto:shrey...@microsoft.com>> wrote:
(Adding user@spark back to the discussion)

Well, the CSV vs AVRO might be simpler to explain. CSV has a lot of scope for 
compression. On the other hand avro and parquet are already compressed and just 
store extra schema info, afaik. Avro and parquet are both going to make your 
data smaller, parquet through compressed columnar storage, and avro through its 
binary data format.

Next, talking about the 62kb becoming 1224kb: I actually do not see such a 
massive blow-up. The avro you shared is 28kb on my system and becomes 53.7kb 
when cached in memory deserialized and 52.9kb when cached in memory serialized. 
Exact same numbers with parquet as well. This is expected behavior, if I am not 
wrong.

In fact, now that I think about it, even larger blow-ups might be valid, since 
your data must have been deserialized from the compressed avro format, making 
it bigger. The order of magnitude of the difference in size would depend on the 
type of data you have and how well it was compressible.

The purpose of these formats is to store data to persistent storage in a way 
that's faster to read from, not to reduce cache-memory usage.
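
If reducing the cached footprint matters more than access speed, one option is a serialized storage level. A minimal spark-shell sketch, assuming the databricks spark-avro package is on the classpath; the path is a placeholder:

import org.apache.spark.storage.StorageLevel

val df = spark.read.format("com.databricks.spark.avro").load("/path/to/sample.avro")
// MEMORY_ONLY_SER keeps the cached data serialized, trading CPU for memory;
// the default cache() stores it deserialized, which is larger on heap.
df.persist(StorageLevel.MEMORY_ONLY_SER)
df.count() // materialize the cache, then check the size on the Storage tab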

Maybe others here have more info to share.

Regards,
Shreya

Sent from my Windows 10 phone

From: Prithish<mailto:prith...@gmail.com>
Sent: Tuesday, November 15, 2016 11:04 PM
To: Shreya Agarwal<mailto:shrey...@microsoft.com>
Subject: Re: AVRO File size when caching in-memory

I did another test and am noting my observations here. These were done with the 
same data in avro and csv formats.

In AVRO, the file size on disk was 62kb and after caching, the in-memory size 
is 1224kb
In CSV, the file size on disk was 690kb and after caching, the in-memory size 
is 290kb

I'm guessing that the spark caching is not able to compress when the source is 
avro. Not sure if this is just my immature conclusion. Waiting to hear your 
observation.

On Wed, Nov 16, 2016 at 12:14 PM, Prithish 
<prith...@gmail.com<mailto:prith...@gmail.com>> wrote:
Thanks for your response.

I have attached the code (that I ran using the Spark-shell) as well as a sample 
avro file. After you run this code, the data is cached in memory and you can go 
to the "storage" tab on the Spark-ui (localhost:4040) and see the size it uses. 
In this example the size is small, but in my actual scenario, the source file 
size is 30GB and the in-memory size comes to around 800GB. I am trying to 
understand if this is expected when using avro or not.

On Wed, Nov 16, 2016 at 10:37 AM, Shreya Agarwal 
<shrey...@microsoft.com<mail

RE: what is the optimized way to combine multiple dataframes into one dataframe ?

2016-11-15 Thread Shreya Agarwal
If you are reading all these datasets from files in persistent storage, 
functions like sc.textFile can take folders/patterns as input and read all of 
the files matching into the same RDD. Then you can convert it to a dataframe.
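
A couple of hedged sketches in the same spirit, using the DataFrame reader directly; the path and format are placeholders:

// 1) Read every file matching a pattern into a single DataFrame in one go.
val all = spark.read
  .option("header", "true")
  .csv("/data/interest/part-*.csv")

// 2) If the four pieces already exist as DataFrames, fold them with union
//    instead of chaining the calls by hand.
val combined = Seq(df1, df2, df3, df4).reduce(_ union _)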

When you say it is time consuming with union, how are you measuring that? Did 
you try having all of them in one DF in comparison to having them broken down? 
Are you seeing a non-linear slowdown in operations after union with linear 
increase in data size?
Sent from my Windows 10 phone

From: Devi P.V
Sent: Tuesday, November 15, 2016 11:06 PM
To: user @spark
Subject: what is the optimized way to combine multiple dataframes into one 
dataframe ?

Hi all,

I have 4 data frames with three columns,

client_id,product_id,interest

I want to combine these 4 dataframes into one dataframe. I used union like the 
following:

df1.union(df2).union(df3).union(df4)

But it is time-consuming for big data. What is the optimized way of doing this 
using Spark 2.0 & Scala?


Thanks


RE: AVRO File size when caching in-memory

2016-11-15 Thread Shreya Agarwal
(Adding user@spark back to the discussion)

Well, the CSV vs AVRO might be simpler to explain. CSV has a lot of scope for 
compression. On the other hand avro and parquet are already compressed and just 
store extra schema info, afaik. Avro and parquet are both going to make your 
data smaller, parquet through compressed columnar storage, and avro through its 
binary data format.

Next, talking about the 62kb becoming 1224kb: I actually do not see such a 
massive blow-up. The avro you shared is 28kb on my system and becomes 53.7kb 
when cached in memory deserialized and 52.9kb when cached in memory serialized. 
Exact same numbers with parquet as well. This is expected behavior, if I am not 
wrong.

In fact, now that I think about it, even larger blow-ups might be valid, since 
your data must have been deserialized from the compressed avro format, making 
it bigger. The order of magnitude of the difference in size would depend on the 
type of data you have and how well it was compressible.

The purpose of these formats is to store data to persistent storage in a way 
that's faster to read from, not to reduce cache-memory usage.

Maybe others here have more info to share.

Regards,
Shreya

Sent from my Windows 10 phone

From: Prithish<mailto:prith...@gmail.com>
Sent: Tuesday, November 15, 2016 11:04 PM
To: Shreya Agarwal<mailto:shrey...@microsoft.com>
Subject: Re: AVRO File size when caching in-memory

I did another test and am noting my observations here. These were done with the 
same data in avro and csv formats.

In AVRO, the file size on disk was 62kb and after caching, the in-memory size 
is 1224kb
In CSV, the file size on disk was 690kb and after caching, the in-memory size 
is 290kb

I'm guessing that the spark caching is not able to compress when the source is 
avro. Not sure if this is just my immature conclusion. Waiting to hear your 
observation.

On Wed, Nov 16, 2016 at 12:14 PM, Prithish 
<prith...@gmail.com<mailto:prith...@gmail.com>> wrote:
Thanks for your response.

I have attached the code (that I ran using the Spark-shell) as well as a sample 
avro file. After you run this code, the data is cached in memory and you can go 
to the "storage" tab on the Spark-ui (localhost:4040) and see the size it uses. 
In this example the size is small, but in my actual scenario, the source file 
size is 30GB and the in-memory size comes to around 800GB. I am trying to 
understand if this is expected when using avro or not.

On Wed, Nov 16, 2016 at 10:37 AM, Shreya Agarwal 
<shrey...@microsoft.com<mailto:shrey...@microsoft.com>> wrote:
I haven't used Avro ever. But if you can send over a quick sample code, I can 
run and see if I repro it and maybe debug.

From: Prithish [mailto:prith...@gmail.com<mailto:prith...@gmail.com>]
Sent: Tuesday, November 15, 2016 8:44 PM
To: Jörn Franke <jornfra...@gmail.com<mailto:jornfra...@gmail.com>>
Cc: User <user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: Re: AVRO File size when caching in-memory

Anyone?

On Tue, Nov 15, 2016 at 10:45 AM, Prithish 
<prith...@gmail.com<mailto:prith...@gmail.com>> wrote:
I am using 2.0.1 and databricks avro library 3.0.1. I am running this on the 
latest AWS EMR release.

On Mon, Nov 14, 2016 at 3:06 PM, Jörn Franke 
<jornfra...@gmail.com<mailto:jornfra...@gmail.com>> wrote:
Spark version? Are you using Tungsten?

> On 14 Nov 2016, at 10:05, Prithish 
> <prith...@gmail.com<mailto:prith...@gmail.com>> wrote:
>
> Can someone please explain why this happens?
>
> When I read a 600kb AVRO file and cache this in memory (using cacheTable), it 
> shows up as 11mb (storage tab in Spark UI). I have tried this with different 
> file sizes, and the size in-memory is always proportionate. I thought Spark 
> compresses when using cacheTable.






RE: Strongly Connected Components

2016-11-11 Thread Shreya Agarwal
Thanks for the detailed response ☺ I will try the things you mentioned!

From: Daniel Darabos [mailto:daniel.dara...@lynxanalytics.com]
Sent: Friday, November 11, 2016 4:59 PM
To: Shreya Agarwal <shrey...@microsoft.com>
Cc: Felix Cheung <felixcheun...@hotmail.com>; user@spark.apache.org; Denny Lee 
<denny@microsoft.com>
Subject: Re: Strongly Connected Components

Hi Shreya,
GraphFrames just calls the GraphX strongly connected components code. 
(https://github.com/graphframes/graphframes/blob/release-0.2.0/src/main/scala/org/graphframes/lib/StronglyConnectedComponents.scala#L51)

For choosing the number of iterations: If the number of iterations is less than 
the diameter of the graph, you may get an incorrect result. But running for 
more iterations than that buys you nothing. The algorithm is basically to 
broadcast your ID to all your neighbors in the first round, and then broadcast 
the smallest ID that you have seen so far in the next rounds. So with only 1 
round you will get a wrong result unless each vertex is connected to the vertex 
with the lowest ID in that component. (Unlikely in a real graph.)

See 
https://github.com/apache/spark/blob/v2.0.2/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
 for the actual implementation.
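
For reference, a hedged GraphFrames sketch (the vertices/edges DataFrames and the iteration count are placeholders; maxIter corresponds to the iteration count discussed above):

import org.graphframes.GraphFrame

// Build the graph from vertex and edge DataFrames (placeholders), then run SCC
// with an explicit iteration budget.
val g = GraphFrame(vertices, edges)
val components = g.stronglyConnectedComponents.maxIter(10).run()
components.select("id", "component").show(10)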

A better algorithm exists for this problem that only requires O(log(N)) 
iterations when N is the largest component diameter. (It is described in "A 
Model of Computation for MapReduce", 
http://www.sidsuri.com/Publications_files/mrc.pdf.) This outperforms GraphX's 
implementation immensely. (See the last slide of 
http://www.slideshare.net/SparkSummit/interactive-graph-analytics-daniel-darabos#33.)
 The large advantage is due to the lower number of necessary iterations.

For why this is failing even with one iteration: I would first check your 
partitioning. Too many or too few partitions could equally cause the issue. If 
you are lucky, there is no overlap between the "too many" and "too few" domains 
:).

On Fri, Nov 11, 2016 at 7:39 PM, Shreya Agarwal 
<shrey...@microsoft.com<mailto:shrey...@microsoft.com>> wrote:
Tried GraphFrames. Still faced the same issue – the job died after a few hours. 
The errors I see (and I see tons of them) are below.
(I ran with 3 times the partitions as well, which was 12 times the number of 
executors, but still the same.)

-
ERROR NativeAzureFileSystem: Encountered Storage Exception for write on Blob : 
hdp/spark2-events/application_1478717432179_0021.inprogress Exception details: 
null Error Code : RequestBodyTooLarge

-

16/11/11 09:21:46 ERROR TransportResponseHandler: Still have 3 requests 
outstanding when connection from /10.0.0.95:43301 is 
closed
16/11/11 09:21:46 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 2 
outstanding blocks after 5000 ms
16/11/11 09:21:46 INFO ShuffleBlockFetcherIterator: Getting 1500 non-empty 
blocks out of 1500 blocks
16/11/11 09:21:46 ERROR OneForOneBlockFetcher: Failed while starting block 
fetches
java.io.IOException: Connection from /10.0.0.95:43301 
closed

-

16/11/11 09:21:46 ERROR OneForOneBlockFetcher: Failed while starting block 
fetches
java.lang.RuntimeException: java.io.FileNotFoundException: 
/mnt/resource/hadoop/yarn/local/usercache/shreyagrssh/appcache/application_1478717432179_0021/blockmgr-b1dde30d-359e-4932-b7a4-a5e138a52360/37/shuffle_1346_21_0.index
 (No such file or directory)

-

org.apache.spark.SparkException: Exception thrown in awaitResult
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
at 
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
at 
org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
at 
org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518)
at 
org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547)
at 
org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
at 
org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857

RE: Strongly Connected Components

2016-11-11 Thread Shreya Agarwal
.run(Executor.scala:547)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.rpc.RpcTimeoutException: Futures timed out after 
[10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
at 
org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
at 
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
at 
org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
... 13 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10 
seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at 
scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)

From: Shreya Agarwal
Sent: Thursday, November 10, 2016 8:16 PM
To: 'Felix Cheung' <felixcheun...@hotmail.com>; user@spark.apache.org
Subject: RE: Strongly Connected Components

Yesterday's run died sometime during the night, without any errors. Today, I am 
running it using GraphFrames instead. It is still spawning new tasks, so there 
is progress.

From: Felix Cheung [mailto:felixcheun...@hotmail.com]
Sent: Thursday, November 10, 2016 7:50 PM
To: user@spark.apache.org<mailto:user@spark.apache.org>; Shreya Agarwal 
<shrey...@microsoft.com<mailto:shrey...@microsoft.com>>
Subject: Re: Strongly Connected Components

It is possible it is dead. Could you check the Spark UI to see if there is any 
progress?

_________
From: Shreya Agarwal <shrey...@microsoft.com<mailto:shrey...@microsoft.com>>
Sent: Thursday, November 10, 2016 12:45 AM
Subject: RE: Strongly Connected Components
To: <user@spark.apache.org<mailto:user@spark.apache.org>>


Bump. Anyone? It's been running for 10 hours now. No results.

From: Shreya Agarwal
Sent: Tuesday, November 8, 2016 9:05 PM
To: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Strongly Connected Components

Hi,

I am running this on a graph with >5B edges and >3B vertices and have 2 questions -


  1.  What is the optimal number of iterations?
  2.  I am running it for 1 iteration right now on a beefy 100 node cluster, 
with 300 executors each having 30GB RAM and 5 cores. I have persisted the graph 
to MEMORY_AND_DISK. And it has been running for 3 hours already. Any ideas on 
how to speed this up?

Regards,
Shreya



RE: Dataset API | Setting number of partitions during join/groupBy

2016-11-11 Thread Shreya Agarwal
Curious – why do you want to repartition? Is there a subsequent step which 
fails because the number of partitions is too low? Or do you want to do it for 
a perf gain?

Also, how many partitions did your initial Datasets have, and how many did you 
have for the result of the join?

From: Aniket Bhatnagar [mailto:aniket.bhatna...@gmail.com]
Sent: Friday, November 11, 2016 9:22 AM
To: user 
Subject: Dataset API | Setting number of partitions during join/groupBy

Hi

I can't seem to find a way to pass the number of partitions while joining 2 
Datasets or doing a groupBy operation on a Dataset. There is an option of 
repartitioning the resultant Dataset, but it's inefficient to repartition after 
the Dataset has been joined/grouped into the default number of partitions. With 
the RDD API this was easy to do, as the functions accepted a numPartitions 
parameter. The only way to do this seems to be 
sparkSession.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, ), but 
this means that all join/groupBy operations going forward will have the same 
number of partitions.
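
For what it's worth, a hedged sketch of that workaround scoped to a single join (ds1, ds2, the key column, and the value 2000 are placeholders):

// Bump the shuffle parallelism just for this join, then restore the old value.
val previous = spark.conf.get("spark.sql.shuffle.partitions")
spark.conf.set("spark.sql.shuffle.partitions", "2000")
val joined = ds1.joinWith(ds2, ds1("id") === ds2("id")).cache()
joined.count() // materialize while the higher setting is in effect
spark.conf.set("spark.sql.shuffle.partitions", previous)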

Thanks,
Aniket


RE: Strongly Connected Components

2016-11-10 Thread Shreya Agarwal
Yesterday's run died sometime during the night, without any errors. Today, I am 
running it using GraphFrames instead. It is still spawning new tasks, so there 
is progress.

From: Felix Cheung [mailto:felixcheun...@hotmail.com]
Sent: Thursday, November 10, 2016 7:50 PM
To: user@spark.apache.org; Shreya Agarwal <shrey...@microsoft.com>
Subject: Re: Strongly Connected Components

It is possible it is dead. Could you check the Spark UI to see if there is any 
progress?

_
From: Shreya Agarwal <shrey...@microsoft.com<mailto:shrey...@microsoft.com>>
Sent: Thursday, November 10, 2016 12:45 AM
Subject: RE: Strongly Connected Components
To: <user@spark.apache.org<mailto:user@spark.apache.org>>



Bump. Anyone? It's been running for 10 hours now. No results.

From: Shreya Agarwal
Sent: Tuesday, November 8, 2016 9:05 PM
To: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Strongly Connected Components

Hi,

I am running this on a graph with >5B edges and >3B vertices and have 2 questions -


  1.  What is the optimal number of iterations?
  2.  I am running it for 1 iteration right now on a beefy 100 node cluster, 
with 300 executors each having 30GB RAM and 5 cores. I have persisted the graph 
to MEMORY_AND_DISK. And it has been running for 3 hours already. Any ideas on 
how to speed this up?

Regards,
Shreya



RE: Strongly Connected Components

2016-11-10 Thread Shreya Agarwal
Bump. Anyone? It's been running for 10 hours now. No results.

From: Shreya Agarwal
Sent: Tuesday, November 8, 2016 9:05 PM
To: user@spark.apache.org
Subject: Strongly Connected Components

Hi,

I am running this on a graph with >5B edges and >3B vertices and have 2 questions -


  1.  What is the optimal number of iterations?
  2.  I am running it for 1 iteration right now on a beefy 100 node cluster, 
with 300 executors each having 30GB RAM and 5 cores. I have persisted the graph 
to MEMORY_AND_DISK. And it has been running for 3 hours already. Any ideas on 
how to speed this up?

Regards,
Shreya


RE: Re:RE: how to merge dataframe write output files

2016-11-10 Thread Shreya Agarwal
Your coalesce should technically work. One thing to check would be overhead 
memory; you should configure it as 10% of executor memory. Also, you might need 
to increase maxResultSize. The data looks fine for the cluster unless your join 
yields >6 GB worth of data. A few things to try (a quick sketch of 1 and 2 
follows below) -

  1.  Can you do a cache on both the DFs and try?
  2.  Can you do a cache on both, then use a Scala join on the DFs instead of 
loading them into SQL?
  3.  Can you try Dataset instead of DataFrame? (assuming you are on Spark 2.0)
  4.  If you still see errors, can you check the YARN logs, or post them here?

I am sorry I don't know the answer to this, but I am pretty sure there should be 
a way to work with fragmented files too.
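
A quick sketch of points 1 and 2, using the paths and column names from the quoted mail below:

// Cache both inputs and join through the DataFrame API instead of temp views + SQL.
val tags = spark.read.parquet("/parquetdata/weixin/biz-tag-relation/").cache()
val biz  = spark.read.parquet("/parquetdata/weixin/biz/month=201608").cache()
// Equivalent of the SQL in the quoted mail for a single tag_id value.
val joined = biz.join(tags, biz("bid") === tags("biz_id")).filter(tags("tag_id") === 1)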

From: lk_spark [mailto:lk_sp...@163.com]
Sent: Thursday, November 10, 2016 12:20 AM
To: Shreya Agarwal <shrey...@microsoft.com>
Cc: user.spark <user@spark.apache.org>
Subject: Re:RE: how to merge dataframe write output files

Thank you for the reply, Shreya:
It's because the files are too small and HDFS doesn't like small files.
For your question: yes, I want to create an external table on the parquet file 
folder. And how do I use fragmented files as you mention?

The test case is as below:
bin/spark-shell --master yarn --deploy-mode client --driver-memory 6g 
--executor-memory 8g --executor-cores 2 --num-executors 4
val df = spark.read.parquet("/parquetdata/weixin/biz-tag-relation/")
df.createOrReplaceTempView("biztag") // almost 70M, 3673411 rows
val df2 = spark.read.parquet("/parquetdata/weixin/biz/month=201608")
df2.createOrReplaceTempView("biz1608") // almost 90M, 381700 rows
for (i <- 1 to 61) {
  val dft = spark.sql(s"select biz1608.*,biztag.tag_id from biz1608 left join biztag on biz1608.bid = biztag.biz_id where biztag.tag_id = ${i}")
  dft.coalesce(1).write.parquet(s"/parquetdata/weixin/biztags/biztag${i}")
}



At 2016-11-10 15:47:02, "Shreya Agarwal" 
<shrey...@microsoft.com<mailto:shrey...@microsoft.com>> wrote:

Is there a reason you want to merge the files? The reason you are getting 
errors (afaik) is because when you try to coalesce and then write, you are 
forcing all the content to reside on one executor, and the size of data is 
exceeding the memory you have for storage in your executor, hence causing the 
container to be killed. We can confirm this if you provide the specs of your 
cluster. The whole purpose of multiple files is so that each executor can write 
its partition out in parallel, without having to collect the data in one place.

Not to mention that it'll make your write incredibly slow and also it'll take 
away all the speed of reading in the data from a parquet as there won't be any 
parallelism at the time of input (if you try to input this parquet).

Again, the important question is - Why do you need it to be one file? Are you 
planning to use it externally? If yes, can you not use fragmented files there? 
If the data is too big for the Spark executor, it'll most certainly be too much 
for JRE or any other runtime  to load in memory on a single box.

From: lk_spark [mailto:lk_sp...@163.com<mailto:lk_sp...@163.com>]
Sent: Wednesday, November 9, 2016 11:29 PM
To: user.spark <user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: how to merge dataframe write output files

Hi all:
When I call the API df.write.parquet, there are a lot of small files. How can 
I merge them into one file? I tried df.coalesce(1).write.parquet, but it will 
get an error sometimes.

Container exited with a non-zero exit code 143
more and more...
-rw-r--r--   2 hadoop supergroup 14.5 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00165-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 16.4 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00166-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 17.1 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00167-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 14.2 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00168-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 15.7 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00169-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 14.4 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00170-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 17.1 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00171-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 15.7 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00172-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 16.0 K 2016-1

RE: how to merge dataframe write output files

2016-11-09 Thread Shreya Agarwal
Is there a reason you want to merge the files? The reason you are getting 
errors (afaik) is because when you try to coalesce and then write, you are 
forcing all the content to reside on one executor, and the size of data is 
exceeding the memory you have for storage in your executor, hence causing the 
container to be killed. We can confirm this if you provide the specs of your 
cluster. The whole purpose of multiple files is so that each executor can write 
its partition out in parallel, without having to collect the data in one place.

Not to mention that it’ll make your write incredibly slow and also it’ll take 
away all the speed of reading in the data from a parquet as there won’t be any 
parallelism at the time of input (if you try to input this parquet).

Again, the important question is – Why do you need it to be one file? Are you 
planning to use it externally? If yes, can you not use fragmented files there? 
If the data is too big for the Spark executor, it’ll most certainly be too much 
for JRE or any other runtime  to load in memory on a single box.

From: lk_spark [mailto:lk_sp...@163.com]
Sent: Wednesday, November 9, 2016 11:29 PM
To: user.spark 
Subject: how to merge dataframe write output files

Hi all:
When I call the API df.write.parquet, there are a lot of small files. How can 
I merge them into one file? I tried df.coalesce(1).write.parquet, but it will 
get an error sometimes.

Container exited with a non-zero exit code 143
more and more...
-rw-r--r--   2 hadoop supergroup 14.5 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00165-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 16.4 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00166-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 17.1 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00167-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 14.2 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00168-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 15.7 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00169-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 14.4 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00170-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 17.1 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00171-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 15.7 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00172-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 16.0 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00173-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 17.1 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00174-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 14.0 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00175-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 15.7 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00176-0f61afe4-23e8-40bb-b30b-09652ca677bc
more and more...
2016-11-10

lk_spark


Strongly Connected Components

2016-11-08 Thread Shreya Agarwal
Hi,

I am running this on a graph with >5B edges and >3B vertices and have 2 questions -


  1.  What is the optimal number of iterations?
  2.  I am running it for 1 iteration right now on a beefy 100 node cluster, 
with 300 executors each having 30GB RAM and 5 cores. I have persisted the graph 
to MEMORY_AND_DISK. And it has been running for 3 hours already. Any ideas on 
how to speed this up?

Regards,
Shreya


RE: Anomalous Spark RDD persistence behavior

2016-11-07 Thread Shreya Agarwal
I don’t think this is correct, unless you are serializing when caching to 
memory but not when persisting to disk. Can you check?
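
For reference, the storage levels being compared; rdd is a placeholder, and you would pick exactly one level per RDD:

import org.apache.spark.storage.StorageLevel

// Pick one; the _SER variants store serialized bytes (smaller but more CPU),
// the plain variants store deserialized Java objects (larger on heap).
rdd.persist(StorageLevel.MEMORY_ONLY)            // cache() default for RDDs
// rdd.persist(StorageLevel.MEMORY_ONLY_SER)     // serialized in memory
// rdd.persist(StorageLevel.MEMORY_AND_DISK)     // deserialized, spills to disk
// rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) // serialized, spills to disk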

Also, I have seen the behavior where, if I have a 100 GB in-memory cache and I 
use 60 GB to persist something (MEMORY_AND_DISK) and then try to persist another 
RDD with the MEMORY_AND_DISK option that is much greater than the remaining 
40 GB (let's say 1 TB), my executors start getting killed at one point. During 
this period, the memory usage goes above 100 GB and after some extra usage it 
fails. It seems like Spark is trying to cache this new RDD to memory and move 
the old one out to disk, but it is not able to move the old one out fast enough 
and crashes with OOM. Is anyone else seeing that?

From: Dave Jaffe [mailto:dja...@vmware.com]
Sent: Monday, November 7, 2016 2:07 PM
To: user@spark.apache.org
Subject: Anomalous Spark RDD persistence behavior

I’ve been studying Spark RDD persistence with spark-perf 
(https://github.com/databricks/spark-perf), especially when the dataset size 
starts to exceed available memory.

I’m running Spark 1.6.0 on YARN with CDH 5.7. I have 10 NodeManager nodes, each 
with 16 vcores and 32 GB of container memory. So I’m running 39 executors with 
4 cores and 8 GB each (6 GB spark.executor.memory and 2 GB 
spark.yarn.executor.memoryOverhead). I am using the default values for 
spark.memory.fraction and spark.memory.storageFraction so I end up with 3.1 GB 
available for caching RDDs, for a total of about 121 GB.

I’m running a single Random Forest test, with 500 features and up to 40 million 
examples, with 1 partition per core or 156 total partitions. The code (at line 
https://github.com/databricks/spark-perf/blob/master/mllib-tests/v1p5/src/main/scala/mllib/perf/MLAlgorithmTests.scala#L653)
 caches the input RDD immediately after creation. At 30M examples this fits 
into memory with all 156 partitions cached, with a total 113.4 GB in memory, or 
4 blocks of about 745 MB each per executor. So far so good.

At 40M examples, I expected about 3 partitions to fit in memory per executor, 
or 75% to be cached. However, I found only 3 partitions across the cluster were 
cached, or 2%, for a total size in memory of 2.9GB. Three of the executors had 
one block of 992 MB cached, with 2.1 GB free (enough for 2 more blocks). The 
other 36 held no blocks, with 3.1 GB free (enough for 3 blocks). Why this 
dramatic falloff?

I thought this might improve if I changed the persistence to MEMORY_AND_DISK. 
Unfortunately, now the executor memory was exceeded (“Container killed by YARN 
for exceeding memory limits. 8.9 GB of 8 GB physical memory used”) and the run 
ground to a halt. Why does persisting to disk take more memory than caching to 
memory?

Is this behavior expected as dataset size exceeds available memory?

Thanks in advance,

Dave Jaffe
Big Data Performance
VMware
dja...@vmware.com