RE: DataFrame select non-existing column

2016-11-18 Thread Mendelson, Assaf
In pyspark for example you would do something like:

df.withColumn("newColName",pyspark.sql.functions.lit(None))

Assaf.
-Original Message-
From: Kristoffer Sjögren [mailto:sto...@gmail.com] 
Sent: Friday, November 18, 2016 9:19 PM
To: Mendelson, Assaf
Cc: user
Subject: Re: DataFrame select non-existing column

Thanks for your answer. I have been searching the API for doing that but I 
could not find how to do it?

Could you give me a code snippet?

On Fri, Nov 18, 2016 at 8:03 PM, Mendelson, Assaf  
wrote:
> You can always add the columns to old dataframes giving them null (or some 
> literal) as a preprocessing.
>
> -Original Message-
> From: Kristoffer Sjögren [mailto:sto...@gmail.com]
> Sent: Friday, November 18, 2016 4:32 PM
> To: user
> Subject: DataFrame select non-existing column
>
> Hi
>
> We have evolved a DataFrame by adding a few columns but cannot write select 
> statements on these columns for older data that doesn't have them since they 
> fail with an AnalysisException with the message "No such struct field".
>
> We also tried dropping columns but this doesn't work for nested columns.
>
> Any non-hacky ways to get around this?
>
> Cheers,
> -Kristoffer
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>


Re: DataFrame select non-existing column

2016-11-18 Thread Muthu Jayakumar
Depending on your use case, 'df.withColumn("my_existing_or_new_col",
lit(0l))' could work?

On Fri, Nov 18, 2016 at 11:18 AM, Kristoffer Sjögren 
wrote:

> Thanks for your answer. I have been searching the API for doing that
> but I could not find how to do it?
>
> Could you give me a code snippet?
>
> On Fri, Nov 18, 2016 at 8:03 PM, Mendelson, Assaf
>  wrote:
> > You can always add the columns to old dataframes giving them null (or
> some literal) as a preprocessing.
> >
> > -Original Message-
> > From: Kristoffer Sjögren [mailto:sto...@gmail.com]
> > Sent: Friday, November 18, 2016 4:32 PM
> > To: user
> > Subject: DataFrame select non-existing column
> >
> > Hi
> >
> > We have evolved a DataFrame by adding a few columns but cannot write
> select statements on these columns for older data that doesn't have them
> since they fail with an AnalysisException with the message "No such struct
> field".
> >
> > We also tried dropping columns but this doesn't work for nested columns.
> >
> > Any non-hacky ways to get around this?
> >
> > Cheers,
> > -Kristoffer
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Reading LZO files with Spark

2016-11-18 Thread learning_spark
Hi Users,

I am not sure about the latest status of this issue:
https://issues.apache.org/jira/browse/SPARK-2394

However, I have seen the following link:
https://github.com/awslabs/emr-bootstrap-actions/blob/master/spark/examples/reading-lzo-files.md

My experience is limited, but I have had partial success from the Spark shell;
my standalone program did not even compile. I suspect some jar file is required.

val files = sc.newAPIHadoopFile(
  "s3://support.elasticmapreduce/spark/examples/lzodataindexed/*.lzo",
  classOf[com.hadoop.mapreduce.LzoTextInputFormat],
  classOf[org.apache.hadoop.io.LongWritable],
  classOf[org.apache.hadoop.io.Text])

Does anyone know how to do this from a standalone program?

Thanks and regards,




Spark driver not reusing HConnection

2016-11-18 Thread Mukesh Jha
Hi

I'm accessing multiple regions (~5k) of an HBase table using Spark's
newAPIHadoopRDD, but the driver is trying to calculate the region size of
all the regions.
It is not even reusing the HConnection and is creating a new connection for
every request (see below), which is taking a lot of time.

Is there a better approach to do this?


8 Nov 2016 22:25:22,759] [INFO Driver] RecoverableZooKeeper: Process
identifier=*hconnection-0x1e7824af* connecting to ZooKeeper ensemble=
hbase19.cloud.com:2181,hbase24.cloud.com:2181,hbase28.cloud.com:2181
[18 Nov 2016 22:25:22,759] [INFO Driver] ZooKeeper: Initiating client
connection, connectString=hbase19.cloud.com:2181,hbase24.cloud.com:2181,
hbase28.cloud.com:2181 sessionTimeout=6
watcher=hconnection-0x1e7824af0x0, quorum=hbase19.cloud.com:2181,
hbase24.cloud.com:2181,hbase28.cloud.com:2181, baseZNode=/hbase
[18 Nov 2016 22:25:22,761] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
ClientCnxn: Opening socket connection to server
hbase24.cloud.com/10.193.150.217:2181. Will not attempt to authenticate
using SASL (unknown error)
[18 Nov 2016 22:25:22,763] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
ClientCnxn: Socket connection established, initiating session, client: /
10.193.138.145:47891, server: hbase24.cloud.com/10.193.150.217:2181
[18 Nov 2016 22:25:22,766] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
ClientCnxn: Session establishment complete on server
hbase24.cloud.com/10.193.150.217:2181, sessionid = 0x2564f6f013e0e95,
negotiated timeout = 6
[18 Nov 2016 22:25:22,766] [INFO Driver] RegionSizeCalculator: Calculating
region sizes for table "message".
[18 Nov 2016 22:25:27,867] [INFO Driver]
ConnectionManager$HConnectionImplementation: Closing master protocol:
MasterService
[18 Nov 2016 22:25:27,868] [INFO Driver]
ConnectionManager$HConnectionImplementation: Closing zookeeper
sessionid=0x2564f6f013e0e95
[18 Nov 2016 22:25:27,869] [INFO Driver] ZooKeeper: Session:
0x2564f6f013e0e95 closed
[18 Nov 2016 22:25:27,869] [INFO Driver-EventThread] ClientCnxn:
EventThread shut down
[18 Nov 2016 22:25:27,880] [INFO Driver] RecoverableZooKeeper: Process
identifier=*hconnection-0x6a8a1efa* connecting to ZooKeeper ensemble=
hbase19.cloud.com:2181,hbase24.cloud.com:2181,hbase28.cloud.com:2181
[18 Nov 2016 22:25:27,880] [INFO Driver] ZooKeeper: Initiating client
connection, connectString=hbase19.cloud.com:2181,hbase24.cloud.com:2181,
hbase28.cloud.com:2181 sessionTimeout=6
watcher=hconnection-0x6a8a1efa0x0, quorum=hbase19.cloud.com:2181,
hbase24.cloud.com:2181,hbase28.cloud.com:2181, baseZNode=/hbase
[18 Nov 2016 22:25:27,883] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
ClientCnxn: Opening socket connection to server
hbase24.cloud.com/10.193.150.217:2181. Will not attempt to authenticate
using SASL (unknown error)
[18 Nov 2016 22:25:27,885] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
ClientCnxn: Socket connection established, initiating session, client: /
10.193.138.145:47894, server: hbase24.cloud.com/10.193.150.217:2181
[18 Nov 2016 22:25:27,887] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
ClientCnxn: Session establishment complete on server
hbase24.cloud.com/10.193.150.217:2181, sessionid = 0x2564f6f013e0e97,
negotiated timeout = 6
[18 Nov 2016 22:25:27,888] [INFO Driver] RegionSizeCalculator: Calculating
region sizes for table "message".


-- 
Thanks & Regards,

*Mukesh Jha *


Run spark with hadoop snapshot

2016-11-18 Thread lminer
I'm trying to figure out how to run spark with a snapshot of Hadoop 2.8 that
I built myself. I'm unclear on the configuration needed to get spark to work
with the snapshot.

I'm running spark on mesos. Per the spark documentation, I run spark-submit
as follows using the `spark-2.0.2-bin-without-hadoop`, but spark doesn't
appear to be finding hadoop 2.8.

export SPARK_DIST_CLASSPATH=$(/path/to/hadoop2.8/bin/hadoop classpath)
spark-submit --verbose --master mesos://$MASTER_HOST/mesos

I get the error:

Exception in thread "main" java.lang.NoClassDefFoundError:
org/apache/hadoop/fs/FSDataInputStream
at
org.apache.spark.deploy.SparkSubmitArguments.handle(SparkSubmitArguments.scala:403)
at
org.apache.spark.launcher.SparkSubmitOptionParser.parse(SparkSubmitOptionParser.java:163)
at
org.apache.spark.deploy.SparkSubmitArguments.<init>(SparkSubmitArguments.scala:98)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:117)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException:
org.apache.hadoop.fs.FSDataInputStream
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
... 5 more

Any ideas on the proper configuration?




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



How to expose Spark-Shell in the production?

2016-11-18 Thread kant kodali
How to expose Spark-Shell in the production?

1) Should we expose it on master nodes or executor nodes?
2) Should we simply give access to those machines and the Spark-Shell binary?
What is the recommended way?

Thanks!


Re: DataFrame select non-existing column

2016-11-18 Thread Kristoffer Sjögren
Thanks for your answer. I have been searching the API for doing that
but I could not find how to do it?

Could you give me a code snippet?

On Fri, Nov 18, 2016 at 8:03 PM, Mendelson, Assaf
 wrote:
> You can always add the columns to old dataframes giving them null (or some 
> literal) as a preprocessing.
>
> -Original Message-
> From: Kristoffer Sjögren [mailto:sto...@gmail.com]
> Sent: Friday, November 18, 2016 4:32 PM
> To: user
> Subject: DataFrame select non-existing column
>
> Hi
>
> We have evolved a DataFrame by adding a few columns but cannot write select 
> statements on these columns for older data that doesn't have them since they 
> fail with an AnalysisException with the message "No such struct field".
>
> We also tried dropping columns but this doesn't work for nested columns.
>
> Any non-hacky ways to get around this?
>
> Cheers,
> -Kristoffer
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Will spark cache table once even if I call read/cache on the same table multiple times

2016-11-18 Thread Rabin Banerjee
Hi Yong,

  But every time val tabdf = sqlContext.table(tablename) is called, tabdf.rdd
gets a new id, which can be checked by calling tabdf.rdd.id.
And, per
https://github.com/apache/spark/blob/b6de0c98c70960a97b07615b0b08fbd8f900fbe7/core/src/main/scala/org/apache/spark/SparkContext.scala#L268

Spark maintains a Map of [RDD_ID, RDD]. As the RDD id is changing, will
Spark cache the same data again and again?

For example ,

val tabdf = sqlContext.table("employee")
tabdf.cache()
tabdf.someTransformation.someAction
println(tabdf.rdd.id)
val tabdf1 = sqlContext.table("employee")
tabdf1.cache() <= *Will spark again go to disk, read and load the data into
memory, or look into the cache?*
tabdf1.someTransformation.someAction
println(tabdf1.rdd.id)
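
(As a side check, caching can also be done by table name, which sidesteps the
per-DataFrame RDD id question entirely; a small sketch, assuming sqlContext is
in scope:)

sqlContext.cacheTable("employee")            // mark the table itself as cached
println(sqlContext.isCached("employee"))     // reports whether the table is marked as cached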

Regards,
R Banerjee




On Fri, Nov 18, 2016 at 9:14 PM, Yong Zhang  wrote:

> That's correct, as long as you don't change the StorageLevel.
>
>
> https://github.com/apache/spark/blob/master/core/src/
> main/scala/org/apache/spark/rdd/RDD.scala#L166
>
>
>
> Yong
>
> --
> *From:* Rabin Banerjee 
> *Sent:* Friday, November 18, 2016 10:36 AM
> *To:* user; Mich Talebzadeh; Tathagata Das
> *Subject:* Will spark cache table once even if I call read/cache on the
> same table multiple times
>
> Hi All ,
>
>   I am working in a project where code is divided into multiple reusable
> module . I am not able to understand spark persist/cache on that context.
>
> My Question is Will spark cache table once even if I call read/cache on
> the same table multiple times ??
>
>  Sample Code ::
>
>   TableReader::
>
> def getTableDF(tablename: String, persist: Boolean = false): DataFrame = {
>   val tabdf = sqlContext.table(tablename)
>   if (persist) {
>     tabdf.cache()
>   }
>   tabdf
> }
>
>  Now
> Module1::
>  val emp = TableReader.getTable("employee")
>  emp.someTransformation.someAction
>
> Module2::
>  val emp = TableReader.getTable("employee")
>  emp.someTransformation.someAction
>
> 
>
> ModuleN::
>  val emp = TableReader.getTable("employee")
>  emp.someTransformation.someAction
>
> Will spark cache emp table once , or it will cache every time I am calling
> ?? Shall I maintain a global hashmap to handle that ? something like
> Map[String,DataFrame] ??
>
>  Regards,
> Rabin Banerjee
>
>
>
>


RE: DataFrame select non-existing column

2016-11-18 Thread Mendelson, Assaf
You can always add the columns to the old dataframes, giving them null (or some 
literal) values, as a preprocessing step.

-Original Message-
From: Kristoffer Sjögren [mailto:sto...@gmail.com] 
Sent: Friday, November 18, 2016 4:32 PM
To: user
Subject: DataFrame select non-existing column

Hi

We have evolved a DataFrame by adding a few columns but cannot write select 
statements on these columns for older data that doesn't have them since they 
fail with an AnalysisException with the message "No such struct field".

We also tried dropping columns but this doesn't work for nested columns.

Any non-hacky ways to get around this?

Cheers,
-Kristoffer

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Successful streaming with ibm/ mq to flume then to kafka and finally spark streaming

2016-11-18 Thread Mich Talebzadeh
hi,

can someone share their experience of feeding data from ibm/mq messages
into flume, then from flume to kafka and using spark streaming on it?

any issues and things to be aware of?

thanks


Dr Mich Talebzadeh



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Re: Issue in application deployment on spark cluster

2016-11-18 Thread Asmeet
Hello Anjali,

 According to the documentation at the following URL 

http://spark.apache.org/docs/latest/submitting-applications.html

it says "Currently, standalone mode does not support cluster mode for Python 
applications."
Does this relate to your problem/query.

Regards,
Asmeet

> On 18-Nov-2016, at 8:46 pm, Anjali Gautam  wrote:
> 
> Hello Everybody,
> 
> I am new to Apache spark. I have created an application in Python which works 
> well on spark locally but is not working properly when deployed on standalone 
> spark cluster. 
> 
> Can anybody comment on this behaviour of the application? Also if spark 
> Python code requires making some changes to get working on the cluster. 
> 
> Please let me know if some more information is required.
> 
> Thanks & Regards
> Anjali
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 


Re: How to load only the data of the last partition

2016-11-18 Thread Rabin Banerjee
Hi,

 In order to do that, you can write code to list the HDFS directory first, then
list its sub-directories. Using custom logic, first identify the latest
year/month/version, then read the Avro files in that directory into a DataFrame,
and finally add year/month/version to that DataFrame using withColumn.
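
A rough, untested sketch of that approach (it assumes a SparkSession named
spark, the /data/year=.../month=.../version=... layout from your mail, and the
spark-avro package for reading the files):

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.functions.lit

val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)

// list the sub-directories of a path
def subDirs(p: Path) = fs.listStatus(p).filter(_.isDirectory).map(_.getPath)

// pick the highest version=N directory inside every /data/year=YYYY/month=M
val latestDirs = for {
  year  <- subDirs(new Path("/data"))
  month <- subDirs(year)
} yield subDirs(month).maxBy(_.getName.split("=")(1).toInt)

// load each latest dir and re-attach the partition values lost by pointing at a leaf dir
val perMonth = latestDirs.map { dir =>
  def part(p: Path) = p.getName.split("=")(1).toInt
  spark.read.format("com.databricks.spark.avro").load(dir.toString)
    .withColumn("year",    lit(part(dir.getParent.getParent)))
    .withColumn("month",   lit(part(dir.getParent)))
    .withColumn("version", lit(part(dir)))
}
val all = perMonth.reduce(_ union _)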

Regards,
R Banerjee

On Fri, Nov 18, 2016 at 2:41 PM, Samy Dindane  wrote:

> Thank you Daniel. Unfortunately, we don't use Hive but bare (Avro) files.
>
>
> On 11/17/2016 08:47 PM, Daniel Haviv wrote:
>
>> Hi Samy,
>> If you're working with hive you could create a partitioned table and
>> update its partitions' locations to the last version so when you query
>> it using spark, you'll always get the latest version.
>>
>> Daniel
>>
>> On Thu, Nov 17, 2016 at 9:05 PM, Samy Dindane  s...@dindane.com>> wrote:
>>
>> Hi,
>>
>> I have some data partitioned this way:
>>
>> /data/year=2016/month=9/version=0
>> /data/year=2016/month=10/version=0
>> /data/year=2016/month=10/version=1
>> /data/year=2016/month=10/version=2
>> /data/year=2016/month=10/version=3
>> /data/year=2016/month=11/version=0
>> /data/year=2016/month=11/version=1
>>
>> When using this data, I'd like to load the last version only of each
>> month.
>>
>> A simple way to do this is to do 
>> `load("/data/year=2016/month=11/version=3")`
>> instead of doing `load("/data")`.
>> The drawback of this solution is the loss of partitioning information
>> such as `year` and `month`, which means it would not be possible to apply
>> operations based on the year or the month anymore.
>>
>> Is it possible to ask Spark to load the last version only of each
>> month? How would you go about this?
>>
>> Thank you,
>>
>> Samy
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org > user-unsubscr...@spark.apache.org>
>>
>>
>>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Will spark cache table once even if I call read/cache on the same table multiple times

2016-11-18 Thread Yong Zhang
That's correct, as long as you don't change the StorageLevel.


https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L166



Yong


From: Rabin Banerjee 
Sent: Friday, November 18, 2016 10:36 AM
To: user; Mich Talebzadeh; Tathagata Das
Subject: Will spark cache table once even if I call read/cache on the same 
table multiple times

Hi All ,

  I am working in a project where code is divided into multiple reusable module 
. I am not able to understand spark persist/cache on that context.

My Question is Will spark cache table once even if I call read/cache on the 
same table multiple times ??

 Sample Code ::

  TableReader::

   def getTableDF(tablename:String,persist:Boolean = false) : DataFrame = {
     val tabdf = sqlContext.table(tablename)
     if (persist) {
       tabdf.cache()
     }
     tabdf
   }

 Now
Module1::
 val emp = TableReader.getTable("employee")
 emp.someTransformation.someAction

Module2::
 val emp = TableReader.getTable("employee")
 emp.someTransformation.someAction



ModuleN::
 val emp = TableReader.getTable("employee")
 emp.someTransformation.someAction

Will spark cache emp table once , or it will cache every time I am calling ?? 
Shall I maintain a global hashmap to handle that ? something like 
Map[String,DataFrame] ??

 Regards,
Rabin Banerjee





Re: Long-running job OOMs driver process

2016-11-18 Thread Keith Bourgoin
Thanks for the input. I had read somewhere that s3:// was the way to go due
to some recent changes, but apparently that was outdated. I’m working on
creating some dummy data and a script to process it right now. I’ll post
here with code and logs when I can successfully reproduce the issue on
non-production data.

Yong, that's a good point about the web content. I had forgotten to mention
that when I first saw this a few months ago, on another project, I could
sometimes trigger the OOM by trying to view the web ui for the job. That's
another case I'll try to reproduce.

Thanks again!

Keith.

On Fri, Nov 18, 2016 at 10:30 AM Yong Zhang  wrote:

> Just wondering, is it possible the memory usage keeping going up due to
> the web UI content?
>
>
> Yong
>
>
> --
> *From:* Alexis Seigneurin 
> *Sent:* Friday, November 18, 2016 10:17 AM
> *To:* Nathan Lande
> *Cc:* Keith Bourgoin; Irina Truong; u...@spark.incubator.apache.org
> *Subject:* Re: Long-running job OOMs driver process
>
> +1 for using S3A.
>
> It would also depend on what format you're using. I agree with Steve that
> Parquet, for instance, is a good option. If you're using plain text files,
> some people use GZ files but they cannot be partitioned, thus putting a lot
> of pressure on the driver. It doesn't look like this is the issue you're
> running into, though, because it would not be a progressive slow down, but
> please provide as much detail as possible about your app.
>
> The cache could be an issue but the OOM would come from an executor, not
> from the driver.
>
> From what you're saying, Keith, it indeed looks like some memory is not
> being freed. Seeing the code would help. If you can, also send all the logs
> (with Spark at least in INFO level).
>
> Alexis
>
> On Fri, Nov 18, 2016 at 10:08 AM, Nathan Lande 
> wrote:
>
> +1 to not threading.
>
> What does your load look like? If you are loading many files and cacheing
> them in N rdds rather than 1 rdd this could be an issue.
>
> If the above two things don't fix your oom issue, without knowing anything
> else about your job, I would focus on your cacheing strategy as a potential
> culprit. Try running without any cacheing to isolate the issue; bad
> cacheing strategy is the source of oom issues for me most of the time.
>
> On Nov 18, 2016 6:31 AM, "Keith Bourgoin"  wrote:
>
> Hi Alexis,
>
> Thanks for the response. I've been working with Irina on trying to sort
> this issue out.
>
> We thread the file processing to amortize the cost of things like getting
> files from S3. It's a pattern we've seen recommended in many places, but I
> don't have any of those links handy.  The problem isn't the threading, per
> se, but clearly some sort of memory leak in the driver itself.  Each file
> is a self-contained unit of work, so once it's done all memory related to
> it should be freed. Nothing in the script itself grows over time, so if it
> can do 10 concurrently, it should be able to run like that forever.
>
> I've hit this same issue working on another Spark app which wasn't
> threaded, but produced tens of thousands of jobs. Eventually, the Spark UI
> would get slow, then unresponsive, and then be killed due to OOM.
>
> I'll try to cook up some examples of this today, threaded and not. We were
> hoping that someone had seen this before and it rung a bell. Maybe there's
> a setting to clean up info from old jobs that we can adjust.
>
> Cheers,
>
> Keith.
>
> On Thu, Nov 17, 2016 at 9:50 PM Alexis Seigneurin <
> aseigneu...@ipponusa.com> wrote:
>
> Hi Irina,
>
> I would question the use of multiple threads in your application. Since
> Spark is going to run the processing of each DataFrame on all the cores of
> your cluster, the processes will be competing for resources. In fact, they
> would not only compete for CPU cores but also for memory.
>
> Spark is designed to run your processes in a sequence, and each process
> will be run in a distributed manner (multiple threads on multiple
> instances). I would suggest to follow this principle.
>
> Feel free to share to code if you can. It's always helpful so that we can
> give better advice.
>
> Alexis
>
> On Thu, Nov 17, 2016 at 8:51 PM, Irina Truong  wrote:
>
> We have an application that reads text files, converts them to dataframes,
> and saves them in Parquet format. The application runs fine when processing
> a few files, but we have several thousand produced every day. When running
> the job for all files, we have spark-submit killed on OOM:
>
> #
> # java.lang.OutOfMemoryError: Java heap space
> # -XX:OnOutOfMemoryError="kill -9 %p"
> #   Executing /bin/sh -c "kill -9 27226"...
>
> The job is written in Python. We’re running it in Amazon EMR 5.0 (Spark
> 2.0.0) with spark-submit. We’re using a cluster with a master c3.2xlarge
> instance (8 cores and 15g of RAM) and 3 core c3.4xlarge instances (16 cores
> 

Re: sort descending with multiple columns

2016-11-18 Thread Rabin Banerjee
++Stuart

val colList = df.columns

can be used
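
For example, to sort every column in descending order the two can be combined
(assuming the functions import):

import org.apache.spark.sql.functions.col
df.sort(df.columns.map(col(_).desc): _*).show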

On Fri, Nov 18, 2016 at 8:03 PM, Stuart White 
wrote:

> Is this what you're looking for?
>
> val df = Seq(
>   (1, "A"),
>   (1, "B"),
>   (1, "C"),
>   (2, "D"),
>   (3, "E")
> ).toDF("foo", "bar")
>
> val colList = Seq("foo", "bar")
> df.sort(colList.map(col(_).desc): _*).show
>
> +---+---+
> |foo|bar|
> +---+---+
> |  3|  E|
> |  2|  D|
> |  1|  C|
> |  1|  B|
> |  1|  A|
> +---+---+
>
> On Fri, Nov 18, 2016 at 1:15 AM, Sreekanth Jella 
> wrote:
> > Hi,
> >
> > I'm trying to sort multiple columns and column names are dynamic.
> >
> > df.sort(colList.head, colList.tail: _*)
> >
> >
> > But I'm not sure how to sort in descending order for all columns, I tried
> > this but it's for only first column..
> >
> > df.sort(df.col(colList.head).desc)
> > How can I pass all column names (or some) with descending order.
> >
> >
> > Thanks,
> > Sreekanth
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Will spark cache table once even if I call read/cache on the same table multiple times

2016-11-18 Thread Rabin Banerjee
Hi All ,

  I am working on a project where the code is divided into multiple reusable
modules. I am not able to understand Spark persist/cache in that context.

My question is: will Spark cache a table only once, even if I call read/cache on
the same table multiple times?

 Sample Code ::

  TableReader::

   def getTableDF(tablename:String,persist:Boolean = false) : DataFrame = {
     val tabdf = sqlContext.table(tablename)
     if (persist) {
       tabdf.cache()
     }
     tabdf
   }

 Now
Module1::
 val emp = TableReader.getTable("employee")
 emp.someTransformation.someAction

Module2::
 val emp = TableReader.getTable("employee")
 emp.someTransformation.someAction



ModuleN::
 val emp = TableReader.getTable("employee")
 emp.someTransformation.someAction

Will Spark cache the emp table once, or will it cache it every time I call it?
Shall I maintain a global hashmap to handle that? Something like
Map[String,DataFrame]?
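
For reference, a minimal sketch of that Map[String,DataFrame] idea (assumes
sqlContext is in scope; not thread-safe as written):

import org.apache.spark.sql.DataFrame
import scala.collection.mutable

object TableReader {
  private val tables = mutable.Map.empty[String, DataFrame]

  // return the already-built DataFrame for a table, building (and optionally
  // caching) it only on the first call
  def getTableDF(tablename: String, persist: Boolean = false): DataFrame =
    tables.getOrElseUpdate(tablename, {
      val tabdf = sqlContext.table(tablename)
      if (persist) tabdf.cache()
      tabdf
    })
}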

 Regards,
Rabin Banerjee


Re: Long-running job OOMs driver process

2016-11-18 Thread Yong Zhang
Just wondering, is it possible the memory usage keeping going up due to the web 
UI content?


Yong



From: Alexis Seigneurin 
Sent: Friday, November 18, 2016 10:17 AM
To: Nathan Lande
Cc: Keith Bourgoin; Irina Truong; u...@spark.incubator.apache.org
Subject: Re: Long-running job OOMs driver process

+1 for using S3A.

It would also depend on what format you're using. I agree with Steve that 
Parquet, for instance, is a good option. If you're using plain text files, some 
people use GZ files but they cannot be partitioned, thus putting a lot of 
pressure on the driver. It doesn't look like this is the issue you're running 
into, though, because it would not be a progressive slow down, but please 
provide as much detail as possible about your app.

The cache could be an issue but the OOM would come from an executor, not from 
the driver.

From what you're saying, Keith, it indeed looks like some memory is not being 
freed. Seeing the code would help. If you can, also send all the logs (with 
Spark at least in INFO level).

Alexis

On Fri, Nov 18, 2016 at 10:08 AM, Nathan Lande 
> wrote:

+1 to not threading.

What does your load look like? If you are loading many files and cacheing them 
in N rdds rather than 1 rdd this could be an issue.

If the above two things don't fix your oom issue, without knowing anything else 
about your job, I would focus on your cacheing strategy as a potential culprit. 
Try running without any cacheing to isolate the issue; bad cacheing strategy is 
the source of oom issues for me most of the time.

On Nov 18, 2016 6:31 AM, "Keith Bourgoin" 
> wrote:
Hi Alexis,

Thanks for the response. I've been working with Irina on trying to sort this 
issue out.

We thread the file processing to amortize the cost of things like getting files 
from S3. It's a pattern we've seen recommended in many places, but I don't have 
any of those links handy.  The problem isn't the threading, per se, but clearly 
some sort of memory leak in the driver itself.  Each file is a self-contained 
unit of work, so once it's done all memory related to it should be freed. 
Nothing in the script itself grows over time, so if it can do 10 concurrently, 
it should be able to run like that forever.

I've hit this same issue working on another Spark app which wasn't threaded, 
but produced tens of thousands of jobs. Eventually, the Spark UI would get 
slow, then unresponsive, and then be killed due to OOM.

I'll try to cook up some examples of this today, threaded and not. We were 
hoping that someone had seen this before and it rung a bell. Maybe there's a 
setting to clean up info from old jobs that we can adjust.

Cheers,

Keith.

On Thu, Nov 17, 2016 at 9:50 PM Alexis Seigneurin 
> wrote:
Hi Irina,

I would question the use of multiple threads in your application. Since Spark 
is going to run the processing of each DataFrame on all the cores of your 
cluster, the processes will be competing for resources. In fact, they would not 
only compete for CPU cores but also for memory.

Spark is designed to run your processes in a sequence, and each process will be 
run in a distributed manner (multiple threads on multiple instances). I would 
suggest to follow this principle.

Feel free to share to code if you can. It's always helpful so that we can give 
better advice.

Alexis

On Thu, Nov 17, 2016 at 8:51 PM, Irina Truong 
> wrote:

We have an application that reads text files, converts them to dataframes, and 
saves them in Parquet format. The application runs fine when processing a few 
files, but we have several thousand produced every day. When running the job 
for all files, we have spark-submit killed on OOM:


#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill -9 %p"
#   Executing /bin/sh -c "kill -9 27226"...


The job is written in Python. We're running it in Amazon EMR 5.0 (Spark 2.0.0) 
with spark-submit. We're using a cluster with a master c3.2xlarge instance (8 
cores and 15g of RAM) and 3 core c3.4xlarge instances (16 cores and 30g of RAM 
each). Spark config settings are as follows:


('spark.serializer', 'org.apache.spark.serializer.KryoSerializer'),

('spark.executors.instances', '3'),

('spark.yarn.executor.memoryOverhead', '9g'),

('spark.executor.cores', '15'),

('spark.executor.memory', '12g'),

('spark.scheduler.mode', 'FIFO'),

('spark.cleaner.ttl', '1800'),


The job processes each file in a thread, and we have 10 threads running 
concurrently. The process will OOM after about 4 hours, at which point Spark 
has processed over 20,000 jobs.

It seems like the driver is running out of memory, but each individual job is 
quite small. Are there any known memory leaks for long-running Spark 
applications on Yarn?




Re: Long-running job OOMs driver process

2016-11-18 Thread Alexis Seigneurin
+1 for using S3A.

It would also depend on what format you're using. I agree with Steve that
Parquet, for instance, is a good option. If you're using plain text files,
some people use GZ files but they cannot be partitioned, thus putting a lot
of pressure on the driver. It doesn't look like this is the issue you're
running into, though, because it would not be a progressive slow down, but
please provide as much detail as possible about your app.

The cache could be an issue but the OOM would come from an executor, not
from the driver.

From what you're saying, Keith, it indeed looks like some memory is not
being freed. Seeing the code would help. If you can, also send all the logs
(with Spark at least in INFO level).

Alexis

On Fri, Nov 18, 2016 at 10:08 AM, Nathan Lande 
wrote:

> +1 to not threading.
>
> What does your load look like? If you are loading many files and cacheing
> them in N rdds rather than 1 rdd this could be an issue.
>
> If the above two things don't fix your oom issue, without knowing anything
> else about your job, I would focus on your cacheing strategy as a potential
> culprit. Try running without any cacheing to isolate the issue; bad
> cacheing strategy is the source of oom issues for me most of the time.
>
> On Nov 18, 2016 6:31 AM, "Keith Bourgoin"  wrote:
>
>> Hi Alexis,
>>
>> Thanks for the response. I've been working with Irina on trying to sort
>> this issue out.
>>
>> We thread the file processing to amortize the cost of things like getting
>> files from S3. It's a pattern we've seen recommended in many places, but I
>> don't have any of those links handy.  The problem isn't the threading, per
>> se, but clearly some sort of memory leak in the driver itself.  Each file
>> is a self-contained unit of work, so once it's done all memory related to
>> it should be freed. Nothing in the script itself grows over time, so if it
>> can do 10 concurrently, it should be able to run like that forever.
>>
>> I've hit this same issue working on another Spark app which wasn't
>> threaded, but produced tens of thousands of jobs. Eventually, the Spark UI
>> would get slow, then unresponsive, and then be killed due to OOM.
>>
>> I'll try to cook up some examples of this today, threaded and not. We
>> were hoping that someone had seen this before and it rung a bell. Maybe
>> there's a setting to clean up info from old jobs that we can adjust.
>>
>> Cheers,
>>
>> Keith.
>>
>> On Thu, Nov 17, 2016 at 9:50 PM Alexis Seigneurin <
>> aseigneu...@ipponusa.com> wrote:
>>
>>> Hi Irina,
>>>
>>> I would question the use of multiple threads in your application. Since
>>> Spark is going to run the processing of each DataFrame on all the cores of
>>> your cluster, the processes will be competing for resources. In fact, they
>>> would not only compete for CPU cores but also for memory.
>>>
>>> Spark is designed to run your processes in a sequence, and each process
>>> will be run in a distributed manner (multiple threads on multiple
>>> instances). I would suggest to follow this principle.
>>>
>>> Feel free to share to code if you can. It's always helpful so that we
>>> can give better advice.
>>>
>>> Alexis
>>>
>>> On Thu, Nov 17, 2016 at 8:51 PM, Irina Truong  wrote:
>>>
>>> We have an application that reads text files, converts them to
>>> dataframes, and saves them in Parquet format. The application runs fine
>>> when processing a few files, but we have several thousand produced every
>>> day. When running the job for all files, we have spark-submit killed on OOM:
>>>
>>> #
>>> # java.lang.OutOfMemoryError: Java heap space
>>> # -XX:OnOutOfMemoryError="kill -9 %p"
>>> #   Executing /bin/sh -c "kill -9 27226"...
>>>
>>> The job is written in Python. We’re running it in Amazon EMR 5.0 (Spark
>>> 2.0.0) with spark-submit. We’re using a cluster with a master c3.2xlarge
>>> instance (8 cores and 15g of RAM) and 3 core c3.4xlarge instances (16 cores
>>> and 30g of RAM each). Spark config settings are as follows:
>>>
>>> ('spark.serializer', 'org.apache.spark.serializer.KryoSerializer'),
>>>
>>> ('spark.executors.instances', '3'),
>>>
>>> ('spark.yarn.executor.memoryOverhead', '9g'),
>>>
>>> ('spark.executor.cores', '15'),
>>>
>>> ('spark.executor.memory', '12g'),
>>>
>>> ('spark.scheduler.mode', 'FIFO'),
>>>
>>> ('spark.cleaner.ttl', '1800'),
>>>
>>> The job processes each file in a thread, and we have 10 threads running
>>> concurrently. The process will OOM after about 4 hours, at which point
>>> Spark has processed over 20,000 jobs.
>>>
>>> It seems like the driver is running out of memory, but each individual
>>> job is quite small. Are there any known memory leaks for long-running Spark
>>> applications on Yarn?
>>>
>>>
>>>
>>>
>>> --
>>>
>>> *Alexis Seigneurin*
>>> *Managing Consultant*
>>> (202) 459-1591 <202%20459.1591> - LinkedIn
>>> 
>>>
>>> 
>>> Rate our service 

Issue in application deployment on spark cluster

2016-11-18 Thread Anjali Gautam
Hello Everybody,

I am new to Apache Spark. I have created an application in Python which works 
well on Spark locally but does not work properly when deployed on a standalone 
Spark cluster. 

Can anybody comment on this behaviour of the application? Also, does Spark Python 
code require some changes to work on the cluster? 

Please let me know if some more information is required.

Thanks & Regards
Anjali
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Long-running job OOMs driver process

2016-11-18 Thread Nathan Lande
+1 to not threading.

What does your load look like? If you are loading many files and cacheing
them in N rdds rather than 1 rdd this could be an issue.

If the above two things don't fix your oom issue, without knowing anything
else about your job, I would focus on your cacheing strategy as a potential
culprit. Try running without any cacheing to isolate the issue; bad
cacheing strategy is the source of oom issues for me most of the time.

On Nov 18, 2016 6:31 AM, "Keith Bourgoin"  wrote:

> Hi Alexis,
>
> Thanks for the response. I've been working with Irina on trying to sort
> this issue out.
>
> We thread the file processing to amortize the cost of things like getting
> files from S3. It's a pattern we've seen recommended in many places, but I
> don't have any of those links handy.  The problem isn't the threading, per
> se, but clearly some sort of memory leak in the driver itself.  Each file
> is a self-contained unit of work, so once it's done all memory related to
> it should be freed. Nothing in the script itself grows over time, so if it
> can do 10 concurrently, it should be able to run like that forever.
>
> I've hit this same issue working on another Spark app which wasn't
> threaded, but produced tens of thousands of jobs. Eventually, the Spark UI
> would get slow, then unresponsive, and then be killed due to OOM.
>
> I'll try to cook up some examples of this today, threaded and not. We were
> hoping that someone had seen this before and it rung a bell. Maybe there's
> a setting to clean up info from old jobs that we can adjust.
>
> Cheers,
>
> Keith.
>
> On Thu, Nov 17, 2016 at 9:50 PM Alexis Seigneurin <
> aseigneu...@ipponusa.com> wrote:
>
>> Hi Irina,
>>
>> I would question the use of multiple threads in your application. Since
>> Spark is going to run the processing of each DataFrame on all the cores of
>> your cluster, the processes will be competing for resources. In fact, they
>> would not only compete for CPU cores but also for memory.
>>
>> Spark is designed to run your processes in a sequence, and each process
>> will be run in a distributed manner (multiple threads on multiple
>> instances). I would suggest to follow this principle.
>>
>> Feel free to share to code if you can. It's always helpful so that we can
>> give better advice.
>>
>> Alexis
>>
>> On Thu, Nov 17, 2016 at 8:51 PM, Irina Truong  wrote:
>>
>> We have an application that reads text files, converts them to
>> dataframes, and saves them in Parquet format. The application runs fine
>> when processing a few files, but we have several thousand produced every
>> day. When running the job for all files, we have spark-submit killed on OOM:
>>
>> #
>> # java.lang.OutOfMemoryError: Java heap space
>> # -XX:OnOutOfMemoryError="kill -9 %p"
>> #   Executing /bin/sh -c "kill -9 27226"...
>>
>> The job is written in Python. We’re running it in Amazon EMR 5.0 (Spark
>> 2.0.0) with spark-submit. We’re using a cluster with a master c3.2xlarge
>> instance (8 cores and 15g of RAM) and 3 core c3.4xlarge instances (16 cores
>> and 30g of RAM each). Spark config settings are as follows:
>>
>> ('spark.serializer', 'org.apache.spark.serializer.KryoSerializer'),
>>
>> ('spark.executors.instances', '3'),
>>
>> ('spark.yarn.executor.memoryOverhead', '9g'),
>>
>> ('spark.executor.cores', '15'),
>>
>> ('spark.executor.memory', '12g'),
>>
>> ('spark.scheduler.mode', 'FIFO'),
>>
>> ('spark.cleaner.ttl', '1800'),
>>
>> The job processes each file in a thread, and we have 10 threads running
>> concurrently. The process will OOM after about 4 hours, at which point
>> Spark has processed over 20,000 jobs.
>>
>> It seems like the driver is running out of memory, but each individual
>> job is quite small. Are there any known memory leaks for long-running Spark
>> applications on Yarn?
>>
>>
>>
>>
>> --
>>
>> *Alexis Seigneurin*
>> *Managing Consultant*
>> (202) 459-1591 <202%20459.1591> - LinkedIn
>> 
>>
>> 
>> Rate our service 
>>
>


Re: Long-running job OOMs driver process

2016-11-18 Thread Steve Loughran

On 18 Nov 2016, at 14:31, Keith Bourgoin 
> wrote:

We thread the file processing to amortize the cost of things like getting files 
from S3.

Define cost here: actual $ amount, or merely time to read the data?

If it's read times, you should really be trying the new stuff coming in the 
hadoop-2.8+ s3a client, which has put a lot of work into higher performance 
reading of ORC & Parquet data, plus general improvements in listing/opening, 
etc, trying to cut down on slow metadata queries. You are still going to have 
delays of tens to hundreds of millis on every HTTP request (bigger ones for DNS 
problems and/or s3 load balancer overload), but once open, seek + read of s3 
data will be much faster (not end-to-end read of an s3 file though, that's just 
bandwidth limitation after the HTTPS negotiation).

http://www.slideshare.net/steve_l/hadoop-hive-spark-and-object-stores

Also, do make sure you are using s3a URLs, if you weren't already

-Steve


Re: Any with S3 experience with Spark? Having ListBucket issues

2016-11-18 Thread Steve Loughran

On 16 Nov 2016, at 22:34, Edden Burrow 
> wrote:

Anyone dealing with a lot of files with spark?  We're trying s3a with 2.0.1 
because we're seeing intermittent errors in S3 where jobs fail and saveAsText 
file fails. Using pyspark.

How many files? Thousands? Millions?

If you do have some big/complex file structure, I'd really like to know; it not 
only helps us make sure that spark/hive metastore/s3a can handle the layout, it 
may help improve some advice on what not to do.


Is there any issue with working in a S3 folder that has too many files?  How 
about having versioning enabled? Are these things going to be a problem?

Many, many files shouldn't be a problem, except for slowing down some 
operations, and creating larger memory structures to get passed round. 
Partitioning can get slow.


We're pre-building the s3 file list and storing it in a file and passing that 
to textFile as a long comma separated list of files - So we are not running 
list files.

But we get errors with saveAsText file, related to ListBucket.  Even though 
we're not using wildcard '*'.

org.apache.hadoop.fs.s3.S3Exception: org.jets3t.service.S3ServiceException: 
Failed to parse XML document with handler class 
org.jets3t.service.impl.rest.XmlResponsesSaxParser$ListBucketHandler


at a guess, it'll be some checks before the write that the parent directory 
exists and the destination path isn't a directory.


Running spark 2.0.1 with the s3a protocol.

Not with a stack trace containing org.jets3t you aren't. That's what you'd 
expect for s3 and s3n; the key feature of s3a is moving onto the amazon SDK, 
where stack traces move to com.amazon classes

Make sure you *are* using s3a, ideally on Hadoop 2.7.x  (or even better, HDP 
2.5 where you get all the Hadoop 2.8 read pipeline optimisations) On Hadoop 
2.6.x there were still some stabilisation issues that only surfaced in the wild.

Some related slides 
http://www.slideshare.net/steve_l/apache-spark-and-object-stores

-Steve


Re: sort descending with multiple columns

2016-11-18 Thread Stuart White
Is this what you're looking for?

val df = Seq(
  (1, "A"),
  (1, "B"),
  (1, "C"),
  (2, "D"),
  (3, "E")
).toDF("foo", "bar")

val colList = Seq("foo", "bar")
df.sort(colList.map(col(_).desc): _*).show

+---+---+
|foo|bar|
+---+---+
|  3|  E|
|  2|  D|
|  1|  C|
|  1|  B|
|  1|  A|
+---+---+

On Fri, Nov 18, 2016 at 1:15 AM, Sreekanth Jella  wrote:
> Hi,
>
> I'm trying to sort multiple columns and column names are dynamic.
>
> df.sort(colList.head, colList.tail: _*)
>
>
> But I'm not sure how to sort in descending order for all columns, I tried
> this but it's for only first column..
>
> df.sort(df.col(colList.head).desc)
> How can I pass all column names (or some) with descending order.
>
>
> Thanks,
> Sreekanth

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



DataFrame select non-existing column

2016-11-18 Thread Kristoffer Sjögren
Hi

We have evolved a DataFrame by adding a few columns but cannot write
select statements on these columns for older data that doesn't have
them since they fail with an AnalysisException with the message "No such
struct field".

We also tried dropping columns but this doesn't work for nested columns.

Any non-hacky ways to get around this?

Cheers,
-Kristoffer

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Long-running job OOMs driver process

2016-11-18 Thread Keith Bourgoin
Hi Alexis,

Thanks for the response. I've been working with Irina on trying to sort
this issue out.

We thread the file processing to amortize the cost of things like getting
files from S3. It's a pattern we've seen recommended in many places, but I
don't have any of those links handy.  The problem isn't the threading, per
se, but clearly some sort of memory leak in the driver itself.  Each file
is a self-contained unit of work, so once it's done all memory related to
it should be freed. Nothing in the script itself grows over time, so if it
can do 10 concurrently, it should be able to run like that forever.

I've hit this same issue working on another Spark app which wasn't
threaded, but produced tens of thousands of jobs. Eventually, the Spark UI
would get slow, then unresponsive, and then be killed due to OOM.

I'll try to cook up some examples of this today, threaded and not. We were
hoping that someone had seen this before and it rung a bell. Maybe there's
a setting to clean up info from old jobs that we can adjust.
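
The spark.ui.retained* settings look like candidates for that; a sketch below
(values are arbitrary examples, and the same keys can also be passed with
--conf for a PySpark job):

import org.apache.spark.SparkConf

// cap how much completed-job/stage history the driver retains for the web UI
val conf = new SparkConf()
  .set("spark.ui.retainedJobs", "200")
  .set("spark.ui.retainedStages", "200")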

Cheers,

Keith.

On Thu, Nov 17, 2016 at 9:50 PM Alexis Seigneurin 
wrote:

> Hi Irina,
>
> I would question the use of multiple threads in your application. Since
> Spark is going to run the processing of each DataFrame on all the cores of
> your cluster, the processes will be competing for resources. In fact, they
> would not only compete for CPU cores but also for memory.
>
> Spark is designed to run your processes in a sequence, and each process
> will be run in a distributed manner (multiple threads on multiple
> instances). I would suggest to follow this principle.
>
> Feel free to share to code if you can. It's always helpful so that we can
> give better advice.
>
> Alexis
>
> On Thu, Nov 17, 2016 at 8:51 PM, Irina Truong  wrote:
>
> We have an application that reads text files, converts them to dataframes,
> and saves them in Parquet format. The application runs fine when processing
> a few files, but we have several thousand produced every day. When running
> the job for all files, we have spark-submit killed on OOM:
>
> #
> # java.lang.OutOfMemoryError: Java heap space
> # -XX:OnOutOfMemoryError="kill -9 %p"
> #   Executing /bin/sh -c "kill -9 27226"...
>
> The job is written in Python. We’re running it in Amazon EMR 5.0 (Spark
> 2.0.0) with spark-submit. We’re using a cluster with a master c3.2xlarge
> instance (8 cores and 15g of RAM) and 3 core c3.4xlarge instances (16 cores
> and 30g of RAM each). Spark config settings are as follows:
>
> ('spark.serializer', 'org.apache.spark.serializer.KryoSerializer'),
>
> ('spark.executors.instances', '3'),
>
> ('spark.yarn.executor.memoryOverhead', '9g'),
>
> ('spark.executor.cores', '15'),
>
> ('spark.executor.memory', '12g'),
>
> ('spark.scheduler.mode', 'FIFO'),
>
> ('spark.cleaner.ttl', '1800'),
>
> The job processes each file in a thread, and we have 10 threads running
> concurrently. The process will OOM after about 4 hours, at which point
> Spark has processed over 20,000 jobs.
>
> It seems like the driver is running out of memory, but each individual job
> is quite small. Are there any known memory leaks for long-running Spark
> applications on Yarn?
>
>
>
>
> --
>
> *Alexis Seigneurin*
> *Managing Consultant*
> (202) 459-1591 <202%20459.1591> - LinkedIn
> 
>
> 
> Rate our service 
>


Sporadic ClassNotFoundException with Kryo

2016-11-18 Thread chrism
Regardless of the different ways we have tried deploying a jar together with
Spark, when running a Spark Streaming job with Kryo as serializer on top of
Mesos, we sporadically get the following error (I have truncated a bit):

16/11/18 08:39:10 ERROR OneForOneBlockFetcher: Failed while starting block
fetches
java.lang.RuntimeException: org.apache.spark.SparkException: Failed to
register classes with Kryo
  at
org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:129)
  at
org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:274)
...
  at
org.apache.spark.serializer.SerializerManager.dataSerializeStream(SerializerManager.scala:125)
  at
org.apache.spark.storage.BlockManager$$anonfun$dropFromMemory$3.apply(BlockManager.scala:1265)
  at
org.apache.spark.storage.BlockManager$$anonfun$dropFromMemory$3.apply(BlockManager.scala:1261)
...
Caused by: java.lang.ClassNotFoundException: cics.udr.compound_ran_udr
  at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)

where "cics.udr.compound_ran_udr" is a class provided by us in a jar.

We know that the jar containing "cics.udr.compound_ran_udr" is being
deployed and works because it is listed in the "Environment" tab in the GUI,
and calculations using this class succeed.

We have tried the following methods of deploying the jar containing the
class
 * Through --jars in spark-submit
 * Through SparkConf.setJar
 * Through spark.driver.extraClassPath and spark.executor.extraClassPath
 * By having it as the main jar used by spark-submit
with no luck. The logs (see attached) recognize that the jar is being added
to the classloader.

We have tried registering the class using
 * SparkConf.registerKryoClasses.
 * spark.kryo.classesToRegister
with no luck.

We are running on Mesos and the jar has been deployed on every machine on
the local file system in the same location.

I would be very grateful for any help or ideas :)




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



RE: CSV to parquet preserving partitioning

2016-11-18 Thread benoitdr
This is more or less how I'm doing it now.
The problem is that it creates shuffling in the cluster because the input data
are not co-located according to the partition scheme.

If I reload the output parquet files as a new dataframe, then everything is
fine, but I'd like to avoid shuffling during the ETL phase as well.




-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Kafka direct approach,App UI shows wrong input rate

2016-11-18 Thread Julian Keppel
Hello,

I use Spark 2.0.2 with Kafka integration 0-8. The Kafka version is 0.10.0.1
(Scala 2.11). I read data from Kafka with the direct approach. The complete
infrastructure runs on Google Container Engine.

I wonder why the corresponding application UI says the input rate is zero
records per second. This is definitely wrong. I checked it while I printed
out the incoming records to the driver console. All other metrics seem to
be correct (at least they are realistic).

What is going on here? Do you have any idea? Thanks for your help.

Julian


Re: How do I flatten JSON blobs into a Data Frame using Spark/Spark SQL

2016-11-18 Thread kant kodali
This seems to work:

import org.apache.spark.sql._
val rdd = df2.rdd.map { case Row(j: String) => j }
spark.read.json(rdd).show()

However, I wonder if there is any inefficiency here, since I have to apply this
function to billions of rows.
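
If you only need a handful of fields, an alternative that avoids the detour
through an RDD is get_json_object; a sketch (the column name "json" and the
field paths are placeholders for your actual schema):

import org.apache.spark.sql.functions.{col, get_json_object}

df2.select(
  get_json_object(col("json"), "$.field1").as("field1"),
  get_json_object(col("json"), "$.field2").as("field2")
).show()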


Re: How to load only the data of the last partition

2016-11-18 Thread Samy Dindane

Thank you Daniel. Unfortunately, we don't use Hive but bare (Avro) files.


On 11/17/2016 08:47 PM, Daniel Haviv wrote:

Hi Samy,
If you're working with hive you could create a partitioned table and update 
its partitions' locations to the last version so when you query it using 
spark, you'll always get the latest version.

Daniel

On Thu, Nov 17, 2016 at 9:05 PM, Samy Dindane > wrote:

Hi,

I have some data partitioned this way:

/data/year=2016/month=9/version=0
/data/year=2016/month=10/version=0
/data/year=2016/month=10/version=1
/data/year=2016/month=10/version=2
/data/year=2016/month=10/version=3
/data/year=2016/month=11/version=0
/data/year=2016/month=11/version=1

When using this data, I'd like to load the last version only of each month.

A simple way to do this is to do `load("/data/year=2016/month=11/version=1")` instead 
of doing `load("/data")`.
The drawback of this solution is the loss of partitioning information such 
as `year` and `month`, which means it would not be possible to apply operations 
based on the year or the month anymore.

Is it possible to ask Spark to load the last version only of each month? 
How would you go about this?

Thank you,

Samy

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org 





-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0

2016-11-18 Thread Phillip Henry
Looks like a standard "not enough memory" issue. I can only recommend the
usual advice of increasing the number of partitions to give you a quick-win.
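
For example (a sketch; the RDD name and the partition count are placeholders
to tune for your data):

// spread the shuffle over more, smaller partitions
val repartitioned = inputRdd.repartition(2000)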

Also, your JVMs have an enormous amount of memory. This may cause long GC
pause times. You might like to try reducing the memory to about 20gb and
having 10x as many executors.

Finally, you might want to monitor how many file handles your user is able
to exploit. On a *Nix system use the ulimit command to see what the
restriction (if any) is.

Phill


On Wed, Aug 10, 2016 at 8:34 PM, شجاع الرحمن بیگ 
wrote:

> Hi,
>
> I am getting following error while processing large input size.
>
> ...
> [Stage 18:> (90 + 24)
> / 240]16/08/10 19:39:54 WARN TaskSetManager: Lost task 86.1 in stage 18.0
> (TID 2517, bscpower8n2-data): FetchFailed(null, shuffleId=0, mapId=-1,
> reduceId=86, message=
> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
> location for shuffle 0
> at org.apache.spark.MapOutputTracker$$anonfun$org$
> apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(
> MapOutputTracker.scala:542)
> at org.apache.spark.MapOutputTracker$$anonfun$org$
> apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(
> MapOutputTracker.scala:538)
> at scala.collection.TraversableLike$WithFilter$$
> anonfun$foreach$1.apply(TraversableLike.scala:772)
> at scala.collection.IndexedSeqOptimized$class.
> foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> at scala.collection.TraversableLike$WithFilter.
> foreach(TraversableLike.scala:771)
> at org.apache.spark.MapOutputTracker$.org$apache$
> spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:538)
> at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(
> MapOutputTracker.scala:155)
> at org.apache.spark.shuffle.BlockStoreShuffleReader.read(
> BlockStoreShuffleReader.scala:47)
> at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
> at org.apache.spark.graphx.EdgeRDD.compute(EdgeRDD.scala:51)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
> at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(
> ZippedPartitionsRDD.scala:88)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(
> ZippedPartitionsRDD.scala:88)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(
> MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at org.apache.spark.executor.Executor$TaskRunner.run(
> Executor.scala:214)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> ...
>
> The specification are as follow.
>
> Spark version. 1.6.1
>
> Cluster Mode= Standalone
> Storage level: Memory and Disk
>
> Spark Worker cores=6
>
> spark worker memory=200gb
>
> spark executor memory = 199gb
> spark driver memory = 5gb
>
> Number of input partitions=240
>
> input data set =34 GB
>
>
>
>
>
>
>
> I investigated the issue further and monitored the free RAM using vmstat
> during the execution of the workload. It reveals that the job keeps running
> successfully while free memory is available but starts throwing exceptions
> once the free memory is used up.
>
>
>
> Is anyone face