Re: dataframe udf function will be executed twice when filter on new column created by withColumn

2016-05-11 Thread James Hammerton
This may be related to: https://issues.apache.org/jira/browse/SPARK-13773
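
If you need to avoid the duplicate evaluation in the meantime, one possible
(untested) workaround is to cache the DataFrame right after withColumn, so that
the generated column is materialised before the filter runs. A minimal sketch
against the 1.6 DataFrame API, reusing the names from the example below:

val newDF = df.withColumn("new", udfFunc(df("old"))).cache()
newDF.filter("new <> 'a1'").show()  // the udf should now run only once per input row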

Regards,

James

On 11 May 2016 at 15:49, Ted Yu  wrote:

> In master branch, behavior is the same.
>
> Suggest opening a JIRA if you haven't done so.
>
> On Wed, May 11, 2016 at 6:55 AM, Tony Jin  wrote:
>
>> Hi guys,
>>
>> I have a problem with Spark DataFrames. My Spark version is 1.6.1.
>> Basically, I used a UDF with df.withColumn to create a "new" column, and
>> then I filter the values on this new column and call show (an action). I see
>> the UDF (which is used by withColumn to create the new column) being called
>> twice (duplicated). If I filter on an "old" column instead, the UDF only runs
>> once, which is expected. The example code below shows the problem (see the
>> filteredOnNewColumnDF.show output, where the UDF runs twice for the same
>> value).
>>
>>  Anyone knows the internal reason? Can you give me any advices? Thank you
>> very much.
>>
>>
>> scala> import org.apache.spark.sql.functions._
>> import org.apache.spark.sql.functions._
>>
>> scala> val df = sc.parallelize(Seq(("a", "b"), ("a1", 
>> "b1"))).toDF("old","old1")
>> df: org.apache.spark.sql.DataFrame = [old: string, old1: string]
>>
>> scala> val udfFunc = udf((s: String) => {println(s"running udf($s)"); s })
>> udfFunc: org.apache.spark.sql.UserDefinedFunction = 
>> UserDefinedFunction(,StringType,List(StringType))
>>
>> scala> val newDF = df.withColumn("new", udfFunc(df("old")))
>> newDF: org.apache.spark.sql.DataFrame = [old: string, old1: string, new: 
>> string]
>>
>> scala> newDF.show
>> running udf(a)
>> running udf(a1)
>> +---+----+---+
>> |old|old1|new|
>> +---+----+---+
>> |  a|   b|  a|
>> | a1|  b1| a1|
>> +---+----+---+
>>
>>
>> scala> val filteredOnNewColumnDF = newDF.filter("new <> 'a1'")
>> filteredOnNewColumnDF: org.apache.spark.sql.DataFrame = [old: string, old1: 
>> string, new: string]
>>
>> scala> val filteredOnOldColumnDF = newDF.filter("old <> 'a1'")
>> filteredOnOldColumnDF: org.apache.spark.sql.DataFrame = [old: string, old1: 
>> string, new: string]
>>
>> scala> filteredOnNewColumnDF.show
>> running udf(a)
>> running udf(a)
>> running udf(a1)
>> +---+----+---+
>> |old|old1|new|
>> +---+----+---+
>> |  a|   b|  a|
>> +---+----+---+
>>
>>
>> scala> filteredOnOldColumnDF.show
>> running udf(a)
>> +---+----+---+
>> |old|old1|new|
>> +---+----+---+
>> |  a|   b|  a|
>> +---+----+---+
>>
>>
>>
>> Best wishes.
>> By Linbo
>>
>>
>


Re: Error from reading S3 in Scala

2016-05-04 Thread James Hammerton
On 3 May 2016 at 17:22, Gourav Sengupta  wrote:

> Hi,
>
> The best thing to do is start the EMR clusters with proper permissions in
> the roles that way you do not need to worry about the keys at all.
>
> Another thing, why are we using s3a:// instead of s3:// ?
>

Probably because of what's said about s3:// and s3n:// here (which is why I
use s3a://):

https://wiki.apache.org/hadoop/AmazonS3
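
For example, an untested sketch (assuming sc is your SparkContext, and using the
fs.s3a.* property names from the hadoop-aws docs) that keeps the keys out of the
URI, along the lines Steve suggests below:

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

// Supply the credentials via configuration (here read from the environment)
// rather than embedding them in the URI.
sc.hadoopConfiguration.set("fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))
sc.hadoopConfiguration.set("fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))

val pt = new Path("s3a://graphclustering/config.properties")
val fs = FileSystem.get(URI.create("s3a://graphclustering/"), sc.hadoopConfiguration)
val inputStream = fs.open(pt)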

Regards,

James


> Besides that you can increase s3 speeds using the instructions mentioned
> here:
> https://aws.amazon.com/blogs/aws/aws-storage-update-amazon-s3-transfer-acceleration-larger-snowballs-in-more-regions/
>
>
> Regards,
> Gourav
>
> On Tue, May 3, 2016 at 12:04 PM, Steve Loughran 
> wrote:
>
>> don't put your secret in the URI, it'll only creep out in the logs.
>>
>> Use the specific properties coverd in
>> http://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html,
>> which you can set in your spark context by prefixing them with spark.hadoop.
>>
>> you can also set the env vars, AWS_ACCESS_KEY_ID and
>> AWS_SECRET_ACCESS_KEY; SparkEnv will pick these up and set the relevant
>> spark context keys for you
>>
>>
>> On 3 May 2016, at 01:53, Zhang, Jingyu  wrote:
>>
>> Hi All,
>>
>> I am using Eclipse with Maven for developing Spark applications. I got an
>> error when reading from S3 in Scala, but it works fine in Java when I run
>> them in the same project in Eclipse. The Scala/Java code and the error are
>> as follows:
>>
>>
>> Scala
>>
>> val uri = URI.create("s3a://" + key + ":" + seckey + "@" +
>> "graphclustering/config.properties");
>> val pt = new Path("s3a://" + key + ":" + seckey + "@" +
>> "graphclustering/config.properties");
>> val fs = FileSystem.get(uri,ctx.hadoopConfiguration);
>> val  inputStream:InputStream = fs.open(pt);
>>
>> Exception: on aws-java-1.7.4 and hadoop-aws-2.6.1
>>
>> Exception in thread "main"
>> com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service:
>> Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID:
>> 8A56DC7BF0BFF09A), S3 Extended Request ID
>>
>> at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(
>> AmazonHttpClient.java:1160)
>>
>> at com.amazonaws.http.AmazonHttpClient.executeOneRequest(
>> AmazonHttpClient.java:748)
>>
>> at com.amazonaws.http.AmazonHttpClient.executeHelper(
>> AmazonHttpClient.java:467)
>>
>> at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:302)
>>
>> at com.amazonaws.services.s3.AmazonS3Client.invoke(
>> AmazonS3Client.java:3785)
>>
>> at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(
>> AmazonS3Client.java:1050)
>>
>> at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(
>> AmazonS3Client.java:1027)
>>
>> at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(
>> S3AFileSystem.java:688)
>>
>> at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:222)
>>
>> at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
>>
>> at com.news.report.graph.GraphCluster$.main(GraphCluster.scala:53)
>>
>> at com.news.report.graph.GraphCluster.main(GraphCluster.scala)
>>
>> 16/05/03 10:49:17 INFO SparkContext: Invoking stop() from shutdown hook
>>
>> 16/05/03 10:49:17 INFO SparkUI: Stopped Spark web UI at
>> http://10.65.80.125:4040
>>
>> 16/05/03 10:49:17 INFO MapOutputTrackerMasterEndpoint:
>> MapOutputTrackerMasterEndpoint stopped!
>>
>> 16/05/03 10:49:17 INFO MemoryStore: MemoryStore cleared
>>
>> 16/05/03 10:49:17 INFO BlockManager: BlockManager stopped
>>
>> Exception: on aws-java-1.7.4 and hadoop-aws-2.7.2
>>
>> 16/05/03 10:23:40 INFO Slf4jLogger: Slf4jLogger started
>>
>> 16/05/03 10:23:40 INFO Remoting: Starting remoting
>>
>> 16/05/03 10:23:40 INFO Remoting: Remoting started; listening on addresses
>> :[akka.tcp://sparkDriverActorSystem@10.65.80.125:61860]
>>
>> 16/05/03 10:23:40 INFO Utils: Successfully started service
>> 'sparkDriverActorSystem' on port 61860.
>>
>> 16/05/03 10:23:40 INFO SparkEnv: Registering MapOutputTracker
>>
>> 16/05/03 10:23:40 INFO SparkEnv: Registering BlockManagerMaster
>>
>> 16/05/03 10:23:40 INFO DiskBlockManager: Created local directory at
>> /private/var/folders/sc/tdmkbvr1705b8p70xqj1kqks5l9p
>>
>> 16/05/03 10:23:40 INFO MemoryStore: MemoryStore started with capacity
>> 1140.4 MB
>>
>> 16/05/03 10:23:40 INFO SparkEnv: Registering OutputCommitCoordinator
>>
>> 16/05/03 10:23:40 INFO Utils: Successfully started service 'SparkUI' on
>> port 4040.
>>
>> 16/05/03 10:23:40 INFO SparkUI: Started SparkUI at
>> http://10.65.80.125:4040
>>
>> 16/05/03 10:23:40 INFO Executor: Starting executor ID driver on host
>> localhost
>>
>> 16/05/03 10:23:40 INFO Utils: Successfully started service
>> 'org.apache.spark.network.netty.NettyBlockTransferService' on port 61861.
>>
>> 16/05/03 10:23:40 INFO NettyBlockTransferService: Server created on 61861
>>
>> 16/05/03 10:23:40 INFO BlockManagerMaster: Trying to register BlockManager
>>
>> 16/05/03 10:23:40 INFO BlockManagerMasterEndpoint: Registering block
>> m

Re: ML Random Forest Classifier

2016-04-13 Thread James Hammerton
Hi Ashic,

Unfortunately I don't know how to work around that - I suggested this line
as it looked promising (I had considered it once before deciding to use a
different algorithm) but I never actually tried it.

Regards,

James

On 13 April 2016 at 02:29, Ashic Mahtab  wrote:

> It looks like the issue is around impurity stats. After converting an rf
> model to old, and back to new (without disk storage or anything), and
> specifying the same num of features, same categorical features map, etc.,
> DecisionTreeClassifier::predictRaw throws a null pointer exception here:
>
>   override protected def predictRaw(features: Vector): Vector = {
>     Vectors.dense(rootNode.predictImpl(features).impurityStats.stats.clone())
>   }
>
> It appears impurityStats is always null (even though impurity does have a
> value). Any known workarounds? It's looking like I'll have to revert to
> using mllib instead :(
>
> -Ashic.
>
> --
> From: as...@live.com
> To: ja...@gluru.co
> CC: user@spark.apache.org
> Subject: RE: ML Random Forest Classifier
> Date: Wed, 13 Apr 2016 02:20:53 +0100
>
>
> I managed to get to the map using MetadataUtils (it's a private ml
> package). There are still some issues, around feature names, etc. Trying to
> pin them down.
>
> --
> From: as...@live.com
> To: ja...@gluru.co
> CC: user@spark.apache.org
> Subject: RE: ML Random Forest Classifier
> Date: Wed, 13 Apr 2016 00:50:31 +0100
>
> Hi James,
> Following on from the previous email, is there a way to get the
> categoricalFeatures of a Spark ML Random Forest? Essentially something I
> can pass to
>
> RandomForestClassificationModel.fromOld(oldModel, parent,
> *categoricalFeatures*, numClasses, numFeatures)
>
> I could construct it by hand, but I was hoping for a more automated way of
> getting the map. Since the trained model already knows about the value,
> perhaps it's possible to grab it for storage?
>
> Thanks,
> Ashic.
>
> --
> From: as...@live.com
> To: ja...@gluru.co
> CC: user@spark.apache.org
> Subject: RE: ML Random Forest Classifier
> Date: Mon, 11 Apr 2016 23:21:53 +0100
>
> Thanks, James. That looks promising.
>
> --
> Date: Mon, 11 Apr 2016 10:41:07 +0100
> Subject: Re: ML Random Forest Classifier
> From: ja...@gluru.co
> To: as...@live.com
> CC: user@spark.apache.org
>
> To add a bit more detail perhaps something like this might work:
>
> package org.apache.spark.ml
>
>
> import org.apache.spark.ml.classification.RandomForestClassificationModel
> import org.apache.spark.ml.classification.DecisionTreeClassificationModel
> import org.apache.spark.ml.classification.LogisticRegressionModel
> import org.apache.spark.mllib.tree.model.{ RandomForestModel =>
> OldRandomForestModel }
> import org.apache.spark.ml.classification.RandomForestClassifier
>
>
> object RandomForestModelConverter {
>
>
>   def fromOld(oldModel: OldRandomForestModel, parent:
> RandomForestClassifier = null,
> categoricalFeatures: Map[Int, Int], numClasses: Int, numFeatures: Int
> = -1): RandomForestClassificationModel = {
> RandomForestClassificationModel.fromOld(oldModel, parent,
> categoricalFeatures, numClasses, numFeatures)
>   }
>
>
>   def toOld(newModel: RandomForestClassificationModel):
> OldRandomForestModel = {
> newModel.toOld
>   }
> }
>
>
> Regards,
>
> James
>
>
> On 11 April 2016 at 10:36, James Hammerton  wrote:
>
> There are methods for converting the dataframe based random forest models
> to the old RDD based models and vice versa. Perhaps using these will help
> given that the old models can be saved and loaded?
>
> In order to use them however you will need to write code in the
> org.apache.spark.ml package.
>
> I've not actually tried doing this myself but it looks as if it might work.
>
> Regards,
>
> James
>
> On 11 April 2016 at 10:29, Ashic Mahtab  wrote:
>
> Hello,
> I'm trying to save a pipeline with a random forest classifier. If I try to
> save the pipeline, it complains that the classifier is not Writable, and
> indeed the classifier itself doesn't have a write function. There's a pull
> request that's been merged that enables this for Spark 2.0 (any dates
> around when that'll release?). I am, however, using the Spark Cassandra
> Connector which doesn't seem to be able to create a CqlContext with spark
> 2.0 snapshot builds. Seeing that ML Lib's random forest classifier supports
> storing and loading models, is there a way to create a Spark ML pipeline in
> Spark 1.6 with a random forest classifier that'll allow me to store and
> load the model? The model takes significant amount of time to train, and I
> really don't want to have to train it every time my application launches.
>
> Thanks,
> Ashic.
>
>
>
>


Re: ML Random Forest Classifier

2016-04-11 Thread James Hammerton
To add a bit more detail perhaps something like this might work:

package org.apache.spark.ml
>
>
> import org.apache.spark.ml.classification.RandomForestClassificationModel
>
> import org.apache.spark.ml.classification.DecisionTreeClassificationModel
>
> import org.apache.spark.ml.classification.LogisticRegressionModel
>
> import org.apache.spark.mllib.tree.model.{ RandomForestModel =>
> OldRandomForestModel }
>
> import org.apache.spark.ml.classification.RandomForestClassifier
>
>
> object RandomForestModelConverter {
>
>
>   def fromOld(oldModel: OldRandomForestModel, parent:
> RandomForestClassifier = null,
>
> categoricalFeatures: Map[Int, Int], numClasses: Int, numFeatures: Int
> = -1): RandomForestClassificationModel = {
>
> RandomForestClassificationModel.fromOld(oldModel, parent,
> categoricalFeatures, numClasses, numFeatures)
>
>   }
>
>
>   def toOld(newModel: RandomForestClassificationModel):
> OldRandomForestModel = {
>
> newModel.toOld
>
>   }
>
> }
>
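
Usage might then look something like this (an untested sketch; the path and
variable names are hypothetical, newModel is the trained
RandomForestClassificationModel, and the converter has to live in the
org.apache.spark.ml package because toOld/fromOld are package-private):

import org.apache.spark.ml.RandomForestModelConverter
import org.apache.spark.mllib.tree.model.RandomForestModel

// Convert the new ml model to the old mllib model and persist it.
val oldModel = RandomForestModelConverter.toOld(newModel)
oldModel.save(sc, "hdfs:///models/rf")

// Later: reload it and convert back, supplying the metadata used at training
// time (the empty categorical features map here is just a placeholder).
val reloaded = RandomForestModel.load(sc, "hdfs:///models/rf")
val restored = RandomForestModelConverter.fromOld(reloaded,
  categoricalFeatures = Map.empty[Int, Int], numClasses = 2)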

Regards,

James


On 11 April 2016 at 10:36, James Hammerton  wrote:

> There are methods for converting the dataframe based random forest models
> to the old RDD based models and vice versa. Perhaps using these will help
> given that the old models can be saved and loaded?
>
> In order to use them however you will need to write code in the
> org.apache.spark.ml package.
>
> I've not actually tried doing this myself but it looks as if it might work.
>
> Regards,
>
> James
>
> On 11 April 2016 at 10:29, Ashic Mahtab  wrote:
>
>> Hello,
>> I'm trying to save a pipeline with a random forest classifier. If I try
>> to save the pipeline, it complains that the classifier is not Writable, and
>> indeed the classifier itself doesn't have a write function. There's a pull
>> request that's been merged that enables this for Spark 2.0 (any dates
>> around when that'll release?). I am, however, using the Spark Cassandra
>> Connector which doesn't seem to be able to create a CqlContext with spark
>> 2.0 snapshot builds. Seeing that ML Lib's random forest classifier supports
>> storing and loading models, is there a way to create a Spark ML pipeline in
>> Spark 1.6 with a random forest classifier that'll allow me to store and
>> load the model? The model takes significant amount of time to train, and I
>> really don't want to have to train it every time my application launches.
>>
>> Thanks,
>> Ashic.
>>
>
>


Re: ML Random Forest Classifier

2016-04-11 Thread James Hammerton
There are methods for converting the dataframe based random forest models
to the old RDD based models and vice versa. Perhaps using these will help
given that the old models can be saved and loaded?

In order to use them however you will need to write code in the
org.apache.spark.ml package.

I've not actually tried doing this myself but it looks as if it might work.

Regards,

James

On 11 April 2016 at 10:29, Ashic Mahtab  wrote:

> Hello,
> I'm trying to save a pipeline with a random forest classifier. If I try to
> save the pipeline, it complains that the classifier is not Writable, and
> indeed the classifier itself doesn't have a write function. There's a pull
> request that's been merged that enables this for Spark 2.0 (any dates
> around when that'll release?). I am, however, using the Spark Cassandra
> Connector which doesn't seem to be able to create a CqlContext with spark
> 2.0 snapshot builds. Seeing that ML Lib's random forest classifier supports
> storing and loading models, is there a way to create a Spark ML pipeline in
> Spark 1.6 with a random forest classifier that'll allow me to store and
> load the model? The model takes significant amount of time to train, and I
> really don't want to have to train it every time my application launches.
>
> Thanks,
> Ashic.
>


Logistic regression throwing errors

2016-04-01 Thread James Hammerton
Hi,

On a particular .csv data set - which I can use in WEKA's logistic
regression implementation without any trouble - I'm getting errors like the
following:

16/04/01 18:04:18 ERROR LBFGS: Failure! Resetting history:
> breeze.optimize.FirstOrderException: Line search failed

These errors cause the learning to fail - f1 = 0.

Anyone got any idea why this might happen?

Regards,

James


Re: Work out date column in CSV more than 6 months old (datediff or something)

2016-03-22 Thread James Hammerton
On 22 March 2016 at 10:57, Mich Talebzadeh 
wrote:

> Thanks Silvio.
>
> The problem I have is that somehow string comparison does not work.
>
> Case in point
>
> val df =
> sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
> "true").option("header", "true").load("/data/stg/table2")
> val current_date = sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(),
> 'dd/MM/yyyy') ").collect.apply(0).getString(0)
> df.filter(lit(current_date) < col("Payment date"))
>   .select(lit(current_date).alias("current_date"),
>           col("Payment date").alias("PaymentDate")).show(5)
>
>
This is doing a string comparison not a date comparison (assuming "Payment
date" is of type String).

E.g.

scala> "22/03/2016" < "24/02/2015"
>
> res4: Boolean = true
>
>
>> scala> "22/03/2016" < "04/02/2015"
>
> res5: Boolean = false
>
>
This is the correct result for a string comparison but it's not the
comparison you want.

I think you need to convert the "Payment date" with "to_date" and compare
against that.

E.g. something like: df.filter(current_date() < to_date(col("Payment date")))

Regards,

James



> It selects all the rows that are less than today's date (they are old).
>
> +------------+-----------+
> |current_date|PaymentDate|
> +------------+-----------+
> |  22/03/2016| 24/02/2014|
> |  22/03/2016| 24/03/2014|
> |  22/03/2016| 31/03/2015|
> |  22/03/2016| 28/04/2014|
> |  22/03/2016| 26/05/2014|
> +------------+-----------+
>
> I don't know why this comparison is failing. Maybe it is comparing the
> two leftmost characters?
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 22 March 2016 at 00:26, Silvio Fiorito 
> wrote:
>
>> There’s a months_between function you could use, as well:
>>
>> df.filter(months_between(current_date, $"Payment Date") > 6).show
>>
>> From: Mich Talebzadeh 
>> Date: Monday, March 21, 2016 at 5:53 PM
>> To: "user @spark" 
>> Subject: Work out date column in CSV more than 6 months old (datediff or
>> something)
>>
>> Hi,
>>
>> For test purposes I am reading in a simple csv file as follows:
>>
>> val df =
>> sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
>> "true").option("header", "true").load("/data/stg/table2")
>> df: org.apache.spark.sql.DataFrame = [Invoice Number: string, Payment
>> date: string, Net: string, VAT: string, Total: string]
>>
>> For this work I am interested in column "Payment Date" > 6 months old
>> from today
>>
>> Data is stored in the following format for that column
>>
>> scala> df.select("Payment date").take(2)
>> res40: Array[org.apache.spark.sql.Row] = Array([10/02/2014], [17/02/2014])
>>
>> stored as 'dd/MM/yyyy'
>>
>> The current time I get as
>>
>> scala> val today = sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(),
>> 'dd/MM/yyyy') ").collect.apply(0).getString(0)
>> today: String = 21/03/2016
>>
>>
>> So I want to filter the csv file
>>
>> scala>  df.filter(col("Payment date") < lit(today)).show(2)
>> +--------------+------------+---------+-----+---------+
>> |Invoice Number|Payment date|      Net|  VAT|    Total|
>> +--------------+------------+---------+-----+---------+
>> |           360|  10/02/2014|?2,500.00|?0.00|?2,500.00|
>> |           361|  17/02/2014|?2,500.00|?0.00|?2,500.00|
>> +--------------+------------+---------+-----+---------+
>>
>>
>> However, I want to use the datediff() function here, not just < today!
>>
>>
>> Obviously one can store the file as a table and use SQL on it. However, I
>> want to see if there are other ways using fp.
>>
>> Thanks
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>
>


Re: Find all invoices more than 6 months from csv file

2016-03-22 Thread James Hammerton
On 21 March 2016 at 17:57, Mich Talebzadeh 
wrote:

>
> Hi,
>
> For test purposes I am reading in a simple csv file as follows:
>
> val df =
> sqlContext.read.format("com.databricks.spark.csv").option("inferSchema",
> "true").option("header", "true").load("/data/stg/table2")
> df: org.apache.spark.sql.DataFrame = [Invoice Number: string, Payment
> date: string, Net: string, VAT: string, Total: string]
>
> For this work I am interested in column "Payment Date" > 6 months old from
> today
>
> Data is stored in the following format for that column
>
> scala> df.select("Payment date").take(2)
> res40: Array[org.apache.spark.sql.Row] = Array([10/02/2014], [17/02/2014])
>
> stored as 'dd/MM/yyyy'
>
> The current time I get as
>
> scala> val today = sqlContext.sql("SELECT FROM_unixtime(unix_timestamp(),
> 'dd/MM/yyyy') ").collect.apply(0).getString(0)
> today: String = 21/03/2016
>
>
> So I want to filter the csv file
>
> scala>  df.filter(col("Payment date") < lit(today)).show(2)
> +--------------+------------+---------+-----+---------+
> |Invoice Number|Payment date|      Net|  VAT|    Total|
> +--------------+------------+---------+-----+---------+
> |           360|  10/02/2014|?2,500.00|?0.00|?2,500.00|
> |           361|  17/02/2014|?2,500.00|?0.00|?2,500.00|
> +--------------+------------+---------+-----+---------+
>
>
> However, I want to use the datediff() function here, not just < today!
>
>
Could you not compute the date of the 6-month cut-off point and use that in
place of today?

Looking at the API I see add_months(), date_add() and date_sub() methods.
The first adds a number of months to a start date (would adding a negative
number of months to the current date work?), while the latter two add or
subtract a specified number of days to/from a date. These are available from
1.5.0 onwards.
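
For example, something along these lines might work (an untested sketch,
assuming the "Payment date" column holds dd/MM/yyyy strings):

import org.apache.spark.sql.functions.{add_months, col, current_date, unix_timestamp}

// Keep rows whose payment date is more than six months before today.
val paymentDate = unix_timestamp(col("Payment date"), "dd/MM/yyyy").cast("timestamp").cast("date")
df.filter(paymentDate < add_months(current_date(), -6)).show()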

Alternatively outside of the SQL api (e.g. in a UDF) you could use
something like:

> import java.util.{Calendar, Date}
>
> val c = Calendar.getInstance()
> c.setTime(new Date(System.currentTimeMillis()))
> c.add(Calendar.MONTH, -6)
> val date: Date = c.getTime


Regards,

James




>
> Obviously one can store the file as a table and use SQL on it. However, I
> want to see if there are other ways using fp.
>
> Thanks
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>


Add org.apache.spark.mllib model .predict() method to models in org.apache.spark.ml?

2016-03-22 Thread James Hammerton
Hi,

The machine learning models in org.apache.spark.mllib have a .predict()
method that can be applied to a Vector to return a prediction.

However, this method does not appear on the new models in org.apache.spark.ml,
and you have to wrap a Vector in a DataFrame to send a prediction in.
This ties you into bringing in more of Spark's code as a dependency if you
wish to embed the models in production code outside of Spark.
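
For illustration, the kind of wrapping currently required looks something like
this (a sketch with hypothetical names, assuming model is a fitted ml
classifier and sqlContext is in scope):

import org.apache.spark.mllib.linalg.Vectors

// Score a single feature vector by first building a one-row DataFrame around it.
val features = Vectors.dense(0.1, 2.3, 4.5)
val singleRow = sqlContext.createDataFrame(Seq(Tuple1(features))).toDF("features")
val prediction = model.transform(singleRow).select("prediction").first().getDouble(0)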

Also, if you wish to feed predictions in one at a time in that context, it
makes the process a lot slower; thus it seems to me the old models are
currently more amenable to being used outside of Spark than the new models.

Are there any plans to add the .predict() method back to the models in the
new API?

Regards,

James


Re: best way to do deep learning on spark ?

2016-03-20 Thread James Hammerton
In the meantime there is also deeplearning4j which integrates with Spark
(for both Java and Scala): http://deeplearning4j.org/

Regards,

James

On 17 March 2016 at 02:32, Ulanov, Alexander 
wrote:

> Hi Charles,
>
>
>
> There is an implementation of multilayer perceptron in Spark (since 1.5):
>
>
> https://spark.apache.org/docs/latest/ml-classification-regression.html#multilayer-perceptron-classifier
>
>
>
> Other features such as autoencoder, convolutional layers, etc. are
> currently under development. Please refer to
> https://issues.apache.org/jira/browse/SPARK-5575
>
>
>
> Best regards, Alexander
>
>
>
> *From:* charles li [mailto:charles.up...@gmail.com]
> *Sent:* Wednesday, March 16, 2016 7:01 PM
> *To:* user 
> *Subject:* best way to do deep learning on spark ?
>
>
>
>
>
> Hi guys, I'm new to MLlib on Spark. After reading the documentation, it seems
> that MLlib does not support deep learning. I want to know: is there any way
> to implement deep learning on Spark?
>
>
>
> Do I have to use a third-party package like Caffe or TensorFlow?
>
>
>
> or
>
>
>
> Is a deep learning module listed in the MLlib development plan?
>
>
>
>
> great thanks
>
>
>
> --
>
> --
>
> a spark lover, a quant, a developer and a good man.
>
>
>
> http://github.com/litaotao
>


Saving the DataFrame based RandomForestClassificationModels

2016-03-18 Thread James Hammerton
Hi,

If you train a
org.apache.spark.ml.classification.RandomForestClassificationModel, you
can't save it - attempts to do so yield the following error:

16/03/18 14:12:44 INFO SparkContext: Successfully stopped SparkContext
> Exception in thread "main" java.lang.UnsupportedOperationException:
> Pipeline write will fail on this Pipeline because it contains a stage
> which does not implement Writable. Non-Writable stage: rfc_704981ba3f48
> of type class org.apache.spark.ml.classification.RandomForestClassifier
> at org.apache.spark.ml.
> Pipeline$SharedReadWrite$$anonfun$validateStages$1.apply(Pipeline.scala:
> 218)
> at org.apache.spark.ml.
> Pipeline$SharedReadWrite$$anonfun$validateStages$1.apply(Pipeline.scala:
> 215)


This appears to be a known bug:
https://issues.apache.org/jira/browse/SPARK-13784 related to
https://issues.apache.org/jira/browse/SPARK-11888

My question is whether there's a work around given that these bugs are
unresolved at least until 2.0.0.

Regards,

James


Best way to process values for key in sorted order

2016-03-15 Thread James Hammerton
Hi,

I need to process some events in a specific order based on a timestamp, for
each user in my data.

I had implemented this by using the DataFrame sort method to sort by user
id and then by timestamp as a secondary sort, then doing a
groupBy().mapValues() to process the events for each user.

However, on re-reading the docs I see that groupByKey() does not guarantee
any ordering of the values, yet my code (which will fall over on out-of-order
events) seems to run OK so far, in local mode on a machine with 8 CPUs.

I guess the easiest way to be certain would be to sort the values after the
groupByKey, but I'm wondering if using mapPartitions() to process all
entries in a partition would work, given I had pre-ordered the data?

This would require a bit more work to track when I switch from one user to
the next as I process the events, but if the original order has been
preserved on reading the events in, this should work.
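
(For reference, one way to make the per-partition ordering explicit, rather
than relying on it being preserved, is the usual secondary-sort pattern. A
sketch with hypothetical types:)

import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

// Partition by userId only, but sort within each partition by (userId, timestamp),
// so mapPartitions sees each user's events contiguously and in timestamp order.
class UserIdPartitioner(override val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int = key match {
    case (userId: String, _) =>
      val h = userId.hashCode % numPartitions
      if (h < 0) h + numPartitions else h
  }
}

def eventsSortedPerUser(events: RDD[((String, Long), String)]): RDD[((String, Long), String)] =
  events.repartitionAndSortWithinPartitions(new UserIdPartitioner(events.partitions.length))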

Anyone know definitively if this is the case?

Regards,

James


Re: DataFrame .filter only seems to work when .cache is called in local mode in 1.6.0

2016-03-09 Thread James Hammerton
Hi Ted,

Finally got round to creating this:
https://issues.apache.org/jira/browse/SPARK-13773

I hope you don't mind me selecting you as the shepherd for this ticket.

Regards,

James


On 7 March 2016 at 17:50, James Hammerton  wrote:

> Hi Ted,
>
> Thanks for getting back - I realised my mistake... I was clicking the
> little drop down menu on the right hand side of the Create button (it looks
> as if it's part of the button) - when I clicked directly on the word
> "Create" I got a form that made more sense and allowed me to choose the
> project.
>
> Regards,
>
> James
>
>
> On 7 March 2016 at 13:09, Ted Yu  wrote:
>
>> Have you tried clicking on Create button from an existing Spark JIRA ?
>> e.g.
>> https://issues.apache.org/jira/browse/SPARK-4352
>>
>> Once you're logged in, you should be able to select Spark as the Project.
>>
>> Cheers
>>
>> On Mon, Mar 7, 2016 at 2:54 AM, James Hammerton  wrote:
>>
>>> Hi,
>>>
>>> So I managed to isolate the bug and I'm ready to try raising a JIRA
>>> issue. I joined the Apache Jira project so I can create tickets.
>>>
>>> However when I click Create from the Spark project home page on JIRA, it
>>> asks me to click on one of the following service desks: Kylin, Atlas,
>>> Ranger, Apache Infrastructure. There doesn't seem to be an option for me to
>>> raise an issue for Spark?!
>>>
>>> Regards,
>>>
>>> James
>>>
>>>
>>> On 4 March 2016 at 14:03, James Hammerton  wrote:
>>>
>>>> Sure thing, I'll see if I can isolate this.
>>>>
>>>> Regards.
>>>>
>>>> James
>>>>
>>>> On 4 March 2016 at 12:24, Ted Yu  wrote:
>>>>
>>>>> If you can reproduce the following with a unit test, I suggest you
>>>>> open a JIRA.
>>>>>
>>>>> Thanks
>>>>>
>>>>> On Mar 4, 2016, at 4:01 AM, James Hammerton  wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I've come across some strange behaviour with Spark 1.6.0.
>>>>>
>>>>> In the code below, the filtering by "eventName" only seems to work if
>>>>> I called .cache on the resulting DataFrame.
>>>>>
>>>>> If I don't do this, the code crashes inside the UDF because it
>>>>> processes an event that the filter should get rid off.
>>>>>
>>>>> Any ideas why this might be the case?
>>>>>
>>>>> The code is as follows:
>>>>>
>>>>>>   val df = sqlContext.read.parquet(inputPath)
>>>>>>   val filtered = df.filter(df("eventName").equalTo(Created))
>>>>>>   val extracted = extractEmailReferences(sqlContext,
>>>>>> filtered.cache) // Caching seems to be required for the filter to work
>>>>>>   extracted.write.parquet(outputPath)
>>>>>
>>>>>
>>>>> where extractEmailReferences does this:
>>>>>
>>>>>>
>>>>>
>>>>> def extractEmailReferences(sqlContext: SQLContext, df: DataFrame):
>>>>>> DataFrame = {
>>>>>
>>>>> val extracted = df.select(df(EventFieldNames.ObjectId),
>>>>>
>>>>>   extractReferencesUDF(df(EventFieldNames.EventJson),
>>>>>> df(EventFieldNames.ObjectId), df(EventFieldNames.UserId)) as 
>>>>>> "references")
>>>>>
>>>>>
>>>>>> extracted.filter(extracted("references").notEqual("UNKNOWN"))
>>>>>
>>>>>   }
>>>>>
>>>>>
>>>>> and extractReferencesUDF:
>>>>>
>>>>>> def extractReferencesUDF = udf(extractReferences(_: String, _:
>>>>>> String, _: String))
>>>>>
>>>>> def extractReferences(eventJson: String, objectId: String, userId:
>>>>>> String): String = {
>>>>>> import org.json4s.jackson.Serialization
>>>>>> import org.json4s.NoTypeHints
>>>>>> implicit val formats = Serialization.formats(NoTypeHints)
>>>>>>
>>>>>> val created = Serialization.read[GMailMessage.Created](eventJson)
>>>>>> // This is where the code crashes if the .cache isn't called
>>>>>
>>>>>
>>>>>  Regards,
>>>>>
>>>>> James
>>>>>
>>>>>
>>>>
>>>
>>
>


Re: DataFrame .filter only seems to work when .cache is called in local mode in 1.6.0

2016-03-07 Thread James Hammerton
Hi Ted,

Thanks for getting back - I realised my mistake... I was clicking the
little drop down menu on the right hand side of the Create button (it looks
as if it's part of the button) - when I clicked directly on the word
"Create" I got a form that made more sense and allowed me to choose the
project.

Regards,

James


On 7 March 2016 at 13:09, Ted Yu  wrote:

> Have you tried clicking on Create button from an existing Spark JIRA ?
> e.g.
> https://issues.apache.org/jira/browse/SPARK-4352
>
> Once you're logged in, you should be able to select Spark as the Project.
>
> Cheers
>
> On Mon, Mar 7, 2016 at 2:54 AM, James Hammerton  wrote:
>
>> Hi,
>>
>> So I managed to isolate the bug and I'm ready to try raising a JIRA
>> issue. I joined the Apache Jira project so I can create tickets.
>>
>> However when I click Create from the Spark project home page on JIRA, it
>> asks me to click on one of the following service desks: Kylin, Atlas,
>> Ranger, Apache Infrastructure. There doesn't seem to be an option for me to
>> raise an issue for Spark?!
>>
>> Regards,
>>
>> James
>>
>>
>> On 4 March 2016 at 14:03, James Hammerton  wrote:
>>
>>> Sure thing, I'll see if I can isolate this.
>>>
>>> Regards.
>>>
>>> James
>>>
>>> On 4 March 2016 at 12:24, Ted Yu  wrote:
>>>
>>>> If you can reproduce the following with a unit test, I suggest you open
>>>> a JIRA.
>>>>
>>>> Thanks
>>>>
>>>> On Mar 4, 2016, at 4:01 AM, James Hammerton  wrote:
>>>>
>>>> Hi,
>>>>
>>>> I've come across some strange behaviour with Spark 1.6.0.
>>>>
>>>> In the code below, the filtering by "eventName" only seems to work if I
>>>> called .cache on the resulting DataFrame.
>>>>
>>>> If I don't do this, the code crashes inside the UDF because it
>>>> processes an event that the filter should get rid off.
>>>>
>>>> Any ideas why this might be the case?
>>>>
>>>> The code is as follows:
>>>>
>>>>>   val df = sqlContext.read.parquet(inputPath)
>>>>>   val filtered = df.filter(df("eventName").equalTo(Created))
>>>>>   val extracted = extractEmailReferences(sqlContext,
>>>>> filtered.cache) // Caching seems to be required for the filter to work
>>>>>   extracted.write.parquet(outputPath)
>>>>
>>>>
>>>> where extractEmailReferences does this:
>>>>
>>>>>
>>>>
>>>> def extractEmailReferences(sqlContext: SQLContext, df: DataFrame):
>>>>> DataFrame = {
>>>>
>>>> val extracted = df.select(df(EventFieldNames.ObjectId),
>>>>
>>>>   extractReferencesUDF(df(EventFieldNames.EventJson),
>>>>> df(EventFieldNames.ObjectId), df(EventFieldNames.UserId)) as "references")
>>>>
>>>>
>>>>> extracted.filter(extracted("references").notEqual("UNKNOWN"))
>>>>
>>>>   }
>>>>
>>>>
>>>> and extractReferencesUDF:
>>>>
>>>>> def extractReferencesUDF = udf(extractReferences(_: String, _: String,
>>>>> _: String))
>>>>
>>>> def extractReferences(eventJson: String, objectId: String, userId:
>>>>> String): String = {
>>>>> import org.json4s.jackson.Serialization
>>>>> import org.json4s.NoTypeHints
>>>>> implicit val formats = Serialization.formats(NoTypeHints)
>>>>>
>>>>> val created = Serialization.read[GMailMessage.Created](eventJson)
>>>>> // This is where the code crashes if the .cache isn't called
>>>>
>>>>
>>>>  Regards,
>>>>
>>>> James
>>>>
>>>>
>>>
>>
>


Re: DataFrame .filter only seems to work when .cache is called in local mode in 1.6.0

2016-03-07 Thread James Hammerton
Hi,

So I managed to isolate the bug and I'm ready to try raising a JIRA issue.
I joined the Apache Jira project so I can create tickets.

However when I click Create from the Spark project home page on JIRA, it
asks me to click on one of the following service desks: Kylin, Atlas,
Ranger, Apache Infrastructure. There doesn't seem to be an option for me to
raise an issue for Spark?!

Regards,

James


On 4 March 2016 at 14:03, James Hammerton  wrote:

> Sure thing, I'll see if I can isolate this.
>
> Regards.
>
> James
>
> On 4 March 2016 at 12:24, Ted Yu  wrote:
>
>> If you can reproduce the following with a unit test, I suggest you open a
>> JIRA.
>>
>> Thanks
>>
>> On Mar 4, 2016, at 4:01 AM, James Hammerton  wrote:
>>
>> Hi,
>>
>> I've come across some strange behaviour with Spark 1.6.0.
>>
>> In the code below, the filtering by "eventName" only seems to work if I
>> called .cache on the resulting DataFrame.
>>
>> If I don't do this, the code crashes inside the UDF because it processes
>> an event that the filter should get rid off.
>>
>> Any ideas why this might be the case?
>>
>> The code is as follows:
>>
>>>   val df = sqlContext.read.parquet(inputPath)
>>>   val filtered = df.filter(df("eventName").equalTo(Created))
>>>   val extracted = extractEmailReferences(sqlContext, filtered.cache)
>>> // Caching seems to be required for the filter to work
>>>   extracted.write.parquet(outputPath)
>>
>>
>> where extractEmailReferences does this:
>>
>>>
>>
>> def extractEmailReferences(sqlContext: SQLContext, df: DataFrame):
>>> DataFrame = {
>>
>> val extracted = df.select(df(EventFieldNames.ObjectId),
>>
>>   extractReferencesUDF(df(EventFieldNames.EventJson),
>>> df(EventFieldNames.ObjectId), df(EventFieldNames.UserId)) as "references")
>>
>>
>>> extracted.filter(extracted("references").notEqual("UNKNOWN"))
>>
>>   }
>>
>>
>> and extractReferencesUDF:
>>
>>> def extractReferencesUDF = udf(extractReferences(_: String, _: String,
>>> _: String))
>>
>> def extractReferences(eventJson: String, objectId: String, userId:
>>> String): String = {
>>> import org.json4s.jackson.Serialization
>>> import org.json4s.NoTypeHints
>>> implicit val formats = Serialization.formats(NoTypeHints)
>>>
>>> val created = Serialization.read[GMailMessage.Created](eventJson) //
>>> This is where the code crashes if the .cache isn't called
>>
>>
>>  Regards,
>>
>> James
>>
>>
>


Re: DataFrame .filter only seems to work when .cache is called in local mode in 1.6.0

2016-03-04 Thread James Hammerton
Sure thing, I'll see if I can isolate this.

Regards.

James

On 4 March 2016 at 12:24, Ted Yu  wrote:

> If you can reproduce the following with a unit test, I suggest you open a
> JIRA.
>
> Thanks
>
> On Mar 4, 2016, at 4:01 AM, James Hammerton  wrote:
>
> Hi,
>
> I've come across some strange behaviour with Spark 1.6.0.
>
> In the code below, the filtering by "eventName" only seems to work if I
> called .cache on the resulting DataFrame.
>
> If I don't do this, the code crashes inside the UDF because it processes
> an event that the filter should get rid off.
>
> Any ideas why this might be the case?
>
> The code is as follows:
>
>>   val df = sqlContext.read.parquet(inputPath)
>>   val filtered = df.filter(df("eventName").equalTo(Created))
>>   val extracted = extractEmailReferences(sqlContext, filtered.cache)
>> // Caching seems to be required for the filter to work
>>   extracted.write.parquet(outputPath)
>
>
> where extractEmailReferences does this:
>
>>
>
> def extractEmailReferences(sqlContext: SQLContext, df: DataFrame):
>> DataFrame = {
>
> val extracted = df.select(df(EventFieldNames.ObjectId),
>
>   extractReferencesUDF(df(EventFieldNames.EventJson),
>> df(EventFieldNames.ObjectId), df(EventFieldNames.UserId)) as "references")
>
>
>> extracted.filter(extracted("references").notEqual("UNKNOWN"))
>
>   }
>
>
> and extractReferencesUDF:
>
>> def extractReferencesUDF = udf(extractReferences(_: String, _: String, _:
>> String))
>
> def extractReferences(eventJson: String, objectId: String, userId:
>> String): String = {
>> import org.json4s.jackson.Serialization
>> import org.json4s.NoTypeHints
>> implicit val formats = Serialization.formats(NoTypeHints)
>>
>> val created = Serialization.read[GMailMessage.Created](eventJson) //
>> This is where the code crashes if the .cache isn't called
>
>
>  Regards,
>
> James
>
>


DataFrame .filter only seems to work when .cache is called in local mode in 1.6.0

2016-03-04 Thread James Hammerton
Hi,

I've come across some strange behaviour with Spark 1.6.0.

In the code below, the filtering by "eventName" only seems to work if I
called .cache on the resulting DataFrame.

If I don't do this, the code crashes inside the UDF because it processes an
event that the filter should get rid off.

Any ideas why this might be the case?

The code is as follows:

>   val df = sqlContext.read.parquet(inputPath)
>   val filtered = df.filter(df("eventName").equalTo(Created))
>   val extracted = extractEmailReferences(sqlContext, filtered.cache)
> // Caching seems to be required for the filter to work
>   extracted.write.parquet(outputPath)


where extractEmailReferences does this:

>

def extractEmailReferences(sqlContext: SQLContext, df: DataFrame):
> DataFrame = {

val extracted = df.select(df(EventFieldNames.ObjectId),

  extractReferencesUDF(df(EventFieldNames.EventJson),
> df(EventFieldNames.ObjectId), df(EventFieldNames.UserId)) as "references")


> extracted.filter(extracted("references").notEqual("UNKNOWN"))

  }


and extractReferencesUDF:

> def extractReferencesUDF = udf(extractReferences(_: String, _: String, _:
> String))

def extractReferences(eventJson: String, objectId: String, userId: String):
> String = {
> import org.json4s.jackson.Serialization
> import org.json4s.NoTypeHints
> implicit val formats = Serialization.formats(NoTypeHints)
>
> val created = Serialization.read[GMailMessage.Created](eventJson) //
> This is where the code crashes if the .cache isn't called


 Regards,

James


Re: How to control the number of parquet files getting created under a partition ?

2016-03-02 Thread James Hammerton
Hi,

Based on the behaviour I've seen using parquet, the number of partitions in
the DataFrame will determine the number of files in each parquet partition.

I.e. when you use "PARTITION BY" you're actually partitioning twice, once
via the partitions spark has created internally and then again with the
partitions you specify in the "PARTITION BY" clause.

So if you have 10 partitions in your DataFrame, and save that as a parquet
file or table partitioned on a column with 3 values, you'll get 30 files,
10 per parquet partition.

You can reduce the number of partitions in the DataFrame by using
coalesce() before saving the data.
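
E.g. an untested sketch using the DataFrame writer API (adapting the Hive-style
insert below as needed, and assuming sqlContext is in scope): coalescing to 3
partitions gives at most 3 files per value of the "partitioner" column.

val df = sqlContext.table("testUserDtsTemp")
df.coalesce(3)
  .write
  .partitionBy("partitioner")
  .parquet("/user/testId/testUserDts")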

Regards,

James


On 1 March 2016 at 21:01, SRK  wrote:

> Hi,
>
> How can I control the number of parquet files getting created under a
> partition? I have my sqlContext queries to create a table and insert the
> records as follows. It seems to create around 250 parquet files under each
> partition though I was expecting that to create around 2 or 3 files. Due to
> the large number of files, it takes a lot of time to scan the records. Any
> suggestions as to how to control the number of parquet files under each
> partition would be of great help.
>
>  sqlContext.sql("  CREATE EXTERNAL TABLE IF NOT EXISTS testUserDts
> (userId STRING, savedDate STRING) PARTITIONED BY (partitioner STRING)
> stored as PARQUET LOCATION '/user/testId/testUserDts' ")
>
>   sqlContext.sql(
> """from testUserDtsTemp ps   insert overwrite table testUserDts
> partition(partitioner)  select ps.userId, ps.savedDate ,  ps.partitioner
> """.stripMargin)
>
>
>
> Thanks!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-control-the-number-of-parquet-files-getting-created-under-a-partition-tp26374.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Count job stalling at shuffle stage on 3.4TB input (but only 5.3GB shuffle write)

2016-02-23 Thread James Hammerton
Hi,

I have been having problems processing a 3.4TB data set - uncompressed tab
separated text - containing object creation/update events from our system,
one event per line.

I decided to see what happens with a count of the number of events (=
number of lines in the text files) and a count of the number of distinct
object ids, which I thought should be straightforward enough to succeed.

The job stalled at the end of the first stage (55657 tasks, albeit 1 failed
but I've seen processing continue to the next stage despite small numbers
of failures) despite only generating a 5.3GB shuffle. It ran for 2.5 hours
and is now sitting apparently doing nothing.

Does this suggest something is wrong with the cluster? Computing either
event count should be straightforward despite the size of the data set, or
am I missing something?

The set up is a spark-ec2 generated cluster (trying EMR will be my next
move, along with bucketing the data via parquet)  running Spark 1.5.2,
openjdk 8 (this is a scala job though, but others are java), r3.2xlarge
instance types, 5 slaves each with 500GB EBS volumes which SPARK_LOCAL_DIRS
points to.

The code is:

val sc = new SparkContext(conf);
> try {
>   val rawSchema = StructType(Array(
> StructField("objectId", DataTypes.StringType, true),
> StructField("eventName", DataTypes.StringType, true),
> StructField("eventJson", DataTypes.StringType, true),
> StructField("timestampNanos", DataTypes.StringType, true)))
>   val sqlContext = new SQLContext(sc)
>   val df = sqlContext.read
> .format("com.databricks.spark.csv")
> .option("header", "false")
> .option("delimiter", "\t")
> .schema(rawSchema)
> .load(inputPath)
>   val oids = df.select("objectId")
>   val distinct = oids.distinct.count
>   val events = oids.count
>   println("Number of objectIds: " + distinct);
>   println("Number of events: " + events);
>   println("Elapsed time: " + (System.currentTimeMillis() -
> startMillis)/1000 + "s")


Here's the plan as revealed by the SQL part of the UI:

== Parsed Logical Plan ==
> Aggregate [count(1) AS count#4L]
>  Aggregate [objectId#0], [objectId#0]
>   Project [objectId#0]
>Relation[objectId#0,eventName#1,eventJson#2,timestampNanos#3] 
> CsvRelation(,Some(s3n://gluru-research/data/events.prod.2016-02-04/extractedEventsUncompressed),false,
> 
> ,",null,#,PERMISSIVE,COMMONS,false,false,false,StructType(StructField(objectId,StringType,true),
>  StructField(eventName,StringType,true), 
> StructField(eventJson,StringType,true), 
> StructField(timestampNanos,StringType,true)),false,null)
>
> == Analyzed Logical Plan ==
> count: bigint
> Aggregate [count(1) AS count#4L]
>  Aggregate [objectId#0], [objectId#0]
>   Project [objectId#0]
>Relation[objectId#0,eventName#1,eventJson#2,timestampNanos#3] 
> CsvRelation(,Some(s3n://gluru-research/data/events.prod.2016-02-04/extractedEventsUncompressed),false,
> 
> ,",null,#,PERMISSIVE,COMMONS,false,false,false,StructType(StructField(objectId,StringType,true),
>  StructField(eventName,StringType,true), 
> StructField(eventJson,StringType,true), 
> StructField(timestampNanos,StringType,true)),false,null)
>
> == Optimized Logical Plan ==
> Aggregate [count(1) AS count#4L]
>  Aggregate [objectId#0]
>   Project [objectId#0]
>Relation[objectId#0,eventName#1,eventJson#2,timestampNanos#3] 
> CsvRelation(,Some(s3n://gluru-research/data/events.prod.2016-02-04/extractedEventsUncompressed),false,
> 
> ,",null,#,PERMISSIVE,COMMONS,false,false,false,StructType(StructField(objectId,StringType,true),
>  StructField(eventName,StringType,true), 
> StructField(eventJson,StringType,true), 
> StructField(timestampNanos,StringType,true)),false,null)
>
> == Physical Plan ==
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], 
> output=[count#4L])
>  TungstenExchange SinglePartition
>   TungstenAggregate(key=[], 
> functions=[(count(1),mode=Partial,isDistinct=false)], 
> output=[currentCount#7L])
>TungstenAggregate(key=[objectId#0], functions=[], output=[])
> TungstenExchange hashpartitioning(objectId#0)
>  TungstenAggregate(key=[objectId#0], functions=[], output=[objectId#0])
>   Scan 
> CsvRelation(,Some(s3n://gluru-research/data/events.prod.2016-02-04/extractedEventsUncompressed),false,
>   
> ,",null,#,PERMISSIVE,COMMONS,false,false,false,StructType(StructField(objectId,StringType,true),
>  StructField(eventName,StringType,true), 
> StructField(eventJson,StringType,true), 
> StructField(timestampNanos,StringType,true)),false,null)[objectId#0]
>
> Code Generation: true
>
>
Regards,

James


Re: Is this likely to cause any problems?

2016-02-19 Thread James Hammerton
Hi,

Having looked at how easy it is to use EMR, I reckon you may be right,
especially if using Java 8 is no more difficult with that than with
spark-ec2 (where I had to install it on the master and slaves and edit the
spark-env.sh).

I'm now curious as to why the Spark documentation (
http://spark.apache.org/docs/latest/index.html) mentions EC2 but not EMR.

Regards,

James


On 19 February 2016 at 14:25, Daniel Siegmann 
wrote:

> With EMR supporting Spark, I don't see much reason to use the spark-ec2
> script unless it is important for you to be able to launch clusters using
> the bleeding edge version of Spark. EMR does seem to do a pretty decent job
> of keeping up to date - the latest version (4.3.0) supports the latest
> Spark version (1.6.0).
>
> So I'd flip the question around and ask: is there any reason to continue
> using the spark-ec2 script rather than EMR?
>
> On Thu, Feb 18, 2016 at 11:39 AM, James Hammerton  wrote:
>
>> I have now... So far  I think the issues I've had are not related to
>> this, but I wanted to be sure in case it should be something that needs to
>> be patched. I've had some jobs run successfully but this warning appears in
>> the logs.
>>
>> Regards,
>>
>> James
>>
>> On 18 February 2016 at 12:23, Ted Yu  wrote:
>>
>>> Have you seen this ?
>>>
>>> HADOOP-10988
>>>
>>> Cheers
>>>
>>> On Thu, Feb 18, 2016 at 3:39 AM, James Hammerton  wrote:
>>>
>>>> HI,
>>>>
>>>> I am seeing warnings like this in the logs when I run Spark jobs:
>>>>
>>>> OpenJDK 64-Bit Server VM warning: You have loaded library 
>>>> /root/ephemeral-hdfs/lib/native/libhadoop.so.1.0.0 which might have 
>>>> disabled stack guard. The VM will try to fix the stack guard now.
>>>> It's highly recommended that you fix the library with 'execstack -c 
>>>> ', or link it with '-z noexecstack'.
>>>>
>>>>
>>>> I used spark-ec2 to launch the cluster with the default AMI, Spark
>>>> 1.5.2, hadoop major version 2.4. I altered the jdk to be openjdk 8 as I'd
>>>> written some jobs in Java 8. The 6 workers nodes are m4.2xlarge and master
>>>> is m4.large.
>>>>
>>>> Could this contribute to any problems running the jobs?
>>>>
>>>> Regards,
>>>>
>>>> James
>>>>
>>>
>>>
>>
>


Re: Is this likely to cause any problems?

2016-02-18 Thread James Hammerton
I have now... So far  I think the issues I've had are not related to this,
but I wanted to be sure in case it should be something that needs to be
patched. I've had some jobs run successfully but this warning appears in
the logs.

Regards,

James

On 18 February 2016 at 12:23, Ted Yu  wrote:

> Have you seen this ?
>
> HADOOP-10988
>
> Cheers
>
> On Thu, Feb 18, 2016 at 3:39 AM, James Hammerton  wrote:
>
>> HI,
>>
>> I am seeing warnings like this in the logs when I run Spark jobs:
>>
>> OpenJDK 64-Bit Server VM warning: You have loaded library 
>> /root/ephemeral-hdfs/lib/native/libhadoop.so.1.0.0 which might have disabled 
>> stack guard. The VM will try to fix the stack guard now.
>> It's highly recommended that you fix the library with 'execstack -c 
>> ', or link it with '-z noexecstack'.
>>
>>
>> I used spark-ec2 to launch the cluster with the default AMI, Spark 1.5.2,
>> hadoop major version 2.4. I altered the jdk to be openjdk 8 as I'd written
>> some jobs in Java 8. The 6 workers nodes are m4.2xlarge and master is
>> m4.large.
>>
>> Could this contribute to any problems running the jobs?
>>
>> Regards,
>>
>> James
>>
>
>


Re: Is this likely to cause any problems?

2016-02-18 Thread James Hammerton
I'm fairly new to Spark.

The documentation suggests using the spark-ec2 script to launch clusters in
AWS, hence I used it.

Would EMR offer any advantage?

Regards,

James


On 18 February 2016 at 14:04, Gourav Sengupta 
wrote:

> Hi,
>
> Just out of sheet curiosity why are you not using EMR to start your SPARK
> cluster?
>
>
> Regards,
> Gourav
>
> On Thu, Feb 18, 2016 at 12:23 PM, Ted Yu  wrote:
>
>> Have you seen this ?
>>
>> HADOOP-10988
>>
>> Cheers
>>
>> On Thu, Feb 18, 2016 at 3:39 AM, James Hammerton  wrote:
>>
>>> HI,
>>>
>>> I am seeing warnings like this in the logs when I run Spark jobs:
>>>
>>> OpenJDK 64-Bit Server VM warning: You have loaded library 
>>> /root/ephemeral-hdfs/lib/native/libhadoop.so.1.0.0 which might have 
>>> disabled stack guard. The VM will try to fix the stack guard now.
>>> It's highly recommended that you fix the library with 'execstack -c 
>>> ', or link it with '-z noexecstack'.
>>>
>>>
>>> I used spark-ec2 to launch the cluster with the default AMI, Spark
>>> 1.5.2, hadoop major version 2.4. I altered the jdk to be openjdk 8 as I'd
>>> written some jobs in Java 8. The 6 workers nodes are m4.2xlarge and master
>>> is m4.large.
>>>
>>> Could this contribute to any problems running the jobs?
>>>
>>> Regards,
>>>
>>> James
>>>
>>
>>
>


Is this likely to cause any problems?

2016-02-18 Thread James Hammerton
HI,

I am seeing warnings like this in the logs when I run Spark jobs:

OpenJDK 64-Bit Server VM warning: You have loaded library
/root/ephemeral-hdfs/lib/native/libhadoop.so.1.0.0 which might have
disabled stack guard. The VM will try to fix the stack guard now.
It's highly recommended that you fix the library with 'execstack -c
', or link it with '-z noexecstack'.


I used spark-ec2 to launch the cluster with the default AMI, Spark 1.5.2,
hadoop major version 2.4. I altered the jdk to be openjdk 8 as I'd written
some jobs in Java 8. The 6 workers nodes are m4.2xlarge and master is
m4.large.

Could this contribute to any problems running the jobs?

Regards,

James