Re: Long-Running Spark application doesn't clean old shuffle data correctly

2019-07-21 Thread Keith Chapman
Hi Alex,

Shuffle files in spark are deleted when the object holding a reference to
the shuffle file on disk goes out of scope (is garbage collected by the
JVM).  Could it be the case that you are keeping these objects alive?

Regards,
Keith.

http://keith-chapman.com
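
For reference, a minimal sketch of setting the cleaner-related properties mentioned further down in this thread, done before the session is created (a sketch only; the values are illustrative, not recommendations):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
  .set("spark.cleaner.periodicGC.interval", "15min")               // default is 30min
  .set("spark.cleaner.referenceTracking.blocking.shuffle", "true") // default is false; true makes the
                                                                   // cleaner thread wait for each shuffle's
                                                                   // removal to complete

val spark = SparkSession.builder()
  .config(conf)
  .appName("shuffle-cleanup-sketch")
  .getOrCreate()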


On Sun, Jul 21, 2019 at 12:19 AM Alex Landa  wrote:

> Thanks,
> I looked into these options, the cleaner periodic interval is set to 30
> min by default.
> The block option for shuffle -
> *spark.cleaner.referenceTracking.blocking.shuffle* - is set to false by
> default.
> What are the implications of setting it to true?
> Will it make the driver slower?
>
> Thanks,
> Alex
>
> On Sun, Jul 21, 2019 at 9:06 AM Prathmesh Ranaut Gmail <
> prathmesh.ran...@gmail.com> wrote:
>
>> This is the job of ContextCleaner. There are a few properties you can
>> tweak to see if they help:
>> spark.cleaner.periodicGC.interval
>> spark.cleaner.referenceTracking
>> spark.cleaner.referenceTracking.blocking.shuffle
>>
>> Regards
>> Prathmesh Ranaut
>>
>> On Jul 21, 2019, at 11:31 AM, Alex Landa  wrote:
>>
>> Hi,
>>
>> We are running a long-running Spark application (which executes lots of
>> quick jobs using our scheduler) on a Spark standalone cluster (2.4.0).
>> We see that old shuffle files (a week old, for example) are not deleted
>> during the execution of the application, which leads to out-of-disk-space
>> errors on the executor.
>> If we re-deploy the application, the Spark cluster takes care of the
>> cleaning and deletes the old shuffle data (since we have
>> -Dspark.worker.cleanup.enabled=true in the worker config).
>> I don't want to re-deploy our app every week or two; I want to be able to
>> configure Spark to clean old shuffle data (as it should).
>>
>> How can I configure Spark to delete old shuffle data during the lifetime
>> of the application (not after)?
>>
>>
>> Thanks,
>> Alex
>>
>>


Re: Sorting tuples with byte key and byte value

2019-07-15 Thread Keith Chapman
Hi Supun,

A couple of things with regard to your question.

--executor-cores sets the number of worker threads per executor JVM. According to
your requirement this should be set to 8.

*repartitionAndSortWithinPartitions* is an RDD operation, and RDD operations in
Spark are not as performant, in terms of both execution and memory. I would
rather use the Dataframe sort operation if performance is key.

Regards,
Keith.

http://keith-chapman.com
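
For illustration, a rough sketch of the Dataframe route (a sketch only; assuming an existing SparkSession `spark`, and the column name and paths are made up):

import org.apache.spark.sql.functions.col

val df = spark.read.parquet("/path/to/input")

// Roughly the Dataframe counterpart of repartitionAndSortWithinPartitions:
// hash-partition on the key, then sort each partition locally (no global ordering).
val partitionedSorted = df
  .repartition(128, col("key"))
  .sortWithinPartitions(col("key"))

// Or a full global sort, which Spark plans as a range-partitioned sort:
val globallySorted = df.sort(col("key"))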


On Mon, Jul 15, 2019 at 8:45 AM Supun Kamburugamuve <
supun.kamburugam...@gmail.com> wrote:

> Hi all,
>
> We are trying to measure the sorting performance of Spark. We have a 16
> node cluster with 48 cores and 256GB of ram in each machine and 10Gbps
> network.
>
> Let's say we are running with 128 parallel tasks and each partition
> generates about 1GB of data (total 128GB).
>
> We are using the method *repartitionAndSortWithinPartitions*
>
> A standalone cluster is used with the following configuration.
>
> SPARK_WORKER_CORES=1
> SPARK_WORKER_MEMORY=16G
> SPARK_WORKER_INSTANCES=8
>
> --executor-memory 16G --executor-cores 1 --num-executors 128
>
> I believe this sets up 128 executors to run the job, each having 16GB of
> memory, spread across 16 nodes with 8 threads per node. This
> configuration runs very slowly. The program doesn't use disks to read or
> write data (the data is generated in memory and we don't write to a file
> after sorting).
>
> It seems that even though the data size is small, it uses the disk for the shuffle.
> We are not sure our configurations are optimal to achieve the best
> performance.
>
> Best,
> Supun..
>
>


Re: Override jars in spark submit

2019-06-19 Thread Keith Chapman
Hi Naresh,

You could use "--conf spark.driver.extraClassPath=". Note
that the jar will not be shipped to the executors, if its a class that is
needed on the executors as well you should provide "--conf
spark.executor.extraClassPath=". Note that if you do
provide executor extraclasspath the jar file needs to be present on all the
executors.

Regards,
Keith.

http://keith-chapman.com


On Wed, Jun 19, 2019 at 8:57 PM naresh Goud 
wrote:

> Hello All,
>
> How can we override jars in spark-submit?
> We have the hive-exec-spark jar, which is available as part of the default
> Spark cluster jars.
> We want to override the above-mentioned jar in spark-submit with a newer
> version of the jar.
> How do we do that?
>
>
> Thank you,
> Naresh
> --
> Thanks,
> Naresh
> www.linkedin.com/in/naresh-dulam
> http://hadoopandspark.blogspot.com/
>
>


Re: [pyspark 2.3] count followed by write on dataframe

2019-05-20 Thread Keith Chapman
Yes, that is correct; that would cause the computation to happen twice. If you
want it to happen only once, you can cache the dataframe and call count
and write on the cached dataframe.

Regards,
Keith.

http://keith-chapman.com
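
For reference, a minimal sketch of that approach (assuming an existing SparkSession `spark`; the paths are placeholders):

val df = spark.read.parquet("/path/to/input")

val cached = df.cache()                                    // marks the dataframe for caching
val n = cached.count()                                     // first action: computes the plan once and fills the cache
cached.write.mode("overwrite").parquet("/path/to/output")  // second action reuses the cached data

cached.unpersist()                                         // release the cache when done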


On Mon, May 20, 2019 at 6:43 PM Rishi Shah  wrote:

> Hi All,
>
> Just wanted to confirm my understanding around actions on a dataframe. If
> the dataframe is not persisted at any point, and count() is called on it
> followed by a write action, this would trigger the dataframe computation
> twice (which could be a performance hit for a larger dataframe). Could
> anyone please help confirm?
>
> --
> Regards,
>
> Rishi Shah
>


RE: how to use cluster sparkSession like localSession

2018-11-04 Thread Sun, Keith
Hello,

I think you can try the below; the reason is that only yarn-client mode is
supported for your scenario.

master("yarn-client")



Thanks very much.
Keith
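
For reference, a minimal sketch in Scala of the suggestion above (assuming the cluster's YARN/HDFS configuration is visible to the web server's JVM, e.g. on the classpath or via HADOOP_CONF_DIR; the path is hypothetical):

import org.apache.spark.sql.SparkSession

// yarn-client keeps the driver inside this (web server) process
val spark = SparkSession.builder()
  .master("yarn-client")
  .appName("embedded-spark-service")
  .getOrCreate()

val df = spark.read.csv("hdfs:///some/input.csv")
df.printSchema()
df.show()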
From: 张万新 
Sent: Thursday, November 1, 2018 11:36 PM
To: 崔苗 (Data & AI Product Development Dept.) <0049003...@znv.com>
Cc: user 
Subject: Re: how to use cluster sparkSession like localSession

I think you should investigate apache zeppelin and livy
崔苗 (Data & AI Product Development Dept.) <0049003...@znv.com> wrote on Friday, November 2, 2018, 11:01:

Hi,
We want to execute Spark code without submitting an application.jar, like this code:

public static void main(String args[]) throws Exception {
    SparkSession spark = SparkSession
        .builder()
        .master("local[*]")
        .appName("spark test")
        .getOrCreate();

    Dataset<Row> testData =
        spark.read().csv(".\\src\\main\\java\\Resources\\no_schema_iris.scv");
    testData.printSchema();
    testData.show();
}

The above code works well in the IDE; we do not need to generate a jar file and
submit it. But if we replace master("local[*]") with master("yarn"), it doesn't
work. So is there a way to use a cluster SparkSession like a local SparkSession?
We need to dynamically execute Spark code in a web server according to the
incoming request (for example, a filter request will call dataset.filter()), so
there is no application.jar to submit.


0049003208

0049003...@znv.com



Pyspark error when converting string to timestamp in map function

2018-08-17 Thread Keith Chapman
Hi all,

I'm trying to create a dataframe enforcing a schema so that I can write it
to a parquet file. The schema has timestamps and I get an error with
pyspark. The following is a snippet of code that exhibits the problem,

df = sqlctx.range(1000)
schema = StructType([StructField('a', TimestampType(), True)])
df1 = sqlctx.createDataFrame(df.rdd.map(row_gen_func), schema)

row_gen_func is a function that returns timestamp strings of the form
"2018-03-21 11:09:44"

When I compile this with Spark 2.2 I get the following error,

raise TypeError("%s can not accept object %r in type %s" % (dataType, obj,
type(obj)))
TypeError: TimestampType can not accept object '2018-03-21 08:06:17' in
type 

Regards,
Keith.

http://keith-chapman.com


Re: GC- Yarn vs Standalone K8

2018-06-11 Thread Keith Chapman
Spark on EMR is configured to use CMS GC, specifically the following flags:

spark.executor.extraJavaOptions  -verbose:gc -XX:+PrintGCDetails
-XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC
-XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70
-XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'


Regards,
Keith.

http://keith-chapman.com
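
For reference, a sketch of setting executor GC flags programmatically before the context is created (the flags are the CMS set quoted above; swap in G1 options such as -XX:+UseG1GC if that is what you run):

val conf = new org.apache.spark.SparkConf()
  .set("spark.executor.extraJavaOptions",
    "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps " +
    "-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 " +
    "-XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled")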

On Mon, Jun 11, 2018 at 8:22 PM, ankit jain  wrote:

> Hi,
> Does anybody know if Yarn uses a different Garbage Collector from Spark
> standalone?
>
> We migrated our application recently from EMR to K8s (not using native Spark
> on K8s yet) and see quite a bit of performance degradation.
>
> Diving further, it seems garbage collection is running too often, up to 50%
> of task time even with a small amount of data - PFA Spark UI screenshot.
>
> I have updated the GC to G1GC and it has helped a bit - GC time has come down
> from 50% to 30%, still too high though.
>
> Also enabled -verbose:gc, so there will be many more metrics to play with, but
> any pointers meanwhile will be appreciated.
>
>
> --
> Thanks & Regards,
> Ankit.
>
>
>


Re: Running out of space on /tmp file system while running spark job on yarn because of size of blockmgr folder

2018-03-26 Thread Keith Chapman
Hi Michael,

Sorry for the late reply. I guess you may have to set it through the HDFS
core-site.xml file. The property you need to set is "hadoop.tmp.dir", which
defaults to "/tmp/hadoop-${user.name}".

Regards,
Keith.

http://keith-chapman.com

On Mon, Mar 19, 2018 at 1:05 PM, Michael Shtelma <mshte...@gmail.com> wrote:

> Hi Keith,
>
> Thank you for the idea!
> I have tried it, so now the executor command is looking in the following
> way :
>
> /bin/bash -c /usr/java/latest//bin/java -server -Xmx51200m
> '-Djava.io.tmpdir=my_prefered_path'
> -Djava.io.tmpdir=/tmp/hadoop-msh/nm-local-dir/usercache/
> msh/appcache/application_1521110306769_0041/container_
> 1521110306769_0041_01_04/tmp
>
> The JVM is using the second -Djava.io.tmpdir parameter and writing
> everything to the same directory as before.
>
> Best,
> Michael
> Sincerely,
> Michael Shtelma
>
>
> On Mon, Mar 19, 2018 at 6:38 PM, Keith Chapman <keithgchap...@gmail.com>
> wrote:
> > Can you try setting spark.executor.extraJavaOptions to have
> > -Djava.io.tmpdir=someValue
> >
> > Regards,
> > Keith.
> >
> > http://keith-chapman.com
> >
> > On Mon, Mar 19, 2018 at 10:29 AM, Michael Shtelma <mshte...@gmail.com>
> > wrote:
> >>
> >> Hi Keith,
> >>
> >> Thank you for your answer!
> >> I have done this, and it is working for spark driver.
> >> I would like to make something like this for the executors as well, so
> >> that the setting will be used on all the nodes, where I have executors
> >> running.
> >>
> >> Best,
> >> Michael
> >>
> >>
> >> On Mon, Mar 19, 2018 at 6:07 PM, Keith Chapman <keithgchap...@gmail.com
> >
> >> wrote:
> >> > Hi Michael,
> >> >
> >> > You could either set spark.local.dir through spark conf or
> >> > java.io.tmpdir
> >> > system property.
> >> >
> >> > Regards,
> >> > Keith.
> >> >
> >> > http://keith-chapman.com
> >> >
> >> > On Mon, Mar 19, 2018 at 9:59 AM, Michael Shtelma <mshte...@gmail.com>
> >> > wrote:
> >> >>
> >> >> Hi everybody,
> >> >>
> >> >> I am running spark job on yarn, and my problem is that the blockmgr-*
> >> >> folders are being created under
> >> >> /tmp/hadoop-msh/nm-local-dir/usercache/msh/appcache/application_id/*
> >> >> The size of this folder can grow to a significant size and does not
> >> >> really fit into /tmp file system for one job, which makes a real
> >> >> problem for my installation.
> >> >> I have redefined hadoop.tmp.dir in core-site.xml and
> >> >> yarn.nodemanager.local-dirs in yarn-site.xml pointing to other
> >> >> location and expected that the block manager will create the files
> >> >> there and not under /tmp, but this is not the case. The files are
> >> >> created under /tmp.
> >> >>
> >> >> I am wondering if there is a way to make spark not use /tmp at all
> and
> >> >> configure it to create all the files somewhere else ?
> >> >>
> >> >> Any assistance would be greatly appreciated!
> >> >>
> >> >> Best,
> >> >> Michael
> >> >>
> >> >>
> >> >
> >
> >
>


Re: Running out of space on /tmp file system while running spark job on yarn because of size of blockmgr folder

2018-03-19 Thread Keith Chapman
Can you try setting spark.executor.extraJavaOptions to have
-Djava.io.tmpdir=someValue

Regards,
Keith.

http://keith-chapman.com

On Mon, Mar 19, 2018 at 10:29 AM, Michael Shtelma <mshte...@gmail.com>
wrote:

> Hi Keith,
>
> Thank you for your answer!
> I have done this, and it is working for spark driver.
> I would like to make something like this for the executors as well, so
> that the setting will be used on all the nodes, where I have executors
> running.
>
> Best,
> Michael
>
>
> On Mon, Mar 19, 2018 at 6:07 PM, Keith Chapman <keithgchap...@gmail.com>
> wrote:
> > Hi Michael,
> >
> > You could either set spark.local.dir through spark conf or java.io.tmpdir
> > system property.
> >
> > Regards,
> > Keith.
> >
> > http://keith-chapman.com
> >
> > On Mon, Mar 19, 2018 at 9:59 AM, Michael Shtelma <mshte...@gmail.com>
> wrote:
> >>
> >> Hi everybody,
> >>
> >> I am running spark job on yarn, and my problem is that the blockmgr-*
> >> folders are being created under
> >> /tmp/hadoop-msh/nm-local-dir/usercache/msh/appcache/application_id/*
> >> The size of this folder can grow to a significant size and does not
> >> really fit into /tmp file system for one job, which makes a real
> >> problem for my installation.
> >> I have redefined hadoop.tmp.dir in core-site.xml and
> >> yarn.nodemanager.local-dirs in yarn-site.xml pointing to other
> >> location and expected that the block manager will create the files
> >> there and not under /tmp, but this is not the case. The files are
> >> created under /tmp.
> >>
> >> I am wondering if there is a way to make spark not use /tmp at all and
> >> configure it to create all the files somewhere else ?
> >>
> >> Any assistance would be greatly appreciated!
> >>
> >> Best,
> >> Michael
> >>
> >>
> >
>


Re: Running out of space on /tmp file system while running spark job on yarn because of size of blockmgr folder

2018-03-19 Thread Keith Chapman
Hi Michael,

You could either set spark.local.dir through spark conf or java.io.tmpdir
system property.

Regards,
Keith.

http://keith-chapman.com
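
For reference, a sketch of both suggestions (paths are hypothetical). One assumption worth verifying for your version: on YARN, spark.local.dir is overridden by the node manager's local dirs (yarn.nodemanager.local-dirs), so the YARN-side setting is what ends up being used there.

val conf = new org.apache.spark.SparkConf()
  .set("spark.local.dir", "/data/spark-scratch")                              // scratch space instead of /tmp
  .set("spark.executor.extraJavaOptions", "-Djava.io.tmpdir=/data/spark-tmp") // JVM temp dir on the executors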

On Mon, Mar 19, 2018 at 9:59 AM, Michael Shtelma <mshte...@gmail.com> wrote:

> Hi everybody,
>
> I am running a Spark job on YARN, and my problem is that the blockmgr-*
> folders are being created under
> /tmp/hadoop-msh/nm-local-dir/usercache/msh/appcache/application_id/*
> This folder can grow to a significant size and does not really fit into
> the /tmp file system for one job, which is a real problem for my
> installation.
> I have redefined hadoop.tmp.dir in core-site.xml and
> yarn.nodemanager.local-dirs in yarn-site.xml to point to another
> location, and expected that the block manager would create the files
> there and not under /tmp, but this is not the case. The files are
> created under /tmp.
>
> I am wondering if there is a way to make Spark not use /tmp at all and
> configure it to create all the files somewhere else?
>
> Any assistance would be greatly appreciated!
>
> Best,
> Michael
>
>
>


Can I get my custom spark strategy to run last?

2018-03-01 Thread Keith Chapman
Hi,

I'd like to write a custom Spark strategy that runs after all the existing
Spark strategies have run. Looking through the Spark code, it seems that
custom strategies are prepended to the list of strategies in Spark. Is
there a way I could get mine to run last?

Regards,
Keith.

http://keith-chapman.com
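
For context, a minimal sketch of how a custom strategy is registered today; as noted above, it ends up prepended to the built-in strategies, so this does not by itself make it run last:

import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

object MyStrategy extends Strategy {
  // Returning Nil makes the planner fall through to the remaining strategies.
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}

val spark = SparkSession.builder().appName("strategy-sketch").getOrCreate()
spark.experimental.extraStrategies = Seq(MyStrategy)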


Re: Spark not releasing shuffle files in time (with very large heap)

2018-02-22 Thread Keith Chapman
My issue is that there is not enough pressure on GC, hence GC is not
kicking in fast enough to delete the shuffle files of previous iterations.

Regards,
Keith.

http://keith-chapman.com

On Thu, Feb 22, 2018 at 6:58 PM, naresh Goud <nareshgoud.du...@gmail.com>
wrote:

> It would be very difficult to tell without knowing what your application
> code is doing and what kind of transformations/actions it performs.
> From my previous experience, tuning application code to avoid
> unnecessary objects reduces pressure on GC.
>
>
> On Thu, Feb 22, 2018 at 2:13 AM, Keith Chapman <keithgchap...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I'm benchmarking a Spark application by running it for multiple
>> iterations; it's a benchmark that's heavy on shuffle and I run it on a local
>> machine with a very large heap (~200GB). The system has an SSD. When running
>> for 3 to 4 iterations I get into a situation where I run out of disk space
>> on the /tmp directory. On further investigation I was able to figure out
>> that the reason for this is that the shuffle files are still around:
>> because I have a very large heap, GC has not happened and hence the shuffle
>> files are not deleted. I was able to confirm this by lowering the heap size,
>> and I see GC kicking in more often and the size of /tmp stays under
>> control. Is there any way I could configure Spark to handle this issue?
>>
>> One option that I have is to have GC run more often by
>> setting spark.cleaner.periodicGC.interval to a much lower value. Is
>> there a cleaner solution?
>>
>> Regards,
>> Keith.
>>
>> http://keith-chapman.com
>>
>
>


Spark not releasing shuffle files in time (with very large heap)

2018-02-22 Thread Keith Chapman
Hi,

I'm benchmarking a Spark application by running it for multiple iterations;
it's a benchmark that's heavy on shuffle and I run it on a local machine with
a very large heap (~200GB). The system has an SSD. When running for 3 to 4
iterations I get into a situation where I run out of disk space on the /tmp
directory. On further investigation I was able to figure out that the
reason for this is that the shuffle files are still around: because I have
a very large heap, GC has not happened and hence the shuffle files are not
deleted. I was able to confirm this by lowering the heap size, and I see GC
kicking in more often and the size of /tmp stays under control. Is there
any way I could configure Spark to handle this issue?

One option that I have is to have GC run more often by
setting spark.cleaner.periodicGC.interval to a much lower value. Is there a
cleaner solution?

Regards,
Keith.

http://keith-chapman.com


Re: update LD_LIBRARY_PATH when running apache job in a YARN cluster

2018-01-17 Thread Keith Chapman
Hi Manuel,

You could use the following to add a path to the library search path,
--conf spark.driver.extraLibraryPath=PathToLibFolder
--conf spark.executor.extraLibraryPath=PathToLibFolder

Regards,
Keith.

http://keith-chapman.com

On Wed, Jan 17, 2018 at 5:39 PM, Manuel Sopena Ballesteros <
manuel...@garvan.org.au> wrote:

> Dear Spark community,
>
>
>
> I have Spark running in a YARN cluster and I am getting an error when
> trying to run my Python application.
>
>
>
> /home/mansop/virtenv/bin/python2.7: error while loading shared libraries:
> libpython2.7.so.1.0: cannot open shared object file: No such file or
> directory
>
>
>
> Is there a way to specify the LD_LIBRARY_PATH in the spark-submit command
> or in the config file?
>
>
>
>
>
> *Manuel Sopena Ballesteros *| Big data Engineer
> *Garvan Institute of Medical Research *
> The Kinghorn Cancer Centre, 370 Victoria Street, Darlinghurst, NSW 2010
> *T:* +61 (0)2 9355 5760 | *F:* +61 (0)2 9295 8507 | *E:* manuel...@garvan.org.au
>
>
> NOTICE
> Please consider the environment before printing this email. This message
> and any attachments are intended for the addressee named and may contain
> legally privileged/confidential/copyright information. If you are not the
> intended recipient, you should not read, use, disclose, copy or distribute
> this communication. If you have received this message in error please
> notify us at once by return email and then delete both messages. We accept
> no liability for the distribution of viruses or similar in electronic
> communications. This notice should not be removed.
>


How to find the temporary views' DDL

2017-10-01 Thread Sun, Keith
Hello,

Is there a way to find the DDL of a “temporary” view created in the current
session with Spark SQL? For example:

create or replace temporary view tmp_v as
select c1 from table_x;

“Show create table” does not work for this case as it is not a table.
“Describe” shows the columns but not the DDL.


Thanks very much.
Keith

From: Anastasios Zouzias [mailto:zouz...@gmail.com]
Sent: Sunday, October 1, 2017 3:05 PM
To: Kanagha Kumar <kpra...@salesforce.com>
Cc: user @spark <user@spark.apache.org>
Subject: Re: Error - Spark reading from HDFS via dataframes - Java

Hi,

Set the inferSchema option to true in spark-csv. You may also want to set the
mode option. See the readme below:

https://github.com/databricks/spark-csv/blob/master/README.md

Best,
Anastasios

On 01.10.2017 at 07:58, "Kanagha Kumar" <kpra...@salesforce.com> wrote:
Hi,

I'm trying to read data from HDFS in Spark as dataframes. Printing the schema,
I see all columns are being read as strings. I'm converting it to RDDs and
creating another dataframe by passing in the correct schema (how the rows
should finally be interpreted).

I'm getting the following error:

Caused by: java.lang.RuntimeException: java.lang.String is not a valid external 
type for schema of bigint


Spark read API:

Dataset<Row> hdfs_dataset = new SQLContext(spark).read().option("header",
"false").csv("hdfs:/inputpath/*");

Dataset<Row> ds = new
SQLContext(spark).createDataFrame(hdfs_dataset.toJavaRDD(), conversionSchema);
This is the schema to be converted to:
StructType(StructField(COL1,StringType,true),
StructField(COL2,StringType,true),
StructField(COL3,LongType,true),
StructField(COL4,StringType,true),
StructField(COL5,StringType,true),
StructField(COL6,LongType,true))

This is the original schema obtained once read API was invoked
StructType(StructField(_c1,StringType,true),
StructField(_c2,StringType,true),
StructField(_c3,StringType,true),
StructField(_c4,StringType,true),
StructField(_c5,StringType,true),
StructField(_c6,StringType,true))

My interpretation is that even when a JavaRDD is converted to a dataframe by
passing in the new schema, the values are not getting type cast.
This is occurring because the above read API reads the data as string types from
HDFS.

How can I convert an RDD to a dataframe by passing in the correct schema once it
is read?
How can the values be type cast correctly during this RDD-to-dataframe
conversion?

Or how can I read data from HDFS with an input schema in java?
Any suggestions are helpful. Thanks!
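
For reference, a sketch of the two options in Scala, assuming Spark 2.x's built-in CSV reader and an existing SparkSession `spark` (column names and types are taken from the schema above):

import org.apache.spark.sql.types._

// Option 1: let Spark infer the types (costs an extra pass over the data).
val inferred = spark.read
  .option("header", "false")
  .option("inferSchema", "true")
  .csv("hdfs:/inputpath/*")

// Option 2: supply the schema up front, so the columns come back typed directly,
// with no RDD round-trip or manual casting.
val schema = StructType(Seq(
  StructField("COL1", StringType, true),
  StructField("COL2", StringType, true),
  StructField("COL3", LongType, true),
  StructField("COL4", StringType, true),
  StructField("COL5", StringType, true),
  StructField("COL6", LongType, true)))

val typed = spark.read
  .option("header", "false")
  .schema(schema)
  .csv("hdfs:/inputpath/*")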




RE: A bug in spark or hadoop RPC with kerberos authentication?

2017-08-23 Thread Sun, Keith
Thanks for the reply. I filed an issue in JIRA:
https://issues.apache.org/jira/browse/SPARK-21819

I submitted the job from the Java API, not via the spark-submit command line, as
we want to offer Spark processing as a service.

Configuration hc = new Configuration(false);
String yarnxml = String.format("%s/%s", ConfigLocation, "yarn-site.xml");
String corexml = String.format("%s/%s", ConfigLocation, "core-site.xml");
String hdfsxml = String.format("%s/%s", ConfigLocation, "hdfs-site.xml");
String hivexml = String.format("%s/%s", ConfigLocation, "hive-site.xml");

hc.addResource(yarnxml);
hc.addResource(corexml);
hc.addResource(hdfsxml);
hc.addResource(hivexml);

// manually set all the Hadoop config in sparkconf
SparkConf sc = new SparkConf(true);
hc.forEach(entry -> {
    if (entry.getKey().startsWith("hive")) {
        sc.set(entry.getKey(), entry.getValue());
    } else {
        sc.set("spark.hadoop." + entry.getKey(), entry.getValue());
    }
});

UserGroupInformation.setConfiguration(hc);
UserGroupInformation.loginUserFromKeytab(Principal, Keytab);

SparkSession sparkSessesion = SparkSession
    .builder()
    .master("yarn-client") // "yarn-client", "local"
    .config(sc)
    .appName(SparkEAZDebug.class.getName())
    .enableHiveSupport()
    .getOrCreate();


Thanks very much.
Keith

From: 周康 [mailto:zhoukang199...@gmail.com]
Sent: August 22, 2017, 20:22
To: Sun, Keith <ai...@ebay.com>
Cc: user@spark.apache.org
Subject: Re: A bug in spark or hadoop RPC with kerberos authentication?

You can check out the Hadoop**credential class in spark-yarn. During spark-submit,
it will use the config on the classpath.
I wonder how you reference your own config?


RE: A bug in spark or hadoop RPC with kerberos authentication?

2017-08-23 Thread Sun, Keith
Finally found the root cause and raised a bug issue in
https://issues.apache.org/jira/browse/SPARK-21819



Thanks very much.
Keith

From: Sun, Keith
Sent: August 22, 2017, 8:48
To: user@spark.apache.org
Subject: A bug in spark or hadoop RPC with kerberos authentication?

Hello ,

I met this very weird issue, which is easy to reproduce but stuck me for more
than a day. I suspect this may be an issue/bug related to the class loader.
Can you help confirm the root cause?

I want to specify a customized Hadoop configuration set instead of those on the
class path (we have a few Hadoop clusters, all with Kerberos security, and I
want to support different configurations).
Code/error below.


The workaround I found is to place a core-site.xml on the class path with the
2 properties below.
By checking the RPC code under org.apache.hadoop.ipc.RPC, I suspect the RPC
code may not see the UGI class in the same classloader.
So UGI is initialized with the default value on the classpath, which is simple
authentication.

core-site.xml with the security setup on the classpath:

<property>
  <name>hadoop.security.authentication</name>
  <value>kerberos</value>
</property>
<property>
  <name>hadoop.security.authorization</name>
  <value>true</value>
</property>



error--
2673 [main] DEBUG 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil  - 
DataTransferProtocol using SaslPropertiesResolver, configured QOP 
dfs.data.transfer.protection = privacy, configured class 
dfs.data.transfer.saslproperties.resolver.class = class 
org.apache.hadoop.security.WhitelistBasedResolver
2696 [main] DEBUG org.apache.hadoop.service.AbstractService  - Service: 
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl entered state INITED
2744 [main] DEBUG org.apache.hadoop.security.UserGroupInformation  - 
PrivilegedAction as:x@xxxCOM (auth:KERBEROS) 
from:org.apache.hadoop.yarn.client.RMProxy.getProxy(RMProxy.java:136) //
2746 [main] DEBUG org.apache.hadoop.yarn.ipc.YarnRPC  - Creating YarnRPC for 
org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC
2746 [main] DEBUG org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC  - Creating a 
HadoopYarnProtoRpc proxy for protocol interface 
org.apache.hadoop.yarn.api.ApplicationClientProtocol
2801 [main] DEBUG org.apache.hadoop.ipc.Client  - getting client out of cache: 
org.apache.hadoop.ipc.Client@748fe51d<mailto:org.apache.hadoop.ipc.Client@748fe51d>
2981 [main] DEBUG org.apache.hadoop.service.AbstractService  - Service 
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl is started
3004 [main] DEBUG org.apache.hadoop.ipc.Client  - The ping interval is 6 ms.
3005 [main] DEBUG org.apache.hadoop.ipc.Client  - Connecting to 
yarn-rm-1/x:8032
3019 [IPC Client (2012095985) connection to yarn-rm-1/x:8032 from 
x@xx] DEBUG org.apache.hadoop.ipc.Client  - IPC Client (2012095985) 
connection to yarn-rm-1/x:8032 from x@xx: starting, having 
connections 1
3020 [IPC Parameter Sending Thread #0] DEBUG org.apache.hadoop.ipc.Client  - 
IPC Client (2012095985) connection to yarn-rm-1/x:8032 from x@xx 
sending #0
3025 [IPC Client (2012095985) connection to yarn-rm-1/x:8032 from 
x@xx] DEBUG org.apache.hadoop.ipc.Client  - IPC Client (2012095985) 
connection to yarn-rm-1/x:8032 from x@xx got value #-1
3026 [IPC Client (2012095985) connection to yarn-rm-1/x:8032 from 
x@xx] DEBUG org.apache.hadoop.ipc.Client  - closing ipc connection to 
yarn-rm-1/x:8032: SIMPLE authentication is not enabled.  Available:[TOKEN, 
KERBEROS]
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException):
 SIMPLE authentication is not enabled.  Available:[TOKEN, KERBEROS]
at 
org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1131)
at org.apache.hadoop.ipc.Client$Connection.run(Client.java:979)
---code---

Configuration hc = new Configuration(false);

hc.addResource("myconf /yarn-site.xml");
hc.addResource("myconf/core-site.xml");
hc.addResource("myconf/hdfs-site.xml");
hc.addResource("myconf/hive-site.xml");

SparkConf sc = new SparkConf(true);
// add config in spark conf as no xml in the classpath except those "default.xml" from Hadoop jars
hc.forEach(entry -> {
    if (entry.getKey().startsWith("hive")) {
        sc.set(entry.getKey(), entry.getValue());
    } else {
        sc.set("spark.hadoop." + entry.getKey(), entry.getValue());
    }
});

UserGroupInformation.setConfiguration(hc);
UserGroupInformation.loginUserFromKeytab(Principal, Keytab);

System.out.println("spark-conf##");
System.out.println(sc.toDebugString());

SparkSession sparkSessesion = SparkSession
    .builder()
    .master("yarn-client") //"

A bug in spark or hadoop RPC with kerberos authentication?

2017-08-22 Thread Sun, Keith
Hello ,

I met this very weird issue, which is easy to reproduce but stuck me for more
than a day. I suspect this may be an issue/bug related to the class loader.
Can you help confirm the root cause?

I want to specify a customized Hadoop configuration set instead of those on the
class path (we have a few Hadoop clusters, all with Kerberos security, and I
want to support different configurations).
Code/error below.


The workaround I found is to place a core-site.xml on the class path with the
2 properties below.
By checking the RPC code under org.apache.hadoop.ipc.RPC, I suspect the RPC
code may not see the UGI class in the same classloader.
So UGI is initialized with the default value on the classpath, which is simple
authentication.

core-site.xml with the security setup on the classpath:

<property>
  <name>hadoop.security.authentication</name>
  <value>kerberos</value>
</property>
<property>
  <name>hadoop.security.authorization</name>
  <value>true</value>
</property>



error--
2673 [main] DEBUG 
org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil  - 
DataTransferProtocol using SaslPropertiesResolver, configured QOP 
dfs.data.transfer.protection = privacy, configured class 
dfs.data.transfer.saslproperties.resolver.class = class 
org.apache.hadoop.security.WhitelistBasedResolver
2696 [main] DEBUG org.apache.hadoop.service.AbstractService  - Service: 
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl entered state INITED
2744 [main] DEBUG org.apache.hadoop.security.UserGroupInformation  - 
PrivilegedAction as:x@xxxCOM (auth:KERBEROS) 
from:org.apache.hadoop.yarn.client.RMProxy.getProxy(RMProxy.java:136) //
2746 [main] DEBUG org.apache.hadoop.yarn.ipc.YarnRPC  - Creating YarnRPC for 
org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC
2746 [main] DEBUG org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC  - Creating a 
HadoopYarnProtoRpc proxy for protocol interface 
org.apache.hadoop.yarn.api.ApplicationClientProtocol
2801 [main] DEBUG org.apache.hadoop.ipc.Client  - getting client out of cache: 
org.apache.hadoop.ipc.Client@748fe51d
2981 [main] DEBUG org.apache.hadoop.service.AbstractService  - Service 
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl is started
3004 [main] DEBUG org.apache.hadoop.ipc.Client  - The ping interval is 6 ms.
3005 [main] DEBUG org.apache.hadoop.ipc.Client  - Connecting to 
yarn-rm-1/x:8032
3019 [IPC Client (2012095985) connection to yarn-rm-1/x:8032 from 
x@xx] DEBUG org.apache.hadoop.ipc.Client  - IPC Client (2012095985) 
connection to yarn-rm-1/x:8032 from x@xx: starting, having 
connections 1
3020 [IPC Parameter Sending Thread #0] DEBUG org.apache.hadoop.ipc.Client  - 
IPC Client (2012095985) connection to yarn-rm-1/x:8032 from x@xx 
sending #0
3025 [IPC Client (2012095985) connection to yarn-rm-1/x:8032 from 
x@xx] DEBUG org.apache.hadoop.ipc.Client  - IPC Client (2012095985) 
connection to yarn-rm-1/x:8032 from x@xx got value #-1
3026 [IPC Client (2012095985) connection to yarn-rm-1/x:8032 from 
x@xx] DEBUG org.apache.hadoop.ipc.Client  - closing ipc connection to 
yarn-rm-1/x:8032: SIMPLE authentication is not enabled.  Available:[TOKEN, 
KERBEROS]
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException):
 SIMPLE authentication is not enabled.  Available:[TOKEN, KERBEROS]
at 
org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1131)
at org.apache.hadoop.ipc.Client$Connection.run(Client.java:979)
---code---

Configuration hc = new Configuration(false);

hc.addResource("myconf /yarn-site.xml");
hc.addResource("myconf/core-site.xml");
hc.addResource("myconf/hdfs-site.xml");
hc.addResource("myconf/hive-site.xml");

SparkConf sc = new SparkConf(true);
// add config in spark conf as no xml in the classpath except those "default.xml" from Hadoop jars
hc.forEach(entry -> {
    if (entry.getKey().startsWith("hive")) {
        sc.set(entry.getKey(), entry.getValue());
    } else {
        sc.set("spark.hadoop." + entry.getKey(), entry.getValue());
    }
});

UserGroupInformation.setConfiguration(hc);
UserGroupInformation.loginUserFromKeytab(Principal, Keytab);

System.out.println("spark-conf##");
System.out.println(sc.toDebugString());

SparkSession sparkSessesion = SparkSession
    .builder()
    .master("yarn-client") // "yarn-client", "local"
    .config(sc)
    .appName(SparkEAZDebug.class.getName())
    .enableHiveSupport()
    .getOrCreate();

Thanks very much.
Keith



Re: What are some disadvantages of issuing a raw sql query to spark?

2017-07-25 Thread Keith Chapman
Here is an example of a window lead function,

select *, lead(someColumn1) over ( partition by someColumn2 order by
someColumn13 asc nulls first) as someName  from someTable

Regards,
Keith.

http://keith-chapman.com
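
On the follow-up question quoted below about windowInterval and slideInterval: the time-based window function is also exposed in Spark SQL, so the streaming aggregation should be expressible roughly like this (a sketch only, assuming the streaming Dataset `ds` from the snippet below is registered as a view and has TIMESTAMP, name, and hourlyPay columns):

ds.createOrReplaceTempView("payments")   // hypothetical view name

val result = spark.sql("""
  SELECT window(TIMESTAMP, '24 hours', '24 hours') AS time_window,
         hourlyPay,
         sum(hourlyPay) AS total
  FROM payments
  WHERE name = 'john'
  GROUP BY window(TIMESTAMP, '24 hours', '24 hours'), hourlyPay
""")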

On Tue, Jul 25, 2017 at 9:15 AM, kant kodali <kanth...@gmail.com> wrote:

> How do I specify windowInterval and slideInterval using a raw SQL string?
>
> On Tue, Jul 25, 2017 at 8:52 AM, Keith Chapman <keithgchap...@gmail.com>
> wrote:
>
>> You could issue a raw sql query to spark, there is no particular
>> advantage or disadvantage of doing so. Spark would build a logical plan
>> from the raw sql (or DSL) and optimize on that. Ideally you would end up
>> with the same physical plan, irrespective of it been written in raw sql /
>> DSL.
>>
>> Regards,
>> Keith.
>>
>> http://keith-chapman.com
>>
>> On Tue, Jul 25, 2017 at 12:50 AM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> HI All,
>>>
>>> I just want to run some spark structured streaming Job similar to this
>>>
>>> DS.filter(col("name").equalTo("john"))
>>> .groupBy(functions.window(df1.col("TIMESTAMP"), "24 hours", "24 
>>> hours"), df1.col("hourlyPay"))
>>> .agg(sum("hourlyPay").as("total"));
>>>
>>>
>>> I am wondering if I can express the above query in raw sql string?
>>>
>>> If so how would that look like and what are some of the disadvantages of 
>>> using raw sql query vs spark DSL?
>>>
>>>
>>> Thanks!
>>>
>>>
>>
>


Re: What are some disadvantages of issuing a raw sql query to spark?

2017-07-25 Thread Keith Chapman
You could issue a raw SQL query to Spark; there is no particular advantage
or disadvantage to doing so. Spark would build a logical plan from the raw
SQL (or the DSL) and optimize on that. Ideally you would end up with the same
physical plan, irrespective of whether it was written in raw SQL or the DSL.

Regards,
Keith.

http://keith-chapman.com
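
As a small illustration of the point above (a sketch; assuming an existing SparkSession `spark` and a dataframe `df` with name and dept columns), the DSL and raw-SQL forms go through the same optimizer, which you can check with explain():

import org.apache.spark.sql.functions.col

df.createOrReplaceTempView("people")   // hypothetical view name

val viaDsl = df.filter(col("name") === "john").groupBy(col("dept")).count()
val viaSql = spark.sql(
  "SELECT dept, count(*) AS count FROM people WHERE name = 'john' GROUP BY dept")

viaDsl.explain()   // the physical plans should come out essentially the same
viaSql.explain()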

On Tue, Jul 25, 2017 at 12:50 AM, kant kodali <kanth...@gmail.com> wrote:

> HI All,
>
> I just want to run some spark structured streaming Job similar to this
>
> DS.filter(col("name").equalTo("john"))
> .groupBy(functions.window(df1.col("TIMESTAMP"), "24 hours", "24 
> hours"), df1.col("hourlyPay"))
> .agg(sum("hourlyPay").as("total"));
>
>
> I am wondering if I can express the above query in raw sql string?
>
> If so how would that look like and what are some of the disadvantages of 
> using raw sql query vs spark DSL?
>
>
> Thanks!
>
>


Re: Get full RDD lineage for a spark job

2017-07-21 Thread Keith Chapman
You could also enable it with --conf spark.logLineage=true if you do not
want to change any code.

Regards,
Keith.

http://keith-chapman.com
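
For reference, a quick sketch of both approaches (assuming an existing SparkContext `sc`; the input path is hypothetical):

val rdd = sc.textFile("/path/to/input")
  .map(_.split(","))
  .filter(_.length > 1)

println(rdd.toDebugString)   // prints the lineage (dependency chain) of this RDD

// or, without touching code, submit with: --conf spark.logLineage=true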

On Fri, Jul 21, 2017 at 7:57 PM, Keith Chapman <keithgchap...@gmail.com>
wrote:

> Hi Ron,
>
> You can try using the toDebugString method on the RDD, this will print
> the RDD lineage.
>
> Regards,
> Keith.
>
> http://keith-chapman.com
>
> On Fri, Jul 21, 2017 at 11:24 AM, Ron Gonzalez <
> zlgonza...@yahoo.com.invalid> wrote:
>
>> Hi,
>>   Can someone point me to a test case or share sample code that is able
>> to extract the RDD graph from a Spark job anywhere during its lifecycle? I
>> understand that Spark has UI that can show the graph of the execution so
>> I'm hoping that is using some API somewhere that I could use.
>>   I know RDD is the actual execution graph, so if there is also a more
>> logical abstraction API closer to calls like map, filter, aggregate, etc.,
>> that would even be better.
>>   Appreciate any help...
>>
>> Thanks,
>> Ron
>>
>
>


Re: Get full RDD lineage for a spark job

2017-07-21 Thread Keith Chapman
Hi Ron,

You can try using the toDebugString method on the RDD, this will print the
RDD lineage.

Regards,
Keith.

http://keith-chapman.com

On Fri, Jul 21, 2017 at 11:24 AM, Ron Gonzalez <zlgonza...@yahoo.com.invalid
> wrote:

> Hi,
>   Can someone point me to a test case or share sample code that is able to
> extract the RDD graph from a Spark job anywhere during its lifecycle? I
> understand that Spark has UI that can show the graph of the execution so
> I'm hoping that is using some API somewhere that I could use.
>   I know RDD is the actual execution graph, so if there is also a more
> logical abstraction API closer to calls like map, filter, aggregate, etc.,
> that would even be better.
>   Appreciate any help...
>
> Thanks,
> Ron
>


Re: Is there an api in Dataset/Dataframe that does repartitionAndSortWithinPartitions?

2017-06-24 Thread Keith Chapman
Hi Nguyen,

This looks promising and seems like I could achieve it using cluster by.
Thanks for the pointer.

Regards,
Keith.

http://keith-chapman.com
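
For reference, a sketch of the two equivalents (column names are illustrative; assuming an existing SparkSession `spark` and a dataframe `df`):

import org.apache.spark.sql.functions.col

// SQL form: CLUSTER BY = DISTRIBUTE BY + SORT BY within each partition.
df.createOrReplaceTempView("t")
val clustered = spark.sql("SELECT * FROM t CLUSTER BY key")

// DSL form: hash-partition on the key, then sort locally inside each partition.
val partitionedSorted = df
  .repartition(300, col("key"))
  .sortWithinPartitions(col("key"))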

On Sat, Jun 24, 2017 at 5:27 AM, nguyen duc Tuan <newvalu...@gmail.com>
wrote:

> Hi Chapman,
> You can use "cluster by" to do what you want.
> https://deepsense.io/optimize-spark-with-distribute-by-and-cluster-by/
>
> 2017-06-24 17:48 GMT+07:00 Saliya Ekanayake <esal...@gmail.com>:
>
>> I haven't worked with datasets but would this help
>> https://stackoverflow.com/questions/37513667/how-to-cre
>> ate-a-spark-dataset-from-an-rdd?
>>
>> On Jun 23, 2017 5:43 PM, "Keith Chapman" <keithgchap...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I have code that does the following using RDDs,
>>>
>>> val outputPartitionCount = 300
>>> val part = new MyOwnPartitioner(outputPartitionCount)
>>> val finalRdd = myRdd.repartitionAndSortWithinPartitions(part)
>>>
>>> where myRdd is correctly formed as key, value pairs. I am looking
>>> convert this to use Dataset/Dataframe instead of RDDs, so my question is:
>>>
>>> Is there custom partitioning of Dataset/Dataframe implemented in Spark?
>>> Can I accomplish the partial sort using mapPartitions on the resulting
>>> partitioned Dataset/Dataframe?
>>>
>>> Any thoughts?
>>>
>>> Regards,
>>> Keith.
>>>
>>> http://keith-chapman.com
>>>
>>
>


Re: Is there an api in Dataset/Dataframe that does repartitionAndSortWithinPartitions?

2017-06-24 Thread Keith Chapman
Thanks for the pointer Saliya. I'm looking for an equivalent API in
Dataset/Dataframe for repartitionAndSortWithinPartitions; I've already
converted most of the RDDs to Dataframes.

Regards,
Keith.

http://keith-chapman.com

On Sat, Jun 24, 2017 at 3:48 AM, Saliya Ekanayake <esal...@gmail.com> wrote:

> I haven't worked with datasets but would this help https://stackoverflow.
> com/questions/37513667/how-to-create-a-spark-dataset-from-an-rdd?
>
> On Jun 23, 2017 5:43 PM, "Keith Chapman" <keithgchap...@gmail.com> wrote:
>
>> Hi,
>>
>> I have code that does the following using RDDs,
>>
>> val outputPartitionCount = 300
>> val part = new MyOwnPartitioner(outputPartitionCount)
>> val finalRdd = myRdd.repartitionAndSortWithinPartitions(part)
>>
>> where myRdd is correctly formed as key, value pairs. I am looking convert
>> this to use Dataset/Dataframe instead of RDDs, so my question is:
>>
>> Is there custom partitioning of Dataset/Dataframe implemented in Spark?
>> Can I accomplish the partial sort using mapPartitions on the resulting
>> partitioned Dataset/Dataframe?
>>
>> Any thoughts?
>>
>> Regards,
>> Keith.
>>
>> http://keith-chapman.com
>>
>


Is there an api in Dataset/Dataframe that does repartitionAndSortWithinPartitions?

2017-06-23 Thread Keith Chapman
Hi,

I have code that does the following using RDDs,

val outputPartitionCount = 300
val part = new MyOwnPartitioner(outputPartitionCount)
val finalRdd = myRdd.repartitionAndSortWithinPartitions(part)

where myRdd is correctly formed as key, value pairs. I am looking to convert
this to use Dataset/Dataframe instead of RDDs, so my question is:

Is there custom partitioning of Dataset/Dataframe implemented in Spark?
Can I accomplish the partial sort using mapPartitions on the resulting
partitioned Dataset/Dataframe?

Any thoughts?

Regards,
Keith.

http://keith-chapman.com


Re: Alternatives for dataframe collectAsList()

2017-04-04 Thread Keith Chapman
As Paul said, it really depends on what you want to do with your data;
perhaps writing it to a file would be a better option, but again it depends
on what you want to do with the data you collect.

Regards,
Keith.

http://keith-chapman.com
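
For reference, a sketch of the alternatives mentioned in this thread (assuming a dataframe `df`; the output path is a placeholder):

// Stream rows back to the driver one partition at a time instead of materialising everything:
val it = df.toLocalIterator()
while (it.hasNext) {
  val row = it.next()
  // handle one row at a time on the driver
}

// Or skip the driver entirely and write the result out from the executors:
df.write.mode("overwrite").parquet("/path/to/output")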

On Tue, Apr 4, 2017 at 7:38 AM, Eike von Seggern <eike.segg...@sevenval.com>
wrote:

> Hi,
>
> depending on what you're trying to achieve `RDD.toLocalIterator()` might
> help you.
>
> Best
>
> Eike
>
>
> 2017-03-29 21:00 GMT+02:00 szep.laszlo.it <szep.laszlo...@gmail.com>:
>
>> Hi,
>>
>> after I create a dataset
>>
>> Dataset<Row> df = sqlContext.sql("query");
>>
>> I need to get the result values, so I call a method: collectAsList()
>>
>> List<Row> list = df.collectAsList();
>>
>> But it's very slow if I work with large datasets (20-30 million
>> records). I know the result isn't present in the driver app, which is why it
>> takes a long time: collectAsList() collects all the data from the worker
>> nodes.
>>
>> But then what is the right way to get the result values? Is there another
>> solution to iterate over the result dataset rows, or get the values? Can
>> anyone post a small & working example?
>>
>> Thanks & Regards,
>> Laszlo Szep
>>
>


Re: Having issues reading a csv file into a DataSet using Spark 2.1

2017-03-22 Thread Keith Chapman
Thanks for the advice Diego, that was very helpful. How could I read the
csv as a dataset though? I need to do a map operation over the dataset; I
just coded up an example to illustrate the issue.

On Mar 22, 2017 6:43 PM, "Diego Fanesi" <diego.fan...@gmail.com> wrote:

> You are using spark as a library but it is much more than that. The book
> "learning Spark"  is very well done and it helped me a lot starting with
> spark. Maybe you should start from there.
>
> Those are the issues in your code:
>
> Basically, you generally don't execute spark code like that. You could but
> it is not officially supported and many functions don't work in that way.
> You should start your local cluster made of master and single worker, then
> make a jar with your code and use spark-submit to send it to the cluster.
>
> You generally never use args because spark is a multiprocess, multi-thread
> application so args will not be available everywhere.
>
> All contexts have been merged into the same context in the last versions
> of spark. so you will need to do something like this:
>
> import org.apache.spark.sql.{DataFrame, SparkSession}
>
> object DatasetTest{
>
> val spark: SparkSession = SparkSession
>   .builder() .master("local[8]")
>   .appName("Spark basic example").getOrCreate()
>
> import spark.implicits._
>
> def main(Args: Array[String]) {
>
> var x = spark.read.format("csv").load("/home/user/data.csv")
>
> x.show()
>
> }
>
> }
>
>
> hope this helps.
>
> Diego
>
> On 22 Mar 2017 7:18 pm, "Keith Chapman" <keithgchap...@gmail.com> wrote:
>
> Hi,
>
> I'm trying to read in a CSV file into a Dataset but keep having
> compilation issues. I'm using spark 2.1 and the following is a small
> program that exhibit the issue I'm having. I've searched around but not
> found a solution that worked, I've added "import sqlContext.implicits._" as
> suggested but no luck. What am I missing? Would appreciate some advice.
>
> import org.apache.spark.sql.functions._
> import org.apache.spark.{SparkContext, SparkConf}
> import org.apache.spark.sql.{Encoder,Encoders}
>
> object DatasetTest{
>
>   def main(args: Array[String]) {
> val sparkConf = new SparkConf().setAppName("DatasetTest")
> val sc = new SparkContext(sparkConf)
> case class Foo(text: String)
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
> import sqlContext.implicits._
> val ds : org.apache.spark.sql.Dataset[Foo] =
> sqlContext.read.csv(args(1)).as[Foo]
> ds.show
>   }
> }
>
> Compiling the above program gives, I'd expect it to work as its a simple
> case class, changing it to as[String] works, but I would like to get the
> case class to work.
>
> [error] /home/keith/dataset/DataSetTest.scala:13: Unable to find encoder
> for type stored in a Dataset.  Primitive types (Int, String, etc) and
> Product types (case classes) are supported by importing spark.implicits._
> Support for serializing other types will be added in future releases.
> [error] val ds : org.apache.spark.sql.Dataset[Foo] =
> sqlContext.read.csv(args(1)).as[Foo]
>
>
> Regards,
> Keith.
>
>
>
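
For what it's worth, the compile error in the snippet above is most likely because the case class is declared inside main(), where spark.implicits cannot derive an Encoder for it. A minimal sketch that should compile, with the case class moved to the top level and the single CSV column renamed to match the field name (the path argument is illustrative):

import org.apache.spark.sql.SparkSession

// case class at top level (not inside main), so spark.implicits can derive its encoder
case class Foo(text: String)

object DatasetTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DatasetTest").getOrCreate()
    import spark.implicits._

    // csv columns come back as _c0, _c1, ...; rename the single column to match the field name
    val ds = spark.read.csv(args(0)).toDF("text").as[Foo]
    ds.show()
  }
}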


Having issues reading a csv file into a DataSet using Spark 2.1

2017-03-22 Thread Keith Chapman
Hi,

I'm trying to read a CSV file into a Dataset but keep having compilation
issues. I'm using Spark 2.1 and the following is a small program that
exhibits the issue I'm having. I've searched around but not found a solution
that worked; I've added "import sqlContext.implicits._" as suggested, but no
luck. What am I missing? I would appreciate some advice.

import org.apache.spark.sql.functions._
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{Encoder,Encoders}

object DatasetTest{

  def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("DatasetTest")
val sc = new SparkContext(sparkConf)
case class Foo(text: String)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val ds : org.apache.spark.sql.Dataset[Foo] =
sqlContext.read.csv(args(1)).as[Foo]
ds.show
  }
}

Compiling the above program gives the error below. I'd expect it to work as it's
a simple case class; changing it to as[String] works, but I would like to get
the case class to work.

[error] /home/keith/dataset/DataSetTest.scala:13: Unable to find encoder
for type stored in a Dataset.  Primitive types (Int, String, etc) and
Product types (case classes) are supported by importing spark.implicits._
Support for serializing other types will be added in future releases.
[error] val ds : org.apache.spark.sql.Dataset[Foo] =
sqlContext.read.csv(args(1)).as[Foo]


Regards,
Keith.


Re:

2017-01-20 Thread Keith Chapman
Hi Jacek,

I've looked at SparkListener and tried it; I see it firing on the master,
but I don't see it firing on the workers in a cluster.

Regards,
Keith.

http://keith-chapman.com
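
For reference, a minimal listener sketch (assuming an existing SparkContext `sc`); as noted above, it is driver-side only: events fire in the driver process, not inside the executors.

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

class CleanupListener extends SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    println(s"Job ${jobEnd.jobId} finished; run driver-side cleanup here")
  }
}

sc.addSparkListener(new CleanupListener)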

On Fri, Jan 20, 2017 at 11:09 AM, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi,
>
> (redirecting to users as it has nothing to do with Spark project
> development)
>
> Monitor jobs and stages using SparkListener and submit cleanup jobs where
> a condition holds.
>
> Jacek
>
> On 20 Jan 2017 3:57 a.m., "Keith Chapman" <keithgchap...@gmail.com> wrote:
>
>> Hi ,
>>
>> Is it possible for an executor (or slave) to know when an actual job
>> ends? I'm running spark on a cluster (with yarn) and my workers create some
>> temporary files that I would like to clean up once the job ends. Is there a
>> way for the worker to detect that a job has finished? I tried doing it in
>> the JobProgressListener but it does not seem to work in a cluster. The
>> event is not triggered in the worker.
>>
>> Regards,
>> Keith.
>>
>> http://keith-chapman.com
>>
>


Library dependencies in Spark

2017-01-10 Thread Keith Turner
I recently wrote a blog post[1] sharing my experiences with using
Apache Spark to load data into Apache Fluo. One of the things I cover
in this blog post is late binding of dependencies and exclusion of
provided dependencies when building a shaded jar.  When writing the
post, I was unsure about dependency isolation and convergence
expectations in the Spark env.

Does Spark support any form of dependency isolation for user code?
For example, can the Spark framework use Guava version X while user code
uses Guava version Y? This is assuming the user packaged Guava
version Y in their shaded jar.  Or, are Spark users expected to
converge their user dependency versions with those used by Spark?  For
example, the user is expected to converge their code to use Guava
version X which is used by the Spark framework.

[1]: http://fluo.apache.org/blog/2016/12/22/spark-load/




Re: Long-running job OOMs driver process

2016-11-18 Thread Keith Bourgoin
Thanks for the input. I had read somewhere that s3:// was the way to go due
to some recent changes, but apparently that was outdated. I’m working on
creating some dummy data and a script to process it right now. I’ll post
here with code and logs when I can successfully reproduce the issue on
non-production data.

Yong, that's a good point about the web content. I had forgotten to mention
that when I first saw this a few months ago, on another project, I could
sometimes trigger the OOM by trying to view the web ui for the job. That's
another case I'll try to reproduce.

Thanks again!

Keith.
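
For reference, a sketch of the kind of settings hinted at above: Spark keeps UI data about completed jobs and stages in driver memory, and these caps bound how much is retained (values are illustrative and worth checking against your Spark version):

val conf = new org.apache.spark.SparkConf()
  .set("spark.ui.retainedJobs", "200")            // how many completed jobs the UI keeps
  .set("spark.ui.retainedStages", "200")          // how many completed stages the UI keeps
  .set("spark.sql.ui.retainedExecutions", "200")  // SQL tab equivalent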

On Fri, Nov 18, 2016 at 10:30 AM Yong Zhang <java8...@hotmail.com> wrote:

> Just wondering, is it possible the memory usage keeps going up due to
> the web UI content?
>
>
> Yong
>
>
> --
> *From:* Alexis Seigneurin <aseigneu...@ipponusa.com>
> *Sent:* Friday, November 18, 2016 10:17 AM
> *To:* Nathan Lande
> *Cc:* Keith Bourgoin; Irina Truong; u...@spark.incubator.apache.org
> *Subject:* Re: Long-running job OOMs driver process
>
> +1 for using S3A.
>
> It would also depend on what format you're using. I agree with Steve that
> Parquet, for instance, is a good option. If you're using plain text files,
> some people use GZ files but they cannot be partitioned, thus putting a lot
> of pressure on the driver. It doesn't look like this is the issue you're
> running into, though, because it would not be a progressive slow down, but
> please provide as much detail as possible about your app.
>
> The cache could be an issue but the OOM would come from an executor, not
> from the driver.
>
> From what you're saying, Keith, it indeed looks like some memory is not
> being freed. Seeing the code would help. If you can, also send all the logs
> (with Spark at least in INFO level).
>
> Alexis
>
> On Fri, Nov 18, 2016 at 10:08 AM, Nathan Lande <nathanla...@gmail.com>
> wrote:
>
> +1 to not threading.
>
> What does your load look like? If you are loading many files and cacheing
> them in N rdds rather than 1 rdd this could be an issue.
>
> If the above two things don't fix your oom issue, without knowing anything
> else about your job, I would focus on your cacheing strategy as a potential
> culprit. Try running without any cacheing to isolate the issue; bad
> cacheing strategy is the source of oom issues for me most of the time.
>
> On Nov 18, 2016 6:31 AM, "Keith Bourgoin" <ke...@parsely.com> wrote:
>
> Hi Alexis,
>
> Thanks for the response. I've been working with Irina on trying to sort
> this issue out.
>
> We thread the file processing to amortize the cost of things like getting
> files from S3. It's a pattern we've seen recommended in many places, but I
> don't have any of those links handy.  The problem isn't the threading, per
> se, but clearly some sort of memory leak in the driver itself.  Each file
> is a self-contained unit of work, so once it's done all memory related to
> it should be freed. Nothing in the script itself grows over time, so if it
> can do 10 concurrently, it should be able to run like that forever.
>
> I've hit this same issue working on another Spark app which wasn't
> threaded, but produced tens of thousands of jobs. Eventually, the Spark UI
> would get slow, then unresponsive, and then be killed due to OOM.
>
> I'll try to cook up some examples of this today, threaded and not. We were
> hoping that someone had seen this before and it rung a bell. Maybe there's
> a setting to clean up info from old jobs that we can adjust.
>
> Cheers,
>
> Keith.
>
> On Thu, Nov 17, 2016 at 9:50 PM Alexis Seigneurin <
> aseigneu...@ipponusa.com> wrote:
>
> Hi Irina,
>
> I would question the use of multiple threads in your application. Since
> Spark is going to run the processing of each DataFrame on all the cores of
> your cluster, the processes will be competing for resources. In fact, they
> would not only compete for CPU cores but also for memory.
>
> Spark is designed to run your processes in a sequence, and each process
> will be run in a distributed manner (multiple threads on multiple
> instances). I would suggest to follow this principle.
>
> Feel free to share to code if you can. It's always helpful so that we can
> give better advice.
>
> Alexis
>
> On Thu, Nov 17, 2016 at 8:51 PM, Irina Truong <ir...@parsely.com> wrote:
>
> We have an application that reads text files, converts them to dataframes,
> and saves them in Parquet format. The application runs fine when processing
> a few files, but we have several thousand produced every day. When running
> the job for all files, we have spark-submit killed on OOM:
>
> #
> # java.lang.Ou

Re: Long-running job OOMs driver process

2016-11-18 Thread Keith Bourgoin
Hi Alexis,

Thanks for the response. I've been working with Irina on trying to sort
this issue out.

We thread the file processing to amortize the cost of things like getting
files from S3. It's a pattern we've seen recommended in many places, but I
don't have any of those links handy. The problem isn't the threading, per
se, but clearly there is some sort of memory leak in the driver itself. Each file
is a self-contained unit of work, so once it's done all memory related to
it should be freed. Nothing in the script itself grows over time, so if it
can do 10 concurrently, it should be able to run like that forever.

I've hit this same issue working on another Spark app which wasn't
threaded, but produced tens of thousands of jobs. Eventually, the Spark UI
would get slow, then unresponsive, and then be killed due to OOM.

I'll try to cook up some examples of this today, threaded and not. We were
hoping that someone had seen this before and it rung a bell. Maybe there's
a setting to clean up info from old jobs that we can adjust.

Cheers,

Keith.

On Thu, Nov 17, 2016 at 9:50 PM Alexis Seigneurin <aseigneu...@ipponusa.com>
wrote:

> Hi Irina,
>
> I would question the use of multiple threads in your application. Since
> Spark is going to run the processing of each DataFrame on all the cores of
> your cluster, the processes will be competing for resources. In fact, they
> would not only compete for CPU cores but also for memory.
>
> Spark is designed to run your processes in a sequence, and each process
> will be run in a distributed manner (multiple threads on multiple
> instances). I would suggest to follow this principle.
>
> Feel free to share to code if you can. It's always helpful so that we can
> give better advice.
>
> Alexis
>
> On Thu, Nov 17, 2016 at 8:51 PM, Irina Truong <ir...@parsely.com> wrote:
>
> We have an application that reads text files, converts them to dataframes,
> and saves them in Parquet format. The application runs fine when processing
> a few files, but we have several thousand produced every day. When running
> the job for all files, we have spark-submit killed on OOM:
>
> #
> # java.lang.OutOfMemoryError: Java heap space
> # -XX:OnOutOfMemoryError="kill -9 %p"
> #   Executing /bin/sh -c "kill -9 27226"...
>
> The job is written in Python. We’re running it in Amazon EMR 5.0 (Spark
> 2.0.0) with spark-submit. We’re using a cluster with a master c3.2xlarge
> instance (8 cores and 15g of RAM) and 3 core c3.4xlarge instances (16 cores
> and 30g of RAM each). Spark config settings are as follows:
>
> ('spark.serializer', 'org.apache.spark.serializer.KryoSerializer'),
>
> ('spark.executors.instances', '3'),
>
> ('spark.yarn.executor.memoryOverhead', '9g'),
>
> ('spark.executor.cores', '15'),
>
> ('spark.executor.memory', '12g'),
>
> ('spark.scheduler.mode', 'FIFO'),
>
> ('spark.cleaner.ttl', '1800'),
>
> The job processes each file in a thread, and we have 10 threads running
> concurrently. The process will OOM after about 4 hours, at which point
> Spark has processed over 20,000 jobs.
>
> It seems like the driver is running out of memory, but each individual job
> is quite small. Are there any known memory leaks for long-running Spark
> applications on Yarn?
>
>
>
>
> --
>
> *Alexis Seigneurin*
> *Managing Consultant*
> (202) 459-1591 <202%20459.1591> - LinkedIn
> <http://www.linkedin.com/in/alexisseigneurin>
>
> <http://ipponusa.com/>
> Rate our service <https://www.recommendi.com/app/survey/Eh2ZnWUPTxY>
>


Re: Using Java in Spark shell

2016-05-25 Thread Keith


There is no Java shell in Spark.
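
You can, however, use Java classes directly from the Scala spark-shell, since
Scala interoperates with Java. A small untested sketch typed at the scala>
prompt (sc is the SparkContext the shell predefines):

// Java collections and classes are callable as-is from the Scala spark-shell.
val javaList = new java.util.ArrayList[Int]()
javaList.add(1); javaList.add(2); javaList.add(3)

import scala.collection.JavaConverters._
val rdd = sc.parallelize(javaList.asScala)
println(rdd.reduce(_ + _))  // 6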

> On May 25, 2016, at 1:11 AM, Ashok Kumar  wrote:
> 
> Hello,
> 
> A newbie question.
> 
> Is it possible to use java code directly in spark shell without using maven 
> to build a jar file?
> 
> How can I switch from scala to java in spark shell?
> 
> Thanks
> 
> 


Spark SQL "partition stride"?

2016-01-11 Thread Keith Freeman
The spark docs section for "JDBC to Other Databases" 
(https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases) 
describes the partitioning as "... Notice that lowerBound and upperBound 
are just used to decide the partition stride, not for filtering the rows 
in table."


What is meant by "partition stride" here, I'm not familiar with the 
phrase and googling didn't help.


Also, is the behavior of this partitioning described in detail 
somewhere?  Looking at my SQL query log I've figured out what it's doing 
in my example:


say X = (upperBound - lowerBound) / numPartitions:

  query * where partitionColumn < lowerBound
  query * where partitionColumn >= lowerBound and partitionColumn < 
lowerBound + X
  query * where partitionColumn >= lowerBound+X and partitionColumn < 
lowerBound+2X

   until the query gets to upperBound

But it would be nice to know if there's docs on this?
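
For reference, the four partitioning options are passed as shown in the sketch
below (the JDBC URL, table name and bounds are made up). lowerBound/upperBound
only set the stride X; every row of the table is still read, one ranged query
per partition:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object JdbcStrideSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("JdbcStrideSketch"))
    val sqlContext = new SQLContext(sc)

    // Stride X = (upperBound - lowerBound) / numPartitions; the bounds do not filter rows.
    val df = sqlContext.read.format("jdbc")
      .option("url", "jdbc:postgresql://dbhost:5432/mydb")  // made-up URL
      .option("dbtable", "people")                          // made-up table
      .option("partitionColumn", "id")
      .option("lowerBound", "0")
      .option("upperBound", "1000000")
      .option("numPartitions", "10")
      .load()

    println(df.rdd.partitions.length)  // 10 partitions, each issuing its own ranged query
    sc.stop()
  }
}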


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



python rdd.partitionBy(): any examples of a custom partitioner?

2015-12-07 Thread Keith Freeman
I'm not a python expert, so I'm wondering if anybody has a working 
example of a partitioner for the "partitionFunc" argument (default 
"portable_hash") to rdd.partitionBy()?


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark 1.4.0 SQL JDBC partition stride?

2015-06-21 Thread Keith Freeman
The spark docs section for "JDBC to Other Databases" 
(https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases) 
describes the partitioning as "... Notice that lowerBound and upperBound 
are just used to decide the partition stride, not for filtering the rows 
in table."


What is meant by "partition stride" here, I'm not familiar with the 
phrase and googling didn't help.


Also, is the behaviour of this partitioning described in detail 
somewhere?  Looking at my SQL query log I've figured out what it's doing 
in my example (X = (upperBound - lowerBound) / numPartitions):


  query * where partitionColumn < lowerBound
  query * where partitionColumn >= lowerBound and partitionColumn < 
lowerBound + X
  query * where partitionColumn >= lowerBound+X and partitionColumn < 
lowerBound+X+X

   until the query gets to upperBound

But it would be nice to know if there's docs on this?



Re: Loading RDDs in a streaming fashion

2014-12-01 Thread Keith Simmons
Actually, I'm working with a binary format.  The api allows reading out a
single record at a time, but I'm not sure how to get those records into
spark (without reading everything into memory from a single file at once).



On Mon, Dec 1, 2014 at 5:07 PM, Andy Twigg andy.tw...@gmail.com wrote:

 file => transform file into a bunch of records


 What does this function do exactly? Does it load the file locally?
 Spark supports RDDs exceeding global RAM (cf the terasort example), but if
 your example just loads each file locally, then this may cause problems.
 Instead, you should load each file into an rdd with context.textFile(),
 flatmap that and union these rdds.

 also see

 http://stackoverflow.com/questions/23397907/spark-context-textfile-load-multiple-files


 On 1 December 2014 at 16:50, Keith Simmons ke...@pulse.io wrote:

 This is a long shot, but...

 I'm trying to load a bunch of files spread out over hdfs into an RDD, and
 in most cases it works well, but for a few very large files, I exceed
 available memory.  My current workflow basically works like this:

 context.parallelize(fileNames).flatMap { file =>
   transform file into a bunch of records
 }

 I'm wondering if there are any APIs to somehow flush the records of a
 big dataset so I don't have to load them all into memory at once.  I know
 this doesn't exist, but conceptually:

 context.parallelize(fileNames).streamMap { (file, stream) =>
  for every 10K records write records to stream and flush
 }

 Keith
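
A lazy flatMap comes close to the streamMap idea sketched just above: flatMap
accepts any TraversableOnce, and when it is handed an Iterator the records are
pulled one at a time as downstream operators consume them, so a single large
file never has to be fully materialized. A rough, untested sketch, where
readRecords stands in for the real record-at-a-time binary reader (here it
just streams lines from a local file):

import org.apache.spark.{SparkConf, SparkContext}

object LazyRecordLoad {
  // Stand-in for the real binary-format reader: returns a lazy Iterator of records.
  def readRecords(path: String): Iterator[String] =
    scala.io.Source.fromFile(path).getLines()

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("LazyRecordLoad"))
    val fileNames = Seq("/data/part-0001.bin", "/data/part-0002.bin") // hypothetical paths

    // Records stream through the task pipeline one at a time; only operations that
    // need full materialization (sorting, caching, etc.) would buffer them.
    val records = sc.parallelize(fileNames, fileNames.size)
      .flatMap(path => readRecords(path))

    println(records.count())
    sc.stop()
  }
}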





Re: Loading RDDs in a streaming fashion

2014-12-01 Thread Keith Simmons
Yep, that's definitely possible.  It's one of the workarounds I was
considering.  I was just curious if there was a simpler (and perhaps more
efficient) approach.

Keith

On Mon, Dec 1, 2014 at 6:28 PM, Andy Twigg andy.tw...@gmail.com wrote:

 Could you modify your function so that it streams through the files record
 by record and outputs them to hdfs, then read them all in as RDDs and take
 the union? That would only use bounded memory.

 On 1 December 2014 at 17:19, Keith Simmons ke...@pulse.io wrote:

 Actually, I'm working with a binary format.  The api allows reading out a
 single record at a time, but I'm not sure how to get those records into
 spark (without reading everything into memory from a single file at once).



 On Mon, Dec 1, 2014 at 5:07 PM, Andy Twigg andy.tw...@gmail.com wrote:

  file => transform file into a bunch of records


 What does this function do exactly? Does it load the file locally?
 Spark supports RDDs exceeding global RAM (cf the terasort example), but
 if your example just loads each file locally, then this may cause problems.
 Instead, you should load each file into an rdd with context.textFile(),
 flatmap that and union these rdds.

 also see

 http://stackoverflow.com/questions/23397907/spark-context-textfile-load-multiple-files


 On 1 December 2014 at 16:50, Keith Simmons ke...@pulse.io wrote:

 This is a long shot, but...

 I'm trying to load a bunch of files spread out over hdfs into an RDD,
 and in most cases it works well, but for a few very large files, I exceed
 available memory.  My current workflow basically works like this:

 context.parallelize(fileNames).flatMap { file =>
   transform file into a bunch of records
 }

 I'm wondering if there are any APIs to somehow flush the records of a
 big dataset so I don't have to load them all into memory at once.  I know
 this doesn't exist, but conceptually:

 context.parallelize(fileNames).streamMap { (file, stream) =>
  for every 10K records write records to stream and flush
 }

 Keith







Re: Setting only master heap

2014-10-26 Thread Keith Simmons
Hi Guys,

Here are some lines from the log file before the OOM.  They don't look that
helpful, so let me know if there's anything else I should be sending.  I am
running in standalone mode.

spark-pulse-org.apache.spark.deploy.master.Master-1-hadoop10.pulse.io.out.5:java.lang.OutOfMemoryError:
Java heap space
spark-pulse-org.apache.spark.deploy.master.Master-1-hadoop10.pulse.io.out.5-14/10/22
05:00:36 ERROR ActorSystemImpl: Uncaught fatal error from thread
[sparkMaster-akka.actor.default-dispatcher-52] shutting down ActorSystem
[sparkMaster]
spark-pulse-org.apache.spark.deploy.master.Master-1-hadoop10.pulse.io.out.5:java.lang.OutOfMemoryError:
Java heap space
spark-pulse-org.apache.spark.deploy.master.Master-1-hadoop10.pulse.io.out.5:Exception
in thread qtp2057079871-30 java.lang.OutOfMemoryError: Java heap space
spark-pulse-org.apache.spark.deploy.master.Master-1-hadoop10.pulse.io.out.5-14/10/22
05:00:07 WARN AbstractNioSelector: Unexpected exception in the selector
loop.
spark-pulse-org.apache.spark.deploy.master.Master-1-hadoop10.pulse.io.out.5-14/10/22
05:02:51 ERROR ActorSystemImpl: Uncaught fatal error from thread
[sparkMaster-8] shutting down ActorSystem [sparkMaster]
spark-pulse-org.apache.spark.deploy.master.Master-1-hadoop10.pulse.io.out.5:java.lang.OutOfMemoryError:
Java heap space
spark-pulse-org.apache.spark.deploy.master.Master-1-hadoop10.pulse.io.out.5-14/10/22
05:03:22 ERROR ActorSystemImpl: Uncaught fatal error from thread
[sparkMaster-akka.actor.default-dispatcher-38] shutting down ActorSystem
[sparkMaster]
spark-pulse-org.apache.spark.deploy.master.Master-1-hadoop10.pulse.io.out.5:java.lang.OutOfMemoryError:
Java heap space
spark-pulse-org.apache.spark.deploy.master.Master-1-hadoop10.pulse.io.out.5-14/10/22
05:03:22 ERROR ActorSystemImpl: Uncaught fatal error from thread
[sparkMaster-6] shutting down ActorSystem [sparkMaster]
spark-pulse-org.apache.spark.deploy.master.Master-1-hadoop10.pulse.io.out.5:java.lang.OutOfMemoryError:
Java heap space
spark-pulse-org.apache.spark.deploy.master.Master-1-hadoop10.pulse.io.out.5-14/10/22
05:03:22 ERROR ActorSystemImpl: Uncaught fatal error from thread
[sparkMaster-akka.actor.default-dispatcher-43] shutting down ActorSystem
[sparkMaster]
spark-pulse-org.apache.spark.deploy.master.Master-1-hadoop10.pulse.io.out.5:java.lang.OutOfMemoryError:
Java heap space
spark-pulse-org.apache.spark.deploy.master.Master-1-hadoop10.pulse.io.out.5-14/10/22
05:03:22 ERROR ActorSystemImpl: Uncaught fatal error from thread
[sparkMaster-akka.actor.default-dispatcher-13] shutting down ActorSystem
[sparkMaster]
spark-pulse-org.apache.spark.deploy.master.Master-1-hadoop10.pulse.io.out.5:java.lang.OutOfMemoryError:
Java heap space
spark-pulse-org.apache.spark.deploy.master.Master-1-hadoop10.pulse.io.out.5-14/10/22
05:03:22 ERROR ActorSystemImpl: Uncaught fatal error from thread
[sparkMaster-5] shutting down ActorSystem [sparkMaster]
spark-pulse-org.apache.spark.deploy.master.Master-1-hadoop10.pulse.io.out.5:java.lang.OutOfMemoryError:
Java heap space
spark-pulse-org.apache.spark.deploy.master.Master-1-hadoop10.pulse.io.out.5-14/10/22
05:03:22 ERROR ActorSystemImpl: Uncaught fatal error from thread
[sparkMaster-akka.actor.default-dispatcher-12] shutting down ActorSystem
[sparkMaster]

On Thu, Oct 23, 2014 at 2:10 PM, Nan Zhu zhunanmcg...@gmail.com wrote:

 h…

 my observation is that the master in Spark 1.1 has a higher frequency of GC……

 Also, before 1.1, I never encountered GC overtime in the Master; after upgrading
 to 1.1, I have hit it twice (we upgraded soon after the 1.1 release)….

 Best,

 --
 Nan Zhu

 On Thursday, October 23, 2014 at 1:08 PM, Andrew Or wrote:

 Yeah, as Sameer commented, there is unfortunately not an equivalent
 `SPARK_MASTER_MEMORY` that you can set. You can work around this by
 starting the master and the slaves separately with different settings of
 SPARK_DAEMON_MEMORY each time.

 AFAIK there haven't been any major changes in the standalone master in
 1.1.0, so I don't see an immediate explanation for what you're observing.
 In general the Spark master doesn't use that much memory, and even if there
 are many applications it will discard the old ones appropriately, so unless
 you have a ton (like thousands) of concurrently running applications
 connecting to it there's little likelihood for it to OOM. At least that's
 my understanding.

 -Andrew

 2014-10-22 15:51 GMT-07:00 Sameer Farooqui same...@databricks.com:

 Hi Keith,

 Would be helpful if you could post the error message.

 Are you running Spark in Standalone mode or with YARN?

 In general, the Spark Master is only used for scheduling and it should be
 fine with the default setting of 512 MB RAM.

 Is it actually the Spark Driver's memory that you intended to change?



 *++ If in Standalone mode ++*
 You're right that SPARK_DAEMON_MEMORY set the memory to allocate to the
 Spark Master, Worker and even HistoryServer daemons together.

 SPARK_WORKER_MEMORY is slightly confusing. In Standalone mode

Setting only master heap

2014-10-22 Thread Keith Simmons
We've been getting some OOMs from the spark master since upgrading to Spark
1.1.0.  I've found SPARK_DAEMON_MEMORY, but that also seems to increase the
worker heap, which as far as I know is fine.  Is there any setting which
*only* increases the master heap size?

Keith


Re: Hung spark executors don't count toward worker memory limit

2014-10-13 Thread Keith Simmons
Maybe I should put this another way.  If spark has two jobs, A and B, both
of which consume the entire allocated memory pool, is it expected that
spark can launch B before the executor processes tied to A are completely
terminated?

On Thu, Oct 9, 2014 at 6:57 PM, Keith Simmons ke...@pulse.io wrote:

 Actually, it looks like even when the job shuts down cleanly, there can be
 a few minutes of overlap between the time the next job launches and the
first job actually terminates its process.  Here are some relevant lines
 from my log:

 14/10/09 20:49:20 INFO Worker: Asked to kill executor
 app-20141009204127-0029/1
 14/10/09 20:49:20 INFO ExecutorRunner: Runner thread for executor
 app-20141009204127-0029/1 interrupted
 14/10/09 20:49:20 INFO ExecutorRunner: Killing process!
 14/10/09 20:49:20 INFO Worker: Asked to launch executor
 app-20141009204508-0030/1 for Job
 ... More lines about launching new job...
 14/10/09 20:51:17 INFO Worker: Executor app-20141009204127-0029/1 finished
 with state KILLED

 As you can see, the first app didn't actually shut down until two minutes
 after the new job launched.  During that time, I was at double the worker
 memory limit.

 Keith


 On Thu, Oct 9, 2014 at 5:06 PM, Keith Simmons ke...@pulse.io wrote:

 Hi Folks,

 We have a spark job that is occasionally running out of memory and
 hanging (I believe in GC).  This is its own issue we're debugging, but in
 the meantime, there's another unfortunate side effect.  When the job is
 killed (most often because of GC errors), each worker attempts to kill its
 respective executor.  However, it appears that several of the executors
 fail to shut themselves down (I actually have to kill -9 them).  However,
 even though the worker fails to successfully cleanup the executor, it
 starts the next job as though all the resources have been freed up.  This
 is causing the spark worker to exceed its configured memory limit, which
 is in turn running our boxes out of memory.  Is there a setting I can
 configure to prevent this issue?  Perhaps by having the worker force kill
 the executor or not start the next job until it's confirmed the executor
 has exited?  Let me know if there's any additional information I can
 provide.

 Keith

 P.S. We're running spark 1.0.2





Hung spark executors don't count toward worker memory limit

2014-10-09 Thread Keith Simmons
Hi Folks,

We have a spark job that is occasionally running out of memory and hanging
(I believe in GC).  This is its own issue we're debugging, but in the
meantime, there's another unfortunate side effect.  When the job is killed
(most often because of GC errors), each worker attempts to kill its
respective executor.  However, it appears that several of the executors
fail to shut themselves down (I actually have to kill -9 them).  However,
even though the worker fails to successfully cleanup the executor, it
starts the next job as though all the resources have been freed up.  This
is causing the spark worker to exceed its configured memory limit, which
is in turn running our boxes out of memory.  Is there a setting I can
configure to prevent this issue?  Perhaps by having the worker force kill
the executor or not start the next job until it's confirmed the executor
has exited?  Let me know if there's any additional information I can
provide.

Keith

P.S. We're running spark 1.0.2


Re: Hung spark executors don't count toward worker memory limit

2014-10-09 Thread Keith Simmons
Actually, it looks like even when the job shuts down cleanly, there can be
a few minutes of overlap between the time the next job launches and the
first job actually terminates its process.  Here are some relevant lines
from my log:

14/10/09 20:49:20 INFO Worker: Asked to kill executor
app-20141009204127-0029/1
14/10/09 20:49:20 INFO ExecutorRunner: Runner thread for executor
app-20141009204127-0029/1 interrupted
14/10/09 20:49:20 INFO ExecutorRunner: Killing process!
14/10/09 20:49:20 INFO Worker: Asked to launch executor
app-20141009204508-0030/1 for Job
... More lines about launching new job...
14/10/09 20:51:17 INFO Worker: Executor app-20141009204127-0029/1 finished
with state KILLED

As you can see, the first app didn't actually shut down until two minutes
after the new job launched.  During that time, I was at double the worker
memory limit.

Keith


On Thu, Oct 9, 2014 at 5:06 PM, Keith Simmons ke...@pulse.io wrote:

 Hi Folks,

 We have a spark job that is occasionally running out of memory and hanging
 (I believe in GC).  This is its own issue we're debugging, but in the
 meantime, there's another unfortunate side effect.  When the job is killed
 (most often because of GC errors), each worker attempts to kill its
 respective executor.  However, it appears that several of the executors
 fail to shut themselves down (I actually have to kill -9 them).  However,
 even though the worker fails to successfully cleanup the executor, it
 starts the next job as though all the resources have been freed up.  This
 is causing the spark worker to exceed its configured memory limit, which
 is in turn running our boxes out of memory.  Is there a setting I can
 configure to prevent this issue?  Perhaps by having the worker force kill
 the executor or not start the next job until it's confirmed the executor
 has exited?  Let me know if there's any additional information I can
 provide.

 Keith

 P.S. We're running spark 1.0.2



Error while running Spark SQL join when using Spark 1.0.1

2014-07-15 Thread Keith Simmons
HI folks,

I'm running into the following error when trying to perform a join in my
code:

java.lang.NoClassDefFoundError: Could not initialize class
org.apache.spark.sql.catalyst.types.LongType$

I see similar errors for StringType$ and also:

 scala.reflect.runtime.ReflectError: value apache is not a package.

Strangely, if I just work with a single table, everything is fine. I can
iterate through the records in both tables and print them out without a
problem.

Furthermore, this code worked without an exception in Spark 1.0.0 (though
the join caused some field corruption, possibly related to
https://issues.apache.org/jira/browse/SPARK-1994).
 The data is coming from a custom protocol buffer based format on hdfs that
is being mapped into the individual record types without a problem.

The immediate cause seems to be a task trying to deserialize one or more
SQL case classes before loading the spark uber jar, but I have no idea why
this is happening, or why it only happens when I do a join.  Ideas?

Keith

P.S. If it's relevant, we're using the Kryo serializer.


Re: Error while running Spark SQL join when using Spark 1.0.1

2014-07-15 Thread Keith Simmons
To give a few more details of my environment in case that helps you
reproduce:

* I'm running spark 1.0.1 downloaded as a tar ball, not built myself
* I'm running in stand alone mode, with 1 master and 1 worker, both on the
same machine (though the same error occurs with two workers on two machines)
* I'm using spark-core and spark-sql 1.0.1 pulled via maven

Here's my build.sbt:

name := "spark-test"

version := "1.0"

scalaVersion := "2.10.4"

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

resolvers += "Cloudera Repository" at
"https://repository.cloudera.com/artifactory/cloudera-repos/"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.0.1" %
"provided"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.1" %
"provided"


On Tue, Jul 15, 2014 at 12:21 PM, Zongheng Yang zonghen...@gmail.com
wrote:

 FWIW, I am unable to reproduce this using the example program locally.

 On Tue, Jul 15, 2014 at 11:56 AM, Keith Simmons keith.simm...@gmail.com
 wrote:
  Nope.  All of them are registered from the driver program.
 
  However, I think we've found the culprit.  If the join column between two
  tables is not in the same column position in both tables, it triggers
 what
  appears to be a bug.  For example, this program fails:
 
  import org.apache.spark.SparkContext._
  import org.apache.spark.SparkContext
  import org.apache.spark.SparkConf
  import org.apache.spark.sql.SQLContext
  import org.apache.spark.sql.SchemaRDD
  import org.apache.spark.sql.catalyst.types._
 
  case class Record(value: String, key: Int)
  case class Record2(key: Int, value: String)
 
  object TestJob {
 
    def main(args: Array[String]) {
      run()
    }
 
    private def run() {
      val sparkConf = new SparkConf()
      sparkConf.setAppName("TestJob")
      sparkConf.set("spark.cores.max", "8")
      sparkConf.set("spark.storage.memoryFraction", "0.1")
      sparkConf.set("spark.shuffle.memoryFracton", "0.2")
      sparkConf.set("spark.executor.memory", "2g")
      sparkConf.setJars(List("target/scala-2.10/spark-test-assembly-1.0.jar"))
      sparkConf.setMaster("spark://dev1.dev.pulse.io:7077")
      sparkConf.setSparkHome("/home/pulseio/spark/current")
      val sc = new SparkContext(sparkConf)
 
      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      import sqlContext._
 
      val rdd1 = sc.parallelize((1 to 100).map(i => Record(s"val_$i", i)))
      val rdd2 = sc.parallelize((1 to 100).map(i => Record2(i, s"rdd_$i")))
      rdd1.registerAsTable("rdd1")
      rdd2.registerAsTable("rdd2")
 
      sql("SELECT * FROM rdd1").collect.foreach { row => println(row) }
 
      sql("SELECT rdd1.key, rdd1.value, rdd2.value FROM rdd1 join rdd2 on rdd1.key = rdd2.key order by rdd1.key").collect.foreach { row => println(row) }
 
      sc.stop()
    }
 
  }
 
  If you change the definition of Record and Record2 to the following, it
  succeeds:
 
  case class Record(key: Int, value: String)
  case class Record2(key: Int, value: String)
 
  as does:
 
  case class Record(value: String, key: Int)
  case class Record2(value: String, key: Int)
 
  Let me know if you need anymore details.
 
 
  On Tue, Jul 15, 2014 at 11:14 AM, Michael Armbrust 
 mich...@databricks.com
  wrote:
 
  Are you registering multiple RDDs of case classes as tables
 concurrently?
  You are possibly hitting SPARK-2178 which is caused by SI-6240.
 
 
  On Tue, Jul 15, 2014 at 10:49 AM, Keith Simmons 
 keith.simm...@gmail.com
  wrote:
 
  HI folks,
 
  I'm running into the following error when trying to perform a join in
 my
  code:
 
  java.lang.NoClassDefFoundError: Could not initialize class
  org.apache.spark.sql.catalyst.types.LongType$
 
  I see similar errors for StringType$ and also:
 
   scala.reflect.runtime.ReflectError: value apache is not a package.
 
  Strangely, if I just work with a single table, everything is fine. I
 can
  iterate through the records in both tables and print them out without a
  problem.
 
  Furthermore, this code worked without an exception in Spark 1.0.0
  (though the join caused some field corruption, possibly related to
  https://issues.apache.org/jira/browse/SPARK-1994).  The data is
 coming from
  a custom protocol buffer based format on hdfs that is being mapped
 into the
  individual record types without a problem.
 
  The immediate cause seems to be a task trying to deserialize one or
 more
  SQL case classes before loading the spark uber jar, but I have no idea
 why
  this is happening, or why it only happens when I do a join.  Ideas?
 
  Keith
 
  P.S. If it's relevant, we're using the Kryo serializer.
 
 
 
 



Re: Error while running Spark SQL join when using Spark 1.0.1

2014-07-15 Thread Keith Simmons
)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
at
org.apache.spark.scheduler.ShuffleMapTask$.deserializeInfo(ShuffleMapTask.scala:63)
at
org.apache.spark.scheduler.ShuffleMapTask.readExternal(ShuffleMapTask.scala:135)
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1836)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1795)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
at
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:165)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:701)


On Tue, Jul 15, 2014 at 1:05 PM, Michael Armbrust mich...@databricks.com
wrote:

 Can you print out the queryExecution?

 (i.e. println(sql().queryExecution))


 On Tue, Jul 15, 2014 at 12:44 PM, Keith Simmons keith.simm...@gmail.com
 wrote:

 To give a few more details of my environment in case that helps you
 reproduce:

 * I'm running spark 1.0.1 downloaded as a tar ball, not built myself
 * I'm running in stand alone mode, with 1 master and 1 worker, both on
 the same machine (though the same error occurs with two workers on two
 machines)
 * I'm using spark-core and spark-sql 1.0.1 pulled via maven

  Here's my build.sbt:

 name := "spark-test"
 
 version := "1.0"
 
 scalaVersion := "2.10.4"
 
 resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
 
 resolvers += "Cloudera Repository" at
 "https://repository.cloudera.com/artifactory/cloudera-repos/"
 
 libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.0.1" %
 "provided"
 
 libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.1" %
 "provided"


 On Tue, Jul 15, 2014 at 12:21 PM, Zongheng Yang zonghen...@gmail.com
 wrote:

 FWIW, I am unable to reproduce this using the example program locally.

 On Tue, Jul 15, 2014 at 11:56 AM, Keith Simmons keith.simm...@gmail.com
 wrote:
  Nope.  All of them are registered from the driver program.
 
  However, I think we've found the culprit.  If the join column between
 two
  tables is not in the same column position in both tables, it triggers
 what
  appears to be a bug.  For example, this program fails:
 
  import org.apache.spark.SparkContext._
  import org.apache.spark.SparkContext
  import org.apache.spark.SparkConf
  import org.apache.spark.sql.SQLContext
  import org.apache.spark.sql.SchemaRDD
  import org.apache.spark.sql.catalyst.types._
 
  case class Record(value: String, key: Int)
  case class Record2(key: Int, value: String)
 
  object TestJob {
 
    def main(args: Array[String]) {
      run()
    }
 
    private def run() {
      val sparkConf = new SparkConf()
      sparkConf.setAppName("TestJob")
      sparkConf.set("spark.cores.max", "8")
      sparkConf.set("spark.storage.memoryFraction", "0.1")
      sparkConf.set("spark.shuffle.memoryFracton", "0.2")
      sparkConf.set("spark.executor.memory", "2g")
      sparkConf.setJars(List("target/scala-2.10/spark-test-assembly-1.0.jar"))
      sparkConf.setMaster("spark://dev1.dev.pulse.io:7077")
      sparkConf.setSparkHome("/home/pulseio/spark/current")
      val sc = new SparkContext(sparkConf)
 
      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      import sqlContext._
 
      val rdd1 = sc.parallelize((1 to 100).map(i => Record(s"val_$i", i)))
      val rdd2 = sc.parallelize((1 to 100).map(i => Record2(i, s"rdd_$i")))
      rdd1.registerAsTable("rdd1")
      rdd2.registerAsTable("rdd2")
 
      sql("SELECT * FROM rdd1").collect.foreach { row => println(row) }
 
      sql("SELECT rdd1.key, rdd1.value, rdd2.value FROM rdd1 join rdd2 on rdd1.key = rdd2.key order by rdd1.key").collect.foreach { row => println(row) }
 
      sc.stop()
    }
 
  }
 
  If you change the definition of Record and Record2 to the following, it
  succeeds:
 
  case class Record(key: Int, value: String)
  case class Record2(key: Int, value: String)
 
  as does:
 
  case class Record(value: String, key: Int)
  case class Record2(value: String, key: Int)
 
  Let me know if you need anymore details.
 
 
  On Tue, Jul 15, 2014 at 11:14 AM, Michael Armbrust 
 mich...@databricks.com
  wrote:
 
  Are you registering multiple RDDs of case classes as tables
 concurrently?
  You are possibly hitting SPARK-2178 which is caused by SI-6240.
 
 
  On Tue, Jul 15, 2014 at 10:49 AM, Keith Simmons 
 keith.simm...@gmail.com
  wrote:
 
  HI folks,
 
  I'm running into the following error when trying to perform a join
 in my
  code:
 
  java.lang.NoClassDefFoundError: Could not initialize class
  org.apache.spark.sql.catalyst.types.LongType$
 
  I see similar errors for StringType$ and also:
 
   scala.reflect.runtime.ReflectError

Re: Error while running Spark SQL join when using Spark 1.0.1

2014-07-15 Thread Keith Simmons
Cool.  So Michael's hunch was correct, it is a thread issue.  I'm currently
using a tarball build, but I'll do a spark build with the patch as soon as
I have a chance and test it out.

Keith


On Tue, Jul 15, 2014 at 4:14 PM, Zongheng Yang zonghen...@gmail.com wrote:

 Hi Keith & gorenuru,

 This patch (https://github.com/apache/spark/pull/1423) solves the
 errors for me in my local tests. If possible, can you guys test this
 out to see if it solves your test programs?

 Thanks,
 Zongheng

 On Tue, Jul 15, 2014 at 3:08 PM, Zongheng Yang zonghen...@gmail.com
 wrote:
  - user@incubator
 
  Hi Keith,
 
  I did reproduce this using local-cluster[2,2,1024], and the errors
  look almost the same.  Just wondering, despite the errors did your
  program output any result for the join? On my machine, I could see the
  correct output.
 
  Zongheng
 
  On Tue, Jul 15, 2014 at 1:46 PM, Michael Armbrust
  mich...@databricks.com wrote:
  Thanks for the extra info.  At a quick glance the query plan looks fine
 to
  me.  The class IntegerType does build a type tag. I wonder if you are
  seeing the Scala issue manifest in some new way.  We will attempt to
  reproduce locally.
 
 
  On Tue, Jul 15, 2014 at 1:41 PM, gorenuru goren...@gmail.com wrote:
 
  Just my few cents on this.
 
  I'm having the same problems with v1.0.1, but this bug is sporadic and
  looks like it is related to object initialization.
 
  What's more, I'm not using any SQL at all. I just have a utility class
  like this:
 
  object DataTypeDescriptor {
    type DataType = String
 
    val BOOLEAN = "BOOLEAN"
    val STRING = "STRING"
    val TIMESTAMP = "TIMESTAMP"
    val LONG = "LONG"
    val INT = "INT"
    val SHORT = "SHORT"
    val BYTE = "BYTE"
    val DECIMAL = "DECIMAL"
    val DOUBLE = "DOUBLE"
    val FLOAT = "FLOAT"
 
    def $$(name: String, format: Option[String] = None) =
      DataTypeDescriptor(name, format)
 
    private lazy val nativeTypes: Map[String, NativeType] = Map(
      BOOLEAN -> BooleanType, STRING -> StringType, TIMESTAMP -> TimestampType,
      LONG -> LongType, INT -> IntegerType, SHORT -> ShortType, BYTE -> ByteType,
      DECIMAL -> DecimalType, DOUBLE -> DoubleType, FLOAT -> FloatType
    )
 
    lazy val defaultValues: Map[String, Any] = Map(
      BOOLEAN -> false, STRING -> "", TIMESTAMP -> null, LONG -> 0L, INT -> 0,
      SHORT -> 0.toShort, BYTE -> 0.toByte,
      DECIMAL -> BigDecimal(0d), DOUBLE -> 0d, FLOAT -> 0f
    )
 
    def apply(dataType: String): DataTypeDescriptor = {
      DataTypeDescriptor(dataType.toUpperCase, None)
    }
 
    def apply(dataType: SparkDataType): DataTypeDescriptor = {
      nativeTypes
        .find { case (_, descriptor) => descriptor == dataType }
        .map { case (name, descriptor) => DataTypeDescriptor(name, None) }
        .get
    }
 
  .
 
  and some tests that check each of these methods.
 
  The problem is that this test fails randomly with this error.
 
  P.S.: I did not have this problem in Spark 1.0.0
 
 
 
  --
  View this message in context:
 
 http://apache-spark-user-list.1001560.n3.nabble.com/Error-while-running-Spark-SQL-join-when-using-Spark-1-0-1-tp9776p9817.html
  Sent from the Apache Spark User List mailing list archive at
 Nabble.com.
 
 



Re: Comparative study

2014-07-09 Thread Keith Simmons
Good point.  Shows how personal use cases color how we interpret products.


On Wed, Jul 9, 2014 at 1:08 AM, Sean Owen so...@cloudera.com wrote:

 On Wed, Jul 9, 2014 at 1:52 AM, Keith Simmons ke...@pulse.io wrote:

  Impala is *not* built on map/reduce, though it was built to replace
 Hive, which is map/reduce based.  It has its own distributed query engine,
 though it does load data from HDFS, and is part of the hadoop ecosystem.
  Impala really shines when your


 (It was not built to replace Hive. It's purpose-built to make interactive
 use with a BI tool feasible -- single-digit second queries on huge data
 sets. It's very memory hungry. Hive's architecture choices and legacy code
 have been throughput-oriented, and can't really get below minutes at scale,
 but it remains a right choice when you are in fact doing ETL!)



Re: Comparative study

2014-07-08 Thread Keith Simmons
Santosh,

To add a bit more to what Nabeel said, Spark and Impala are very different
tools.  Impala is *not* built on map/reduce, though it was built to replace
Hive, which is map/reduce based.  It has its own distributed query engine,
though it does load data from HDFS, and is part of the hadoop ecosystem.
 Impala really shines when your entire dataset fits into memory and your
processing can be expressed in terms of sql.  Paired with the column
oriented Parquet format, it can really scream with the right dataset.

Spark also has a SQL layer (formerly Shark, now more tightly integrated with
Spark), but at least for our dataset, Impala was faster.  However, Spark
has a fantastic and far more flexible programming model.  As has been
mentioned a few times in this thread, it has a better batch processing
model than map/reduce, it can do stream processing, and in the newest
release, it looks like it can even mix and match sql queries.  You do need
to be more aware of memory issues than map/reduce, since using more memory
is one of the primary sources of Spark's speed, but with that caveat, it's a
great technology.  In our particular workflow, we're replacing map/reduce
with spark for our batch layer and using Impala for our query layer.

Daniel,

For what it's worth, we've had a bunch of hanging issues because the
garbage collector seems to get out of control.  The most effective
technique has been to dramatically increase the numPartitions argument in
our various groupBy and cogroup calls which reduces the per-task memory
requirements.  We also reduced the memory used by the shuffler (
spark.shuffle.memoryFraction) and turned off RDD memory (since we don't
have any iterative algorithms).  Finally, using kryo delivered a huge
performance and memory boost (even without registering any custom
serializers).
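
Concretely, those knobs look something like the sketch below (the fractions
and the partition count are arbitrary example values, and the two small RDDs
just stand in for real datasets):

import org.apache.spark.SparkContext._
import org.apache.spark.{SparkConf, SparkContext}

object ShuffleTuningSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ShuffleTuningSketch")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // kryo everywhere
      .set("spark.shuffle.memoryFraction", "0.2")  // shrink the shuffle buffer share
      .set("spark.storage.memoryFraction", "0.1")  // small, since nothing is cached here
    val sc = new SparkContext(conf)

    val rdd1 = sc.parallelize(1 to 1000000).map(i => (i % 1000, i))
    val rdd2 = sc.parallelize(1 to 1000000).map(i => (i % 1000, i.toString))

    // Passing an explicit (large) number of partitions to cogroup/groupBy keeps each
    // task's share of the shuffled data small, which is what keeps GC under control.
    val joined = rdd1.cogroup(rdd2, 2048)
    println(joined.count())
    sc.stop()
  }
}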

Keith




On Tue, Jul 8, 2014 at 2:58 PM, Robert James srobertja...@gmail.com wrote:

 As a new user, I can definitely say that my experience with Spark has
 been rather raw.  The appeal of interactive, batch, and in between all
 using more or less straight Scala is unarguable.  But the experience
 of deploying Spark has been quite painful, mainly about gaps between
 compile time and run time to the JVM, due to dependency conflicts,
 having to use uber jars, Spark's own uber jar which includes some very
 old libs, etc.

 What's more, there's very little resources available to help.  Some
 times I've been able to get help via public sources, but, more often
 than not, it's been trial and error.  Enough that, despite Spark's
 unmistakable appeal, we are seriously considering dropping it entirely
 and just doing a classical Hadoop.

 On 7/8/14, Surendranauth Hiraman suren.hira...@velos.io wrote:
  Aaron,
 
  I don't think anyone was saying Spark can't handle this data size, given
  testimony from the Spark team, Bizo, etc., on large datasets. This has
 kept
  us trying different things to get our flow to work over the course of
  several weeks.
 
  Agreed that the first instinct should be "what did I do wrong".
 
  I believe that is what every person facing this issue has done, in
 reaching
  out to the user group repeatedly over the course of the few of months
 that
  I've been active here. I also know other companies (all experienced with
  large production datasets on other platforms) facing the same types of
  issues - flows that run on subsets of data but not the whole production
  set.
 
  So I think, as you are saying, it points to the need for further
  diagnostics. And maybe also some type of guidance on typical issues with
  different types of datasets (wide rows, narrow rows, etc.), flow
  topologies, etc.? Hard to tell where we are going wrong right now. We've
  tried many things over the course of 6 weeks or so.
 
  I tried to look for the professional services link on databricks.com but
  didn't find it. ;-) (jk).
 
  -Suren
 
 
 
  On Tue, Jul 8, 2014 at 4:16 PM, Aaron Davidson ilike...@gmail.com
 wrote:
 
  Not sure exactly what is happening but perhaps there are ways to
  restructure your program for it to work better. Spark is definitely
 able
  to
  handle much, much larger workloads.
 
 
  +1 @Reynold
 
  Spark can handle big big data. There are known issues with informing
  the
  user about what went wrong and how to fix it that we're actively working
  on, but the first impulse when a job fails should be "what did I do
  wrong"
  rather than "Spark can't handle this workload". Messaging is a huge part
  in
  making this clear -- getting things like a job hanging or an out of
  memory
  error can be very difficult to debug, and improving this is one of our
  highest priorties.
 
 
  On Tue, Jul 8, 2014 at 12:47 PM, Reynold Xin r...@databricks.com
 wrote:
 
  Not sure exactly what is happening but perhaps there are ways to
  restructure your program for it to work better. Spark is definitely
 able
  to
  handle much, much larger workloads.
 
  I've personally run a workload that shuffled 300 TB of data

Re: Spark Memory Bounds

2014-05-28 Thread Keith Simmons
Thanks!  Sounds like my rough understanding was roughly right :)

Definitely understand cached RDDs can add to the memory requirements.
 Luckily, like you mentioned, you can configure spark to flush that to disk
and bound its total size in memory via spark.storage.memoryFraction, so I
have a pretty good handle on the overall RDD contribution.

Thanks for all the help.

Keith


On Wed, May 28, 2014 at 6:43 AM, Christopher Nguyen c...@adatao.com wrote:

 Keith, please see inline.

 --
 Christopher T. Nguyen
 Co-founder  CEO, Adatao http://adatao.com
 linkedin.com/in/ctnguyen



 On Tue, May 27, 2014 at 7:22 PM, Keith Simmons ke...@pulse.io wrote:

 A dash of both.  I want to know enough that I can reason about, rather
 than strictly control, the amount of memory Spark will use.  If I have a
 big data set, I want to understand how I can design it so that Spark's
 memory consumption falls below my available resources.  Or alternatively,
 if it's even possible for Spark to process a data set over a certain size.
  And if I run into memory problems, I want to know which knobs to turn, and
 how turning those knobs will affect memory consumption.


 In practice, to avoid OOME, a key dial we use is the size (or inversely,
 number) of the partitions of your dataset. Clearly there is some blow-up
 factor F such that, e.g., if you start out with 128MB on-disk data
 partitions, you would consume 128F MB of memory, both by Spark and by your
 closure code. Knowing this, you would want to size the partitions such that
 AvailableMemoryInMBPerWorker / NumberOfCoresPerWorker > 128F. To arrive at
 F, you could do some back-of-the-envelope modeling, and/or run the job and
 observe empirically.
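
As a worked instance of that back-of-the-envelope model (every number below is
made up: 512 GB of on-disk input, workers with 32 GB of heap and 8 cores, and
an assumed blow-up factor F of 3):

object PartitionSizing {
  def main(args: Array[String]): Unit = {
    val inputMB        = 512 * 1024   // 512 GB of on-disk input (assumed)
    val memPerWorkerMB = 32 * 1024    // available heap per worker (assumed)
    val coresPerWorker = 8            // concurrent tasks per worker (assumed)
    val blowUpFactorF  = 3.0          // empirical in-memory blow-up (assumed)

    // Per-task budget: worker memory divided across concurrently running tasks.
    val perTaskBudgetMB = memPerWorkerMB.toDouble / coresPerWorker   // 4096 MB

    // Largest safe on-disk partition: budget divided by the blow-up factor.
    val maxPartitionMB = perTaskBudgetMB / blowUpFactorF             // ~1365 MB

    // Minimum number of partitions so each stays under that size.
    val minPartitions = math.ceil(inputMB / maxPartitionMB).toInt    // 384
    println(s"Use at least $minPartitions partitions (each <= ${maxPartitionMB.toInt} MB on disk)")
  }
}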



 It's my understanding that between certain key stages in a Spark DAG
 (i.e. group by stages), Spark will serialize all data structures necessary
 to continue the computation at the next stage, including closures.  So in
 theory, per machine, Spark only needs to hold the transient memory required
 to process the partitions assigned to the currently active tasks.  Is my
 understanding correct?  Specifically, once a key/value pair is serialized
 in the shuffle stage of a task, are the references to the raw java objects
 released before the next task is started.


 Yes, that is correct in non-cached mode. At the same time, Spark also does
 something else optionally, which is to keep the data structures (RDDs)
 persistent in memory (*). As such it is possible partitions that are not
 being actively worked on to be consuming memory. Spark will spill all these
 to local disk if they take up more memory than it is allowed to take. So
 the key thing to worry about is less about what Spark does (apart of
 overhead and yes, the possibility of bugs that need to be fixed), and more
 about what your closure code does with JVM memory as a whole. If in doubt,
 refer back to the blow-up factor model described above.

 (*) this is a fundamentally differentiating feature of Spark over a range
 of other in-memory architectures, that focus on raw-data or transient
 caches that serve non-equivalent purposes when viewed from the application
 level. It allows for very fast access to ready-to-consume high-level data
 structures, as long as available RAM permits.




 On Tue, May 27, 2014 at 6:21 PM, Christopher Nguyen c...@adatao.comwrote:

 Keith, do you mean bound as in (a) strictly control to some
 quantifiable limit, or (b) try to minimize the amount used by each task?

 If a, then that is outside the scope of Spark's memory management,
 which you should think of as an application-level (that is, above JVM)
 mechanism. In this scope, Spark voluntarily tracks and limits the amount
 of memory it uses for explicitly known data structures, such as RDDs. What
 Spark cannot do is, e.g., control or manage the amount of JVM memory that a
 given piece of user code might take up. For example, I might write some
 closure code that allocates a large array of doubles unbeknownst to Spark.

 If b, then your thinking is in the right direction, although quite
 imperfect, because of things like the example above. We often experience
 OOME if we're not careful with job partitioning. What I think Spark needs
 to evolve to is at least to include a mechanism for application-level hints
 about task memory requirements. We might work on this and submit a PR for
 it.

 --
 Christopher T. Nguyen
 Co-founder  CEO, Adatao http://adatao.com
 linkedin.com/in/ctnguyen



 On Tue, May 27, 2014 at 5:33 PM, Keith Simmons ke...@pulse.io wrote:

 I'm trying to determine how to bound my memory use in a job working
 with more data than can simultaneously fit in RAM.  From reading the tuning
 guide, my impression is that Spark's memory usage is roughly the following:

 (A) In-Memory RDD use + (B) In memory Shuffle use + (C) Transient
 memory used by all currently running tasks

 I can bound A with spark.storage.memoryFraction and I can bound B with 
 spark.shuffle.memoryFraction.
  I'm

Spark Memory Bounds

2014-05-27 Thread Keith Simmons
I'm trying to determine how to bound my memory use in a job working with
more data than can simultaneously fit in RAM.  From reading the tuning
guide, my impression is that Spark's memory usage is roughly the following:

(A) In-Memory RDD use + (B) In memory Shuffle use + (C) Transient memory
used by all currently running tasks

I can bound A with spark.storage.memoryFraction and I can bound B with
spark.shuffle.memoryFraction.
 I'm wondering how to bound C.

It's been hinted at a few times on this mailing list that you can reduce
memory use by increasing the number of partitions.  That leads me to
believe that the amount of transient memory is roughly as follows:

total_data_set_size/number_of_partitions *
number_of_tasks_simultaneously_running_per_machine

Does this sound right?  In other words, as I increase the number of
partitions, the size of each partition will decrease, and since each task
is processing a single partition and there are a bounded number of tasks in
flight, my memory use has a rough upper limit.

Keith


Re: Spark Memory Bounds

2014-05-27 Thread Keith Simmons
A dash of both.  I want to know enough that I can reason about, rather
than strictly control, the amount of memory Spark will use.  If I have a
big data set, I want to understand how I can design it so that Spark's
memory consumption falls below my available resources.  Or alternatively,
if it's even possible for Spark to process a data set over a certain size.
 And if I run into memory problems, I want to know which knobs to turn, and
how turning those knobs will affect memory consumption.

It's my understanding that between certain key stages in a Spark DAG (i.e.
group by stages), Spark will serialize all data structures necessary to
continue the computation at the next stage, including closures.  So in
theory, per machine, Spark only needs to hold the transient memory required
to process the partitions assigned to the currently active tasks.  Is my
understanding correct?  Specifically, once a key/value pair is serialized
in the shuffle stage of a task, are the references to the raw java objects
released before the next task is started.



On Tue, May 27, 2014 at 6:21 PM, Christopher Nguyen c...@adatao.com wrote:

 Keith, do you mean bound as in (a) strictly control to some quantifiable
 limit, or (b) try to minimize the amount used by each task?

 If a, then that is outside the scope of Spark's memory management, which
 you should think of as an application-level (that is, above JVM) mechanism.
 In this scope, Spark voluntarily tracks and limits the amount of memory
 it uses for explicitly known data structures, such as RDDs. What Spark
 cannot do is, e.g., control or manage the amount of JVM memory that a given
 piece of user code might take up. For example, I might write some closure
 code that allocates a large array of doubles unbeknownst to Spark.

 If b, then your thinking is in the right direction, although quite
 imperfect, because of things like the example above. We often experience
 OOME if we're not careful with job partitioning. What I think Spark needs
 to evolve to is at least to include a mechanism for application-level hints
 about task memory requirements. We might work on this and submit a PR for
 it.

 --
 Christopher T. Nguyen
 Co-founder  CEO, Adatao http://adatao.com
 linkedin.com/in/ctnguyen



 On Tue, May 27, 2014 at 5:33 PM, Keith Simmons ke...@pulse.io wrote:

 I'm trying to determine how to bound my memory use in a job working with
 more data than can simultaneously fit in RAM.  From reading the tuning
 guide, my impression is that Spark's memory usage is roughly the following:

 (A) In-Memory RDD use + (B) In memory Shuffle use + (C) Transient memory
 used by all currently running tasks

 I can bound A with spark.storage.memoryFraction and I can bound B with 
 spark.shuffle.memoryFraction.
  I'm wondering how to bound C.

 It's been hinted at a few times on this mailing list that you can reduce
 memory use by increasing the number of partitions.  That leads me to
  believe that the amount of transient memory is roughly as follows:

 total_data_set_size/number_of_partitions *
 number_of_tasks_simultaneously_running_per_machine

 Does this sound right?  In other words, as I increase the number of
 partitions, the size of each partition will decrease, and since each task
 is processing a single partition and there are a bounded number of tasks in
 flight, my memory use has a rough upper limit.

 Keith





Re: TriangleCount Shortest Path under Spark

2014-03-13 Thread Keith Massey
The triangle count failed for me when I ran it on more than one node. There
was this assertion in TriangleCount.scala:
 // double count should be even (divisible by two)
 assert((dblCount & 1) == 0)
That did not hold true when I ran this on multiple nodes, even when
following the guidelines to make sure that all source ids are greater than
destination ids and partitioning the graph using RandomVertexCut. I didn't
dig into the code to see why this assertion was failing, but commenting
that line out allowed the code to run. I'm not sure how much I can trust
the results, but they looked generally right. Not sure if this is the
failure you are talking about or not.

As far as shortest path, the programming guide had an example that worked
well for me under
https://spark.incubator.apache.org/docs/latest/graphx-programming-guide.html#pregel-api
.

Keith
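
For reference, the Pregel-based single-source shortest paths example in that
guide looks roughly like the sketch below (the random test graph and the
source vertex id are arbitrary; edge attributes are used as distances, and the
API is taken from the linked guide rather than re-tested here):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators

object ShortestPathSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ShortestPathSketch"))
    // A small random graph whose edge attributes serve as distances (assumption for the sketch).
    val graph: Graph[Long, Double] =
      GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble)
    val sourceId: VertexId = 42L  // arbitrary source vertex

    // Initialize: distance 0 at the source, infinity everywhere else.
    val initialGraph = graph.mapVertices((id, _) =>
      if (id == sourceId) 0.0 else Double.PositiveInfinity)

    val sssp = initialGraph.pregel(Double.PositiveInfinity)(
      (id, dist, newDist) => math.min(dist, newDist),           // vertex program
      triplet => {                                               // send message along shorter paths
        if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
          Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
        else
          Iterator.empty
      },
      (a, b) => math.min(a, b)                                   // merge messages
    )
    println(sssp.vertices.take(10).mkString("\n"))
    sc.stop()
  }
}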


On Sun, Mar 9, 2014 at 5:52 PM, yxzhao yxz...@ualr.edu wrote:

 Hi All,

 I have already set up Spark-0.9.0-incubating on our school's cluster. I
 successfully run the Spark PageRank demo located in
 /spark-0.9.0-incubating/examples/src/main/scala/org/apache/spark/examples.

 Problem 1. I want to run the TriangleCount whose source code located

 in/spark-0.9.0-incubating/graphx/src/main/scala/org/apache/spark/graphx/lib.
 I used the following command.
 ./bin/run-example org.apache.spark.graphx.GraphOps.TriangleCount
 spark://10.1.255.206:7077
  But it did not work. Is there any mistake in my command line? Could
 anybody
 let me know how to correctly run the TriangleCount demo. Thanks very much.

 Problem 2.  Does anybody have shortest path implementation code under
 Spark?
 If so could you share it with me?

 Thanks in advance,
 Regards,
 Joey





 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/TriangleCount-Shortest-Path-under-Spark-tp2438.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.