CI/CD for Spark and Scala

2018-01-24 Thread Deepak Sharma
Hi All,
I just wanted to check whether there are any best practices around using CI/CD
for Spark / Scala projects running on AWS Hadoop clusters.
If there are any specific tools, please do let me know.
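
For reference, the kind of project layout a CI job (Jenkins, Bamboo, GitLab CI,
etc.) typically builds and tests is an sbt project with the sbt-assembly plugin;
the sketch below is illustrative only (names and versions are assumptions, not
recommendations):

// build.sbt -- a minimal sketch of what a CI stage could compile and test
name := "my-spark-job"
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.2.0" % "provided",
  "org.apache.spark" %% "spark-sql"  % "2.2.0" % "provided",
  "org.scalatest"    %% "scalatest"  % "3.0.4" % Test
)

// with sbt-assembly enabled in project/plugins.sbt, the pipeline can run
//   sbt clean test assembly
// and a later stage can spark-submit the resulting fat jar to the cluster.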

-- 
Thanks
Deepak


Re: XML Parsing with Spark and Scala

2017-08-11 Thread Jörn Franke
Can you specify what "is not able to load" means and what the expected
results are?



> On 11. Aug 2017, at 09:30, Etisha Jain  wrote:
> 
> Hi
> 
> I want to do xml parsing with spark, but the data from the file is not able
> to load and the desired output is also not coming.
> I am attaching a file. Can anyone help me to do this
> 
> solvePuzzle1.scala
> <http://apache-spark-user-list.1001560.n3.nabble.com/file/n29053/solvePuzzle1.scala>
>   
> 
> 
> 




XML Parsing with Spark and Scala

2017-08-11 Thread Etisha Jain
Hi

I want to do XML parsing with Spark, but the data from the file is not loading
and the desired output is not being produced.
I am attaching a file. Can anyone help me with this?

solvePuzzle1.scala
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n29053/solvePuzzle1.scala>
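
For reference, loading XML into a DataFrame via the spark-xml package looks
roughly like the sketch below; the rowTag value, input path and package version
are assumptions, not taken from the attached file:

// assumes the spark-xml package is on the classpath, e.g.
//   spark-shell --packages com.databricks:spark-xml_2.11:0.4.1
val df = spark.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "record")   // the repeating element in the XML file
  .load("/path/to/input.xml")

df.printSchema()
df.show()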
  






Re: Real Time Recommendation Engines with Spark and Scala

2016-09-05 Thread Mich Talebzadeh
out.me/alonso.isidoro.roman
>>>>
>>>> <https://about.me/alonso.isidoro.roman?promo=email_sig&utm_source=email_sig&utm_medium=email_sig&utm_campaign=external_links>
>>>>
>>>> 2016-09-05 16:39 GMT+02:00 Mich Talebzadeh :
>>>>
>>>>> Thank you Alonso,
>>>>>
>>>>> I looked at your project. Interesting
>>>>>
>>>>> As I see it this is what you are suggesting
>>>>>
>>>>>
>>>>>1. A kafka producer is going to ask periodically to Amazon in
>>>>>order to know what products based on my own ratings and i am going to
>>>>>introduced them into some kafka topic.
>>>>>2. A spark streaming process is going to read from that previous
>>>>>topic.
>>>>>3. Apply some machine learning algorithms (ALS, content based
>>>>>filtering colaborative filtering) on those datasets readed by the spark
>>>>>streaming process.
>>>>>4. Save results in a mongo or cassandra instance.
>>>>>5. Use play framework to create an websocket interface between the
>>>>>mongo instance and the visual interface.
>>>>>
>>>>>
>>>>> As I understand
>>>>>
>>>>> Point 1: A kafka producer is going to ask periodically to Amazon in
>>>>> order to know what products based on my own ratings .
>>>>>
>>>>>
>>>>>1. What do you mean "my own rating" here? You know the products.
>>>>>So what Amazon is going to provide by way of Kafka?
>>>>>2. Assuming that you have created topic specifically for this
>>>>>purpose then that topic is streamed into Kafka, some algorithms is 
>>>>> applied
>>>>>and results are saved in DB
>>>>>3. You have some dashboard that will fetch data (via ???) from the
>>>>>DB and I guess Zeppelin can do it here?
>>>>>
>>>>>
>>>>> Do you Have a DFD diagram for your design in case. Something like
>>>>> below (hope does not look pedantic, non intended).
>>>>>
>>>>> [image: Inline images 1]
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Dr Mich Talebzadeh
>>>>>
>>>>>
>>>>>
>>>>> LinkedIn * 
>>>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>
>>>>>
>>>>>
>>>>> http://talebzadehmich.wordpress.com
>>>>>
>>>>>
>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>>> any loss, damage or destruction of data or any other property which may
>>>>> arise from relying on this email's technical content is explicitly
>>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>>> arising from such loss, damage or destruction.
>>>>>
>>>>>
>>>>>
>>>>> On 5 September 2016 at 15:08, Alonso Isidoro Roman >>>> > wrote:
>>>>>
>>>>>> Hi Mitch, i wrote few months ago a tiny project with this issue in
>>>>>> mind. The idea is to apply ALS algorithm in order to get some valid
>>>>>> recommendations from another users.
>>>>>>
>>>>>>
>>>>>> The url of the project
>>>>>> <https://github.com/alonsoir/awesome-recommendation-engine>
>>>>>>
>>>>>>
>>>>>>
>>>>>> Alonso Isidoro Roman
>>>>>> [image: https://]about.me/alonso.isidoro.roman
>>>>>>
>>>>>> <https://about.me/alonso.isidoro.roman?promo=email_sig&utm_source=email_sig&utm_medium=email_sig&utm_campaign=external_links>
>>>>>>
>>>>>> 2016-09-05 15:41 GMT+02:00 Mich Talebzadeh >>>>> >:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Has anyone done any work on Real time recommendation engines with
>>>>>>> Spark and Scala.
>>>>>>>
>>>>>>> I have seen few PPTs with Python but wanted to see if these have
>>>>>>> been done with Scala.
>>>>>>>
>>>>>>> I trust this question makes sense.
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> p.s. My prime interest would be in Financial markets.
>>>>>>>
>>>>>>>
>>>>>>> Dr Mich Talebzadeh
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> LinkedIn * 
>>>>>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>
>>>>>>>
>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>>> for any loss, damage or destruction of data or any other property which 
>>>>>>> may
>>>>>>> arise from relying on this email's technical content is explicitly
>>>>>>> disclaimed. The author will in no case be liable for any monetary 
>>>>>>> damages
>>>>>>> arising from such loss, damage or destruction.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: Real Time Recommendation Engines with Spark and Scala

2016-09-05 Thread Alonso Isidoro Roman
By the way, i would love to work in your project, looks promising!



Alonso Isidoro Roman
[image: https://]about.me/alonso.isidoro.roman
<https://about.me/alonso.isidoro.roman?promo=email_sig&utm_source=email_sig&utm_medium=email_sig&utm_campaign=external_links>

2016-09-05 16:57 GMT+02:00 Alonso Isidoro Roman :

> Hi Mitch,
>
>
>1. What do you mean "my own rating" here? You know the products. So
>what Amazon is going to provide by way of Kafka?
>
> The idea was to embed the functionality of a kafka producer within a rest
> service in order i can invoke this logic with my a rating. I did not create
> such functionality because i started to make another things, i get bored,
> basically. I created some unix commands with this code, using sbt-pack.
>
>
>
>1. Assuming that you have created topic specifically for this purpose
>then that topic is streamed into Kafka, some algorithms is applied and
>results are saved in DB
>
> you got it!
>
>1. You have some dashboard that will fetch data (via ???) from the DB
>and I guess Zeppelin can do it here?
>
>
> No, not any dashboard yet. Maybe it is not a good idea to connect mongodb
> with a dashboard through a web socket. It probably works, for a proof of
> concept, but, in a real project? i don't know yet...
>
> You can see what i did to push data within a kafka topic in this scala
> class
> <https://github.com/alonsoir/awesome-recommendation-engine/blob/master/src/main/scala/example/producer/AmazonProducerExample.scala>,
> you have to invoke pack within the scala shell to create this unix command.
>
> Regards!
>
> Alonso
>
>
>
> Alonso Isidoro Roman
> [image: https://]about.me/alonso.isidoro.roman
>
> <https://about.me/alonso.isidoro.roman?promo=email_sig&utm_source=email_sig&utm_medium=email_sig&utm_campaign=external_links>
>
> 2016-09-05 16:39 GMT+02:00 Mich Talebzadeh :
>
>> Thank you Alonso,
>>
>> I looked at your project. Interesting
>>
>> As I see it this is what you are suggesting
>>
>>
>>1. A kafka producer is going to ask periodically to Amazon in order
>>to know what products based on my own ratings and i am going to introduced
>>them into some kafka topic.
>>2. A spark streaming process is going to read from that previous
>>topic.
>>3. Apply some machine learning algorithms (ALS, content based
>>filtering colaborative filtering) on those datasets readed by the spark
>>streaming process.
>>4. Save results in a mongo or cassandra instance.
>>5. Use play framework to create an websocket interface between the
>>mongo instance and the visual interface.
>>
>>
>> As I understand
>>
>> Point 1: A kafka producer is going to ask periodically to Amazon in order
>> to know what products based on my own ratings .
>>
>>
>>1. What do you mean "my own rating" here? You know the products. So
>>what Amazon is going to provide by way of Kafka?
>>2. Assuming that you have created topic specifically for this purpose
>>then that topic is streamed into Kafka, some algorithms is applied and
>>results are saved in DB
>>3. You have some dashboard that will fetch data (via ???) from the DB
>>and I guess Zeppelin can do it here?
>>
>>
>> Do you Have a DFD diagram for your design in case. Something like below
>> (hope does not look pedantic, non intended).
>>
>> [image: Inline images 1]
>>
>>
>>
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 5 September 2016 at 15:08, Alonso Isidoro Roman 
>> wrote:
>>
>>> Hi Mitch, i wrote few months ago a tiny project with this issue in mind.
>>> The idea is to apply ALS algorithm in order to get some valid
>>> recommendations from another users.
>>>
>>>
>>> The url of the project
>>> &

Re: Real Time Recommendation Engines with Spark and Scala

2016-09-05 Thread Alonso Isidoro Roman
Hi Mitch,


   1. What do you mean "my own rating" here? You know the products. So what
   Amazon is going to provide by way of Kafka?

The idea was to embed the functionality of a Kafka producer within a REST
service so that I can invoke this logic with my own rating. I did not create
that functionality because I started working on other things and got bored,
basically. I created some Unix commands with this code, using sbt-pack.
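
Stripped of the REST layer, the producer side is essentially the sketch below;
the topic name, broker address and the "productId,rating" payload are
placeholders, not the exact code of the class linked further down:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

// push one "productId,rating" message into the topic the streaming job reads from
val producer = new KafkaProducer[String, String](props)
producer.send(new ProducerRecord[String, String]("amazonRatingsTopic", "0981531679,5.0"))
producer.close()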



   1. Assuming that you have created topic specifically for this purpose
   then that topic is streamed into Kafka, some algorithms is applied and
   results are saved in DB

you got it!

   1. You have some dashboard that will fetch data (via ???) from the DB
   and I guess Zeppelin can do it here?


No, no dashboard yet. Maybe it is not a good idea to connect MongoDB with a
dashboard through a WebSocket. It probably works for a proof of concept, but
in a real project? I don't know yet...

You can see what I did to push data into a Kafka topic in this Scala class
<https://github.com/alonsoir/awesome-recommendation-engine/blob/master/src/main/scala/example/producer/AmazonProducerExample.scala>;
you have to invoke pack within the Scala shell to create this Unix command.

Regards!

Alonso



Alonso Isidoro Roman
[image: https://]about.me/alonso.isidoro.roman
<https://about.me/alonso.isidoro.roman?promo=email_sig&utm_source=email_sig&utm_medium=email_sig&utm_campaign=external_links>

2016-09-05 16:39 GMT+02:00 Mich Talebzadeh :

> Thank you Alonso,
>
> I looked at your project. Interesting
>
> As I see it this is what you are suggesting
>
>
>1. A kafka producer is going to ask periodically to Amazon in order to
>know what products based on my own ratings and i am going to introduced
>them into some kafka topic.
>2. A spark streaming process is going to read from that previous topic.
>3. Apply some machine learning algorithms (ALS, content based
>filtering colaborative filtering) on those datasets readed by the spark
>streaming process.
>4. Save results in a mongo or cassandra instance.
>5. Use play framework to create an websocket interface between the
>mongo instance and the visual interface.
>
>
> As I understand
>
> Point 1: A kafka producer is going to ask periodically to Amazon in order
> to know what products based on my own ratings .
>
>
>1. What do you mean "my own rating" here? You know the products. So
>what Amazon is going to provide by way of Kafka?
>2. Assuming that you have created topic specifically for this purpose
>then that topic is streamed into Kafka, some algorithms is applied and
>results are saved in DB
>3. You have some dashboard that will fetch data (via ???) from the DB
>and I guess Zeppelin can do it here?
>
>
> Do you Have a DFD diagram for your design in case. Something like below
> (hope does not look pedantic, non intended).
>
> [image: Inline images 1]
>
>
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 5 September 2016 at 15:08, Alonso Isidoro Roman 
> wrote:
>
>> Hi Mitch, i wrote few months ago a tiny project with this issue in mind.
>> The idea is to apply ALS algorithm in order to get some valid
>> recommendations from another users.
>>
>>
>> The url of the project
>> <https://github.com/alonsoir/awesome-recommendation-engine>
>>
>>
>>
>> Alonso Isidoro Roman
>> [image: https://]about.me/alonso.isidoro.roman
>>
>> <https://about.me/alonso.isidoro.roman?promo=email_sig&utm_source=email_sig&utm_medium=email_sig&utm_campaign=external_links>
>>
>> 2016-09-05 15:41 GMT+02:00 Mich Talebzadeh :
>>
>>> Hi,
>>>
>>> Has anyone done any work on Real time recommendation engines with Spark
>>> and Scala.
>>>
>>> I have seen few PPTs with Python but wanted to see if these have been
>>> done with Scala.
>>>
>>> I trust this question makes sense.
>>>
>>> Thanks
>>>
>>> p.s. My prime interest would be in Financial mar

Re: Real Time Recommendation Engines with Spark and Scala

2016-09-05 Thread Mich Talebzadeh
Thank you Alonso,

I looked at your project. Interesting

As I see it this is what you are suggesting


   1. A Kafka producer is going to query Amazon periodically in order to
   know which products to suggest based on my own ratings, and I am going
   to introduce them into some Kafka topic.
   2. A Spark Streaming process is going to read from that previous topic.
   3. Apply some machine learning algorithms (ALS, content-based filtering,
   collaborative filtering) on those datasets read by the Spark Streaming
   process.
   4. Save results in a Mongo or Cassandra instance.
   5. Use the Play framework to create a WebSocket interface between the
   Mongo instance and the visual interface.


As I understand

Point 1: A Kafka producer is going to query Amazon periodically in order
to know which products to suggest based on my own ratings.


   1. What do you mean by "my own rating" here? You know the products. So
   what is Amazon going to provide by way of Kafka?
   2. Assuming that you have created a topic specifically for this purpose,
   then that topic is streamed into Kafka, some algorithms are applied and
   the results are saved in a DB.
   3. You have some dashboard that will fetch data (via ???) from the DB,
   and I guess Zeppelin can do it here?


Do you have a DFD diagram for your design, by any chance? Something like the
one below (hope it does not look pedantic; none intended).

[image: Inline images 1]






Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 5 September 2016 at 15:08, Alonso Isidoro Roman 
wrote:

> Hi Mitch, i wrote few months ago a tiny project with this issue in mind.
> The idea is to apply ALS algorithm in order to get some valid
> recommendations from another users.
>
>
> The url of the project
> <https://github.com/alonsoir/awesome-recommendation-engine>
>
>
>
> Alonso Isidoro Roman
> [image: https://]about.me/alonso.isidoro.roman
>
> <https://about.me/alonso.isidoro.roman?promo=email_sig&utm_source=email_sig&utm_medium=email_sig&utm_campaign=external_links>
>
> 2016-09-05 15:41 GMT+02:00 Mich Talebzadeh :
>
>> Hi,
>>
>> Has anyone done any work on Real time recommendation engines with Spark
>> and Scala.
>>
>> I have seen few PPTs with Python but wanted to see if these have been
>> done with Scala.
>>
>> I trust this question makes sense.
>>
>> Thanks
>>
>> p.s. My prime interest would be in Financial markets.
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>
>


Re: Real Time Recommendation Engines with Spark and Scala

2016-09-05 Thread Alonso Isidoro Roman
Hi Mitch, I wrote a tiny project with this issue in mind a few months ago.
The idea is to apply the ALS algorithm in order to get some valid
recommendations from other users.


The url of the project
<https://github.com/alonsoir/awesome-recommendation-engine>
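
The core of the ALS step is roughly the sketch below; the ratings file layout,
rank, iteration count and lambda are illustrative values, not the ones used in
the repository:

import org.apache.spark.mllib.recommendation.{ALS, Rating}

// one "userId,productId,rating" triple per line -- path and format are assumptions
val ratings = sc.textFile("ratings.csv").map { line =>
  val Array(user, product, rating) = line.split(",")
  Rating(user.toInt, product.toInt, rating.toDouble)
}

val model = ALS.train(ratings, 10, 10, 0.01)   // rank = 10, iterations = 10, lambda = 0.01

// top 5 recommendations for user 1
model.recommendProducts(1, 5).foreach(println)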



Alonso Isidoro Roman
[image: https://]about.me/alonso.isidoro.roman
<https://about.me/alonso.isidoro.roman?promo=email_sig&utm_source=email_sig&utm_medium=email_sig&utm_campaign=external_links>

2016-09-05 15:41 GMT+02:00 Mich Talebzadeh :

> Hi,
>
> Has anyone done any work on Real time recommendation engines with Spark
> and Scala.
>
> I have seen few PPTs with Python but wanted to see if these have been done
> with Scala.
>
> I trust this question makes sense.
>
> Thanks
>
> p.s. My prime interest would be in Financial markets.
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>


Real Time Recommendation Engines with Spark and Scala

2016-09-05 Thread Mich Talebzadeh
Hi,

Has anyone done any work on real-time recommendation engines with Spark and
Scala?

I have seen a few PPTs with Python, but wanted to see if these have been done
with Scala.

I trust this question makes sense.

Thanks

p.s. My prime interest would be in Financial markets.


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
<https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Re: Optimized way to multiply two large matrices and save output using Spark and Scala

2016-01-13 Thread Burak Yavuz
BlockMatrix.multiply is the suggested method of multiplying two large
matrices. Is there a reason that you didn't use BlockMatrices?

You can load the matrices and convert to and from RowMatrix. If it's in
sparse format (i, j, v), then you can also use the CoordinateMatrix to
load, BlockMatrix to multiply, and CoordinateMatrix to save it back again.
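
A rough sketch of that route for (i, j, v) input, with file paths and the
comma delimiter as placeholders:

import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry}

def loadCoordinateMatrix(path: String): CoordinateMatrix = {
  val entries = sc.textFile(path).map { line =>
    val Array(i, j, v) = line.split(",")
    MatrixEntry(i.toLong, j.toLong, v.toDouble)
  }
  new CoordinateMatrix(entries)
}

val a: BlockMatrix = loadCoordinateMatrix("matrixA.csv").toBlockMatrix().cache()
val b: BlockMatrix = loadCoordinateMatrix("matrixB.csv").toBlockMatrix().cache()

val product = a.multiply(b)   // distributed block-wise multiplication
product.toCoordinateMatrix().entries
  .map(e => s"${e.i},${e.j},${e.value}")
  .saveAsTextFile("productMatrix")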

Thanks,
Burak

On Wed, Jan 13, 2016 at 8:16 PM, Devi P.V  wrote:

> I want to multiply two large matrices (from csv files)using Spark and
> Scala and save output.I use the following code
>
>   val rows=file1.coalesce(1,false).map(x=>{
>   val line=x.split(delimiter).map(_.toDouble)
>   Vectors.sparse(line.length,
> line.zipWithIndex.map(e => (e._2, e._1)).filter(_._2 != 0.0))
>
> })
>
> val rmat = new RowMatrix(rows)
>
> val dm=file2.coalesce(1,false).map(x=>{
>   val line=x.split(delimiter).map(_.toDouble)
>   Vectors.dense(line)
> })
>
> val ma = dm.map(_.toArray).take(dm.count.toInt)
> val localMat = Matrices.dense( dm.count.toInt,
>   dm.take(1)(0).size,
>
>   transpose(ma).flatten)
>
> // Multiply two matrices
> val s=rmat.multiply(localMat).rows
>
> s.map(x=>x.toArray.mkString(delimiter)).saveAsTextFile(OutputPath)
>
>   }
>
>   def transpose(m: Array[Array[Double]]): Array[Array[Double]] = {
> (for {
>   c <- m(0).indices
> } yield m.map(_(c)) ).toArray
>   }
>
> When I save file it takes more time and output file has very large in
> size.what is the optimized way to multiply two large files and save the
> output to a text file ?
>


Optimized way to multiply two large matrices and save output using Spark and Scala

2016-01-13 Thread Devi P.V
I want to multiply two large matrices (from CSV files) using Spark and Scala
and save the output. I use the following code:

  val rows=file1.coalesce(1,false).map(x=>{
  val line=x.split(delimiter).map(_.toDouble)
  Vectors.sparse(line.length,
line.zipWithIndex.map(e => (e._2, e._1)).filter(_._2 != 0.0))

})

val rmat = new RowMatrix(rows)

val dm=file2.coalesce(1,false).map(x=>{
  val line=x.split(delimiter).map(_.toDouble)
  Vectors.dense(line)
})

val ma = dm.map(_.toArray).take(dm.count.toInt)
val localMat = Matrices.dense( dm.count.toInt,
  dm.take(1)(0).size,

  transpose(ma).flatten)

// Multiply two matrices
val s=rmat.multiply(localMat).rows

s.map(x=>x.toArray.mkString(delimiter)).saveAsTextFile(OutputPath)

  }

  def transpose(m: Array[Array[Double]]): Array[Array[Double]] = {
(for {
  c <- m(0).indices
} yield m.map(_(c)) ).toArray
  }

When I save the file it takes a long time and the output file is very large.
What is the optimized way to multiply two large matrices and save the
output to a text file?


Re: Pivot Data in Spark and Scala

2015-10-31 Thread ayan guha
(disclaimer: my reply in SO)

http://stackoverflow.com/questions/30260015/reshaping-pivoting-data-in-spark-rdd-and-or-spark-dataframes/30278605#30278605


On Sat, Oct 31, 2015 at 6:21 AM, Ali Tajeldin EDU 
wrote:

> You can take a look at the smvPivot function in the SMV library (
> https://github.com/TresAmigosSD/SMV ).  Should look for method "smvPivot"
> in SmvDFHelper (
>
> http://tresamigossd.github.io/SMV/scaladocs/index.html#org.tresamigos.smv.SmvDFHelper).
> You can also perform the pivot on a group-by-group basis.  See smvPivot and
> smvPivotSum in SmvGroupedDataFunc (
> http://tresamigossd.github.io/SMV/scaladocs/index.html#org.tresamigos.smv.SmvGroupedDataFunc
> ).
>
> Docs from smvPivotSum are copied below.  Note that you don't have to
> specify the baseOutput columns, but if you don't, it will force an
> additional action on the input data frame to build the cross products of
> all possible values in your input pivot columns.
>
> Perform a normal SmvPivot operation followed by a sum on all the output
> pivot columns.
> For example:
>
> df.smvGroupBy("id").smvPivotSum(Seq("month", "product"))("count")("5_14_A", 
> "5_14_B", "6_14_A", "6_14_B")
>
> and the following input:
>
> Input
> | id  | month | product | count |
> | --- | - | --- | - |
> | 1   | 5/14  |   A |   100 |
> | 1   | 6/14  |   B |   200 |
> | 1   | 5/14  |   B |   300 |
>
> will produce the following output:
>
> | id  | count_5_14_A | count_5_14_B | count_6_14_A | count_6_14_B |
> | --- |  |  |  |  |
> | 1   | 100  | 300  | NULL | 200  |
>
> pivotCols
> The sequence of column names whose values will be used as the output pivot
> column names.
> valueCols
> The columns whose value will be copied to the pivoted output columns.
> baseOutput
> The expected base output column names (without the value column prefix).
> The user is required to supply the list of expected pivot column output
> names to avoid and extra action on the input DataFrame just to extract the
> possible pivot columns. if an empty sequence is provided, then the base
> output columns will be extracted from values in the pivot columns (will
> cause an action on the entire DataFrame!)
>
> --
> Ali
> PS: shoot me an email if you run into any issues using SMV.
>
>
> On Oct 30, 2015, at 6:33 AM, Andrianasolo Fanilo <
> fanilo.andrianas...@worldline.com> wrote:
>
> Hey,
>
> The question is tricky, here is a possible answer by defining years as
> keys for a hashmap per client and merging those :
>
>
> import scalaz._
> import Scalaz._
>
> val sc = new SparkContext("local[*]", "sandbox")
>
> // Create RDD of your objects
> val rdd = sc.parallelize(Seq(
>   ("A", 2015, 4),
>   ("A", 2014, 12),
>   ("A", 2013, 1),
>   ("B", 2015, 24),
>   ("B", 2013, 4)
> ))
>
> // Search for all the years in the RDD
> val minYear = rdd.map(_._2).reduce(Math.min) // look for minimum year
> val maxYear = rdd.map(_._2).reduce(Math.max) // look for maximum year
> val sequenceOfYears = maxYear to minYear by -1 // create sequence of years from max to min
>
> // Define functions to build, for each client, a Map of year -> value for
> // year, and how those maps will be merged
> def createCombiner(obj: (Int, Int)): Map[Int, String] = Map(obj._1 -> obj._2.toString)
> def mergeValue(accum: Map[Int, String], obj: (Int, Int)) = accum + (obj._1 -> obj._2.toString)
> def mergeCombiners(accum1: Map[Int, String], accum2: Map[Int, String]) = accum1 |+| accum2 // I’m lazy so I use Scalaz to merge two maps of year -> value, I assume we don’t have two lines with same client and year…
>
> // For each client, check for each year from maxYear to minYear if it exists
> // in the computed map. If not input blank.
> val result = rdd
>   .map { case obj => (obj._1, (obj._2, obj._3)) }
>   .combineByKey(createCombiner, mergeValue, mergeCombiners)
>   .map { case (name, mapOfYearsToValues) => (Seq(name) ++ sequenceOfYears.map(year => mapOfYearsToValues.getOrElse(year, " "))).mkString(",") } // here we assume that sequence of all years isn’t too big to not fit in memory. If you had to compute for each day, it may break and you would definitely need to use a specialized timeseries library…
>
> result.foreach(println)
>
> sc.stop()
>
> Best regards,
> Fanilo
>
> *

Re: Pivot Data in Spark and Scala

2015-10-30 Thread Ruslan Dautkhanov
https://issues.apache.org/jira/browse/SPARK-8992

Should be in 1.6?
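
For reference, with that change the DataFrame pivot over the sample data in
this thread looks roughly like the sketch below (column names are made up):

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

val df = sc.parallelize(Seq(
  ("A", 2015, 4), ("A", 2014, 12), ("A", 2013, 1),
  ("B", 2015, 24), ("B", 2013, 4)
)).toDF("id", "year", "value")

// one row per id, one column per year; missing combinations come out as null
df.groupBy("id").pivot("year").sum("value").show()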



-- 
Ruslan Dautkhanov

On Thu, Oct 29, 2015 at 5:29 AM, Ascot Moss  wrote:

> Hi,
>
> I have data as follows:
>
> A, 2015, 4
> A, 2014, 12
> A, 2013, 1
> B, 2015, 24
> B, 2013 4
>
>
> I need to convert the data to a new format:
> A ,4,12,1
> B,   24,,4
>
> Any idea how to make it in Spark Scala?
>
> Thanks
>
>


Re: Pivot Data in Spark and Scala

2015-10-30 Thread Ali Tajeldin EDU
You can take a look at the smvPivot function in the SMV library ( 
https://github.com/TresAmigosSD/SMV ).  Should look for method "smvPivot" in 
SmvDFHelper (
http://tresamigossd.github.io/SMV/scaladocs/index.html#org.tresamigos.smv.SmvDFHelper).
  You can also perform the pivot on a group-by-group basis.  See smvPivot and 
smvPivotSum in SmvGroupedDataFunc 
(http://tresamigossd.github.io/SMV/scaladocs/index.html#org.tresamigos.smv.SmvGroupedDataFunc).

Docs from smvPivotSum are copied below.  Note that you don't have to specify 
the baseOutput columns, but if you don't, it will force an additional action on 
the input data frame to build the cross products of all possible values in your 
input pivot columns. 

Perform a normal SmvPivot operation followed by a sum on all the output pivot 
columns.
For example:
df.smvGroupBy("id").smvPivotSum(Seq("month", "product"))("count")("5_14_A", 
"5_14_B", "6_14_A", "6_14_B")
and the following input:
Input
| id  | month | product | count |
| --- | - | --- | - |
| 1   | 5/14  |   A |   100 |
| 1   | 6/14  |   B |   200 |
| 1   | 5/14  |   B |   300 |
will produce the following output:
| id  | count_5_14_A | count_5_14_B | count_6_14_A | count_6_14_B |
| --- |  |  |  |  |
| 1   | 100  | 300  | NULL | 200  |
pivotCols
The sequence of column names whose values will be used as the output pivot 
column names.
valueCols
The columns whose value will be copied to the pivoted output columns.
baseOutput
The expected base output column names (without the value column prefix). The 
user is required to supply the list of expected pivot column output names to 
avoid an extra action on the input DataFrame just to extract the possible 
pivot columns. If an empty sequence is provided, then the base output columns 
will be extracted from values in the pivot columns (will cause an action on the 
entire DataFrame!)

--
Ali
PS: shoot me an email if you run into any issues using SMV.


On Oct 30, 2015, at 6:33 AM, Andrianasolo Fanilo 
 wrote:

> Hey,
>  
> The question is tricky, here is a possible answer by defining years as keys 
> for a hashmap per client and merging those :
>  
> import scalaz._
> import Scalaz._
>  
> val sc = new SparkContext("local[*]", "sandbox")
> 
> // Create RDD of your objects
> val rdd = sc.parallelize(Seq(
>   ("A", 2015, 4),
>   ("A", 2014, 12),
>   ("A", 2013, 1),
>   ("B", 2015, 24),
>   ("B", 2013, 4)
> ))
> 
> // Search for all the years in the RDD
> val minYear = rdd.map(_._2).reduce(Math.min)// look for minimum year
> val maxYear = rdd.map(_._2).reduce(Math.max)// look for maximum year
> val sequenceOfYears = maxYear to minYear by -1 // create sequence of years 
> from max to min
> 
> // Define functions to build, for each client, a Map of year -> value for 
> year, and how those maps will be merged
> def createCombiner(obj: (Int, Int)): Map[Int, String] = Map(obj._1 -> 
> obj._2.toString)
> def mergeValue(accum: Map[Int, String], obj: (Int, Int)) = accum + (obj._1 -> 
> obj._2.toString)
> def mergeCombiners(accum1: Map[Int, String], accum2: Map[Int, String]) = 
> accum1 |+| accum2 // I’m lazy so I use Scalaz to merge two maps of year -> 
> value, I assume we don’t have two lines with same client and year…
> 
> // For each client, check for each year from maxYear to minYear if it exists 
> in the computed map. If not input blank.
> val result = rdd
>   .map { case obj => (obj._1, (obj._2, obj._3)) }
>   .combineByKey(createCombiner, mergeValue, mergeCombiners)
>   .map{ case (name, mapOfYearsToValues) => (Seq(name) ++ 
> sequenceOfYears.map(year => mapOfYearsToValues.getOrElse(year, " 
> "))).mkString(",")} // here we assume that sequence of all years isn’t too 
> big to not fit in memory. If you had to compute for each day, it may break 
> and you would definitely need to use a specialized timeseries library…
> 
> result.foreach(println)
> 
> sc.stop()
>  
> Best regards,
> Fanilo
>  
> From: Adrian Tanase [mailto:atan...@adobe.com]
> Sent: Friday, 30 October 2015 11:50
> To: Deng Ching-Mallete; Ascot Moss
> Cc: User
> Subject: Re: Pivot Data in Spark and Scala
>  
> Its actually a bit tougher as you’ll first need all the years. Also not sure 
> how you would reprsent your “columns” given they are dynamic based on the 
> input data.
>  
> Depending on your downstream processing, I’d probably try to emulate it with 
> a hash map with years as keys instead of the columns.
>  
> There is probably a nicer solution using the data fr

RE: Pivot Data in Spark and Scala

2015-10-30 Thread Andrianasolo Fanilo
Hey,

The question is tricky, here is a possible answer by defining years as keys for 
a hashmap per client and merging those :


import scalaz._
import Scalaz._

val sc = new SparkContext("local[*]", "sandbox")

// Create RDD of your objects
val rdd = sc.parallelize(Seq(
  ("A", 2015, 4),
  ("A", 2014, 12),
  ("A", 2013, 1),
  ("B", 2015, 24),
  ("B", 2013, 4)
))

// Search for all the years in the RDD
val minYear = rdd.map(_._2).reduce(Math.min)// look for minimum year
val maxYear = rdd.map(_._2).reduce(Math.max)// look for maximum year
val sequenceOfYears = maxYear to minYear by -1 // create sequence of years from 
max to min

// Define functions to build, for each client, a Map of year -> value for year, 
and how those maps will be merged
def createCombiner(obj: (Int, Int)): Map[Int, String] = Map(obj._1 -> 
obj._2.toString)
def mergeValue(accum: Map[Int, String], obj: (Int, Int)) = accum + (obj._1 -> 
obj._2.toString)
def mergeCombiners(accum1: Map[Int, String], accum2: Map[Int, String]) = accum1 
|+| accum2 // I’m lazy so I use Scalaz to merge two maps of year -> value, I 
assume we don’t have two lines with same client and year…

// For each client, check for each year from maxYear to minYear if it exists in 
the computed map. If not input blank.
val result = rdd
  .map { case obj => (obj._1, (obj._2, obj._3)) }
  .combineByKey(createCombiner, mergeValue, mergeCombiners)
  .map{ case (name, mapOfYearsToValues) => (Seq(name) ++ 
sequenceOfYears.map(year => mapOfYearsToValues.getOrElse(year, " 
"))).mkString(",")} // here we assume that sequence of all years isn’t too big 
to not fit in memory. If you had to compute for each day, it may break and you 
would definitely need to use a specialized timeseries library…

result.foreach(println)

sc.stop()

Best regards,
Fanilo

From: Adrian Tanase [mailto:atan...@adobe.com]
Sent: Friday, 30 October 2015 11:50
To: Deng Ching-Mallete; Ascot Moss
Cc: User
Subject: Re: Pivot Data in Spark and Scala

Its actually a bit tougher as you’ll first need all the years. Also not sure 
how you would reprsent your “columns” given they are dynamic based on the input 
data.

Depending on your downstream processing, I’d probably try to emulate it with a 
hash map with years as keys instead of the columns.

There is probably a nicer solution using the data frames API but I’m not 
familiar with it.

If you actually need vectors I think this article I saw recently on the data 
bricks blog will highlight some options (look for gather encoder)
https://databricks.com/blog/2015/10/20/audience-modeling-with-spark-ml-pipelines.html

-adrian

From: Deng Ching-Mallete
Date: Friday, October 30, 2015 at 4:35 AM
To: Ascot Moss
Cc: User
Subject: Re: Pivot Data in Spark and Scala

Hi,

You could transform it into a pair RDD then use the combineByKey function.

HTH,
Deng

On Thu, Oct 29, 2015 at 7:29 PM, Ascot Moss 
mailto:ascot.m...@gmail.com>> wrote:
Hi,

I have data as follows:

A, 2015, 4
A, 2014, 12
A, 2013, 1
B, 2015, 24
B, 2013 4


I need to convert the data to a new format:
A ,4,12,1
B,   24,,4

Any idea how to make it in Spark Scala?

Thanks






This e-mail and the documents attached are confidential and intended solely for 
the addressee; it may also be privileged. If you receive this e-mail in error, 
please notify the sender immediately and destroy it. As its integrity cannot be 
secured on the Internet, the Worldline liability cannot be triggered for the 
message content. Although the sender endeavours to maintain a computer 
virus-free network, the sender does not warrant that this transmission is 
virus-free and will not be liable for any damages resulting from any virus 
transmitted.


Re: Pivot Data in Spark and Scala

2015-10-30 Thread Adrian Tanase
It's actually a bit tougher as you'll first need all the years. Also not sure 
how you would represent your "columns" given they are dynamic based on the input 
data.

Depending on your downstream processing, I’d probably try to emulate it with a 
hash map with years as keys instead of the columns.

There is probably a nicer solution using the data frames API but I’m not 
familiar with it.

If you actually need vectors I think this article I saw recently on the 
Databricks blog will highlight some options (look for gather encoder)
https://databricks.com/blog/2015/10/20/audience-modeling-with-spark-ml-pipelines.html

-adrian

From: Deng Ching-Mallete
Date: Friday, October 30, 2015 at 4:35 AM
To: Ascot Moss
Cc: User
Subject: Re: Pivot Data in Spark and Scala

Hi,

You could transform it into a pair RDD then use the combineByKey function.

HTH,
Deng

On Thu, Oct 29, 2015 at 7:29 PM, Ascot Moss 
mailto:ascot.m...@gmail.com>> wrote:
Hi,

I have data as follows:

A, 2015, 4
A, 2014, 12
A, 2013, 1
B, 2015, 24
B, 2013 4


I need to convert the data to a new format:
A ,4,12,1
B,   24,,4

Any idea how to make it in Spark Scala?

Thanks




Re: Pivot Data in Spark and Scala

2015-10-29 Thread Deng Ching-Mallete
Hi,

You could transform it into a pair RDD then use the combineByKey function.

HTH,
Deng

On Thu, Oct 29, 2015 at 7:29 PM, Ascot Moss  wrote:

> Hi,
>
> I have data as follows:
>
> A, 2015, 4
> A, 2014, 12
> A, 2013, 1
> B, 2015, 24
> B, 2013 4
>
>
> I need to convert the data to a new format:
> A ,4,12,1
> B,   24,,4
>
> Any idea how to make it in Spark Scala?
>
> Thanks
>
>


Pivot Data in Spark and Scala

2015-10-29 Thread Ascot Moss
Hi,

I have data as follows:

A, 2015, 4
A, 2014, 12
A, 2013, 1
B, 2015, 24
B, 2013, 4


I need to convert the data to a new format:
A ,4,12,1
B,   24,,4

Any idea how to make it in Spark Scala?

Thanks


Re: spark and scala-2.11

2015-08-24 Thread Lanny Ripple
We're going to be upgrading from spark 1.0.2 and using hadoop-1.2.1 so need
to build by hand.  (Yes, I know. Use hadoop-2.x but standard resource
constraints apply.)  I want to build against scala-2.11 and publish to our
artifact repository but finding build/spark-2.10.4 and tracing down what
build/mvn was doing had me concerned that I was missing something.  I'll
hold the course and build it as instructed.

Thanks for the info, all.

PS - Since asked -- PATH=./build/apache-maven-3.2.5/bin:$PATH; build/mvn
-Phadoop-1 -Dhadoop.version=1.2.1 -Dscala-2.11 -DskipTests package

On Mon, Aug 24, 2015 at 2:49 PM, Jonathan Coveney 
wrote:

> I've used the instructions and it worked fine.
>
> Can you post exactly what you're doing, and what it fails with? Or are you
> just trying to understand how it works?
>
> 2015-08-24 15:48 GMT-04:00 Lanny Ripple :
>
>> Hello,
>>
>> The instructions for building spark against scala-2.11 indicate using
>> -Dspark-2.11.  When I look in the pom.xml I find a profile named
>> 'spark-2.11' but nothing that would indicate I should set a property.  The
>> sbt build seems to need the -Dscala-2.11 property set.  Finally build/mvn
>> does a simple grep of scala.version (which doesn't change after running dev/
>> change-version-to-2.11.sh) so the build seems to be grabbing the 2.10.4
>> scala library.
>>
>> Anyone know (from having done it and used it in production) if the build
>> instructions for spark-1.4.1 against Scala-2.11 are correct?
>>
>> Thanks.
>>   -Lanny
>>
>
>


Re: spark and scala-2.11

2015-08-24 Thread Jonathan Coveney
I've used the instructions and it worked fine.

Can you post exactly what you're doing, and what it fails with? Or are you
just trying to understand how it works?

2015-08-24 15:48 GMT-04:00 Lanny Ripple :

> Hello,
>
> The instructions for building spark against scala-2.11 indicate using
> -Dspark-2.11.  When I look in the pom.xml I find a profile named
> 'spark-2.11' but nothing that would indicate I should set a property.  The
> sbt build seems to need the -Dscala-2.11 property set.  Finally build/mvn
> does a simple grep of scala.version (which doesn't change after running dev/
> change-version-to-2.11.sh) so the build seems to be grabbing the 2.10.4
> scala library.
>
> Anyone know (from having done it and used it in production) if the build
> instructions for spark-1.4.1 against Scala-2.11 are correct?
>
> Thanks.
>   -Lanny
>


Re: spark and scala-2.11

2015-08-24 Thread Sean Owen
The property "scala-2.11" triggers the profile "scala-2.11" -- and
additionally disables the scala-2.10 profile, so that's the way to do
it. But yes, you also need to run the script before-hand to set up the
build for Scala 2.11 as well.

On Mon, Aug 24, 2015 at 8:48 PM, Lanny Ripple  wrote:
> Hello,
>
> The instructions for building spark against scala-2.11 indicate using
> -Dspark-2.11.  When I look in the pom.xml I find a profile named
> 'spark-2.11' but nothing that would indicate I should set a property.  The
> sbt build seems to need the -Dscala-2.11 property set.  Finally build/mvn
> does a simple grep of scala.version (which doesn't change after running
> dev/change-version-to-2.11.sh) so the build seems to be grabbing the 2.10.4
> scala library.
>
> Anyone know (from having done it and used it in production) if the build
> instructions for spark-1.4.1 against Scala-2.11 are correct?
>
> Thanks.
>   -Lanny




spark and scala-2.11

2015-08-24 Thread Lanny Ripple
Hello,

The instructions for building spark against scala-2.11 indicate using
-Dspark-2.11.  When I look in the pom.xml I find a profile named
'spark-2.11' but nothing that would indicate I should set a property.  The
sbt build seems to need the -Dscala-2.11 property set.  Finally build/mvn
does a simple grep of scala.version (which doesn't change after running dev/
change-version-to-2.11.sh) so the build seems to be grabbing the 2.10.4
scala library.

Anyone know (from having done it and used it in production) if the build
instructions for spark-1.4.1 against Scala-2.11 are correct?

Thanks.
  -Lanny


Re: Finding moving average using Spark and Scala

2015-07-17 Thread Anupam Bagchi
Thanks Feynman for your direction.

I was able to solve this problem by calling the Spark API from Java.

Here is a code snippet that may help other people who might face the same 
challenge.

if (args.length > 2) {
    earliestEventDate = Integer.parseInt(args[2]);
} else {
    Date now = Calendar.getInstance().getTime();
    SimpleDateFormat dateFormat = new SimpleDateFormat("MMdd");
    earliestEventDate = Integer.parseInt(dateFormat.format(
            new Date(now.getTime() - 30L * AnalyticsConstants.ONE_DAY_IN_MILLISECONDS)));
}
System.out.println("Filtering out dates earlier than: " + earliestEventDate);
JavaRDD<String> logLines = sc.textFile(inputFile);

// Convert the text log lines to DailyDeviceAggregates objects and cache them
JavaRDD<DailyDeviceAggregates> accessLogs = logLines
        .map(Functions.PARSE_DEVICE_AGGREGATE_LINE)
        .filter(new Function<DailyDeviceAggregates, Boolean>() {
    @Override
    public Boolean call(DailyDeviceAggregates value) {
        return (value.getEventdate() >= earliestEventDate);
    }
}).cache();
// accessLogs.saveAsTextFile("accessLogs.saved");

// Group the daily aggregates by device id
JavaPairRDD<Object, Iterable<DailyDeviceAggregates>> groupMap =
        accessLogs.groupBy(new Function<DailyDeviceAggregates, Object>() {
    @Override
    public Object call(DailyDeviceAggregates agg) throws Exception {
        return agg.getDevice_id();
    }
});
// groupMap.saveAsTextFile("groupedAccessLogs.saved");

// For each device, sort its daily aggregates by date and compute Bollinger-band statistics
JavaPairRDD<Object, DailyDeviceSummary> deviceCharacteristics =
        groupMap.mapValues(new Function<Iterable<DailyDeviceAggregates>, DailyDeviceSummary>() {
    @Override
    public DailyDeviceSummary call(Iterable<DailyDeviceAggregates> allDeviceDataForMonth) throws Exception {
        // First task is to sort the input values by eventdate
        ArrayList<DailyDeviceAggregates> arr = new ArrayList<DailyDeviceAggregates>();
        for (DailyDeviceAggregates agg : allDeviceDataForMonth) {
            arr.add(agg);
        }
        Collections.sort(arr);
        // Done sorting

        double bytesTransferred[] = new double[arr.size()];
        double bytesIn[] = new double[arr.size()];
        double bytesOut[] = new double[arr.size()];

        DailyDeviceAggregates lastAggregate = null;
        int index = 0;
        for (DailyDeviceAggregates aggregate : arr) {
            // System.out.println(aggregate);
            bytesIn[index] = aggregate.getBytes_in();
            bytesOut[index] = aggregate.getBytes_out();
            bytesTransferred[index] = aggregate.getBytes_transferred();
            index++;
            lastAggregate = aggregate;
        }
        BollingerBands bollingerBytesTransferrred = new BollingerBands(bytesTransferred, 30, 2.0);
        BollingerBands bollingerBytesIn = new BollingerBands(bytesIn, 30, 2.0);
        BollingerBands bollingerBytesOut = new BollingerBands(bytesOut, 30, 2.0);

        return new DailyDeviceSummary(lastAggregate.getAccount_id(), lastAggregate.getDevice_id(), index,
                bollingerBytesIn.getLastMean(), bollingerBytesOut.getLastMean(),
                bollingerBytesTransferrred.getLastMean(),
                bollingerBytesIn.getLastStandardDeviation(), bollingerBytesOut.getLastStandardDeviation(),
                bollingerBytesTransferrred.getLastStandardDeviation(),
                (long) bollingerBytesIn.getLastUpperThreshold(), (long) bollingerBytesOut.getLastUpperThreshold(),
                (long) bollingerBytesTransferrred.getLastUpperThreshold(),
                (long) bollingerBytesIn.getLastLowerThreshold(), (long) bollingerBytesOut.getLastLowerThreshold(),
                (long) bollingerBytesTransferrred.getLastLowerThreshold());
    }
});

deviceCharacteristics.values().saveAsTextFile(outputFile);

Anupam Bagchi


> On Jul 14, 2015, at 10:21 AM, Feynman Liang  wrote:
> 
> If your rows may have NAs in them, I would process each column individually 
> by first projecting the column ( map(x => x.nameOfColumn) ), filtering out 
> the NAs, then running a summarizer over each column.
> 
> Even if you have many rows, after summarizing you will only have a vector of 
> length #columns.
> 
> On Mon, Jul 13, 2015 at 7:19 PM, Anupam Bagchi  > wrote:
> Hello Feynman,
> 
> Actually in my case, the vectors I am summarizing over will not have the same 
> dimension since many devices will be inactive on some days. This is at best a 
> sparse matrix where we take only the active days and attempt to fit a moving 
> average over it.
> 
> The reason I would like to save it to HDFS is that there are really several 
> million (almost a billion) devices for which this data needs to be written. I 
> am perhaps writing a very few columns, but the number of rows is pretty large.
> 
> Given the above two cases, is using MultivariateOnlineSummarizer not a good 
> idea then?
> 
> Anupam Bagchi
> 
> 
>> On Ju

Re: Finding moving average using Spark and Scala

2015-07-14 Thread Feynman Liang
If your rows may have NAs in them, I would process each column individually
by first projecting the column ( map(x => x.nameOfColumn) ), filtering out
the NAs, then running a summarizer over each column.

Even if you have many rows, after summarizing you will only have a vector
of length #columns.
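
A sketch of that per-column approach against the RDD from earlier in this
thread (the "bytes" field is just one example column of the row objects):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer

val bytesSummary = deviceAggregateLogs
  .map(_.bytes.toDouble)          // project a single column
  .filter(!_.isNaN)               // drop the NAs
  .map(v => Vectors.dense(v))     // wrap each value as a one-element vector
  .aggregate(new MultivariateOnlineSummarizer())(
    (summ, v) => summ.add(v),
    (s1, s2) => s1.merge(s2))

println(s"mean=${bytesSummary.mean} variance=${bytesSummary.variance}")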

On Mon, Jul 13, 2015 at 7:19 PM, Anupam Bagchi  wrote:

> Hello Feynman,
>
> Actually in my case, the vectors I am summarizing over will not have the
> same dimension since many devices will be inactive on some days. This is at
> best a sparse matrix where we take only the active days and attempt to fit
> a moving average over it.
>
> The reason I would like to save it to HDFS is that there are really
> several million (almost a billion) devices for which this data needs to be
> written. I am perhaps writing a very few columns, but the number of rows is
> pretty large.
>
> Given the above two cases, is using MultivariateOnlineSummarizer not a
> good idea then?
>
> Anupam Bagchi
>
>
> On Jul 13, 2015, at 7:06 PM, Feynman Liang  wrote:
>
> Dimensions mismatch when adding new sample. Expecting 8 but got 14.
>
> Make sure all the vectors you are summarizing over have the same dimension.
>
> Why would you want to write a MultivariateOnlineSummary object (which can
> be represented with a couple Double's) into a distributed filesystem like
> HDFS?
>
> On Mon, Jul 13, 2015 at 6:54 PM, Anupam Bagchi <
> anupam_bag...@rocketmail.com> wrote:
>
>> Thank you Feynman for the lead.
>>
>> I was able to modify the code using clues from the RegressionMetrics
>> example. Here is what I got now.
>>
>> val deviceAggregateLogs = 
>> sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>>
>> // Calculate statistics based on bytes-transferred
>> val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>> println(deviceIdsMap.collect().deep.mkString("\n"))
>>
>> val summary: MultivariateStatisticalSummary = {
>>   val summary: MultivariateStatisticalSummary = deviceIdsMap.map {
>> case (deviceId, allaggregates) => Vectors.dense({
>>   val sortedAggregates = allaggregates.toArray
>>   Sorting.quickSort(sortedAggregates)
>>   sortedAggregates.map(dda => dda.bytes.toDouble)
>> })
>>   }.aggregate(new MultivariateOnlineSummarizer())(
>>   (summary, v) => summary.add(v),  // Not sure if this is really what I 
>> want, it just came from the example
>>   (sum1, sum2) => sum1.merge(sum2) // Same doubt here as well
>> )
>>   summary
>> }
>>
>> It compiles fine. But I am now getting an exception as follows at Runtime.
>>
>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>> due to stage failure: Task 1 in stage 3.0 failed 1 times, most recent
>> failure: Lost task 1.0 in stage 3.0 (TID 5, localhost):
>> java.lang.IllegalArgumentException: requirement failed: Dimensions mismatch
>> when adding new sample. Expecting 8 but got 14.
>> at scala.Predef$.require(Predef.scala:233)
>> at
>> org.apache.spark.mllib.stat.MultivariateOnlineSummarizer.add(MultivariateOnlineSummarizer.scala:70)
>> at
>> com.aeris.analytics.machinelearning.statistics.DailyDeviceStatisticsAnalyzer$$anonfun$4.apply(DailyDeviceStatisticsAnalyzer.scala:41)
>> at
>> com.aeris.analytics.machinelearning.statistics.DailyDeviceStatisticsAnalyzer$$anonfun$4.apply(DailyDeviceStatisticsAnalyzer.scala:41)
>> at
>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
>> at
>> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> at
>> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
>> at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
>> at
>> scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:201)
>> at
>> scala.collection.AbstractIterator.aggregate(Iterator.scala:1157)
>> at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:966)
>> at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:966)
>> at
>> org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1533)
>> at
>> org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1533)
>> at
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>> at org.apache.spark.scheduler.Task.run(Task.scala:64)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:722)
>>
>> Can’t tell where exactly I went wrong. Also, how do I take the
>> MultivariateOnlineSummary objec

Re: Finding moving average using Spark and Scala

2015-07-13 Thread Anupam Bagchi
Hello Feynman,

Actually in my case, the vectors I am summarizing over will not have the same 
dimension since many devices will be inactive on some days. This is at best a 
sparse matrix where we take only the active days and attempt to fit a moving 
average over it.

The reason I would like to save it to HDFS is that there are really several 
million (almost a billion) devices for which this data needs to be written. I 
am perhaps writing a very few columns, but the number of rows is pretty large.

Given the above two cases, is using MultivariateOnlineSummarizer not a good 
idea then?

Anupam Bagchi


> On Jul 13, 2015, at 7:06 PM, Feynman Liang  wrote:
> 
> Dimensions mismatch when adding new sample. Expecting 8 but got 14.
> 
> Make sure all the vectors you are summarizing over have the same dimension.
> 
> Why would you want to write a MultivariateOnlineSummary object (which can be 
> represented with a couple Double's) into a distributed filesystem like HDFS?
> 
> On Mon, Jul 13, 2015 at 6:54 PM, Anupam Bagchi  > wrote:
> Thank you Feynman for the lead.
> 
> I was able to modify the code using clues from the RegressionMetrics example. 
> Here is what I got now.
> 
> val deviceAggregateLogs = 
> sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
> 
> // Calculate statistics based on bytes-transferred
> val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
> println(deviceIdsMap.collect().deep.mkString("\n"))
> 
> val summary: MultivariateStatisticalSummary = {
>   val summary: MultivariateStatisticalSummary = deviceIdsMap.map {
> case (deviceId, allaggregates) => Vectors.dense({
>   val sortedAggregates = allaggregates.toArray
>   Sorting.quickSort(sortedAggregates)
>   sortedAggregates.map(dda => dda.bytes.toDouble)
> })
>   }.aggregate(new MultivariateOnlineSummarizer())(
>   (summary, v) => summary.add(v),  // Not sure if this is really what I 
> want, it just came from the example
>   (sum1, sum2) => sum1.merge(sum2) // Same doubt here as well
> )
>   summary
> }
> It compiles fine. But I am now getting an exception as follows at Runtime.
> 
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due 
> to stage failure: Task 1 in stage 3.0 failed 1 times, most recent failure: 
> Lost task 1.0 in stage 3.0 (TID 5, localhost): 
> java.lang.IllegalArgumentException: requirement failed: Dimensions mismatch 
> when adding new sample. Expecting 8 but got 14.
> at scala.Predef$.require(Predef.scala:233)
> at 
> org.apache.spark.mllib.stat.MultivariateOnlineSummarizer.add(MultivariateOnlineSummarizer.scala:70)
> at 
> com.aeris.analytics.machinelearning.statistics.DailyDeviceStatisticsAnalyzer$$anonfun$4.apply(DailyDeviceStatisticsAnalyzer.scala:41)
> at 
> com.aeris.analytics.machinelearning.statistics.DailyDeviceStatisticsAnalyzer$$anonfun$4.apply(DailyDeviceStatisticsAnalyzer.scala:41)
> at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
> at 
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at 
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
> at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
> at 
> scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:201)
> at scala.collection.AbstractIterator.aggregate(Iterator.scala:1157)
> at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:966)
> at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:966)
> at 
> org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1533)
> at 
> org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1533)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:722)
> 
> Can’t tell where exactly I went wrong. Also, how do I take the 
> MultivariateOnlineSummary object and write it to HDFS? I have the 
> MultivariateOnlineSummary object with me, but I really need an RDD to call 
> saveAsTextFile() on it.
> 
> Anupam Bagchi
> 
> 
>> On Jul 13, 2015, at 4:52 PM, Feynman Liang > > wrote:
>> 
>> A good example is RegressionMetrics 
>> '

Re: Finding moving average using Spark and Scala

2015-07-13 Thread Feynman Liang
Dimensions mismatch when adding new sample. Expecting 8 but got 14.

Make sure all the vectors you are summarizing over have the same dimension.

Why would you want to write a MultivariateOnlineSummary object (which can
be represented with a couple Double's) into a distributed filesystem like
HDFS?
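
A minimal sketch of both points, with made-up byte values (driver-side illustration only, not part of any job): every sample handed to a MultivariateOnlineSummarizer must have the same length, and the finished summary reduces to a few Doubles.

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer

val summarizer = new MultivariateOnlineSummarizer()
summarizer.add(Vectors.dense(246620.0))    // a 1-dimensional sample
summarizer.add(Vectors.dense(1907.0))      // fine: same dimension as before
// summarizer.add(Vectors.dense(1.0, 2.0)) // would fail: expecting 1 but got 2

val mean   = summarizer.mean(0)                 // just a Double
val stddev = math.sqrt(summarizer.variance(0))  // just a Double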

On Mon, Jul 13, 2015 at 6:54 PM, Anupam Bagchi  wrote:

> Thank you Feynman for the lead.
>
> I was able to modify the code using clues from the RegressionMetrics
> example. Here is what I got now.
>
> val deviceAggregateLogs = 
> sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>
> // Calculate statistics based on bytes-transferred
> val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
> println(deviceIdsMap.collect().deep.mkString("\n"))
>
> val summary: MultivariateStatisticalSummary = {
>   val summary: MultivariateStatisticalSummary = deviceIdsMap.map {
> case (deviceId, allaggregates) => Vectors.dense({
>   val sortedAggregates = allaggregates.toArray
>   Sorting.quickSort(sortedAggregates)
>   sortedAggregates.map(dda => dda.bytes.toDouble)
> })
>   }.aggregate(new MultivariateOnlineSummarizer())(
>   (summary, v) => summary.add(v),  // Not sure if this is really what I 
> want, it just came from the example
>   (sum1, sum2) => sum1.merge(sum2) // Same doubt here as well
> )
>   summary
> }
>
> It compiles fine. But I am now getting an exception as follows at Runtime.
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 1 in stage 3.0 failed 1 times, most recent
> failure: Lost task 1.0 in stage 3.0 (TID 5, localhost):
> java.lang.IllegalArgumentException: requirement failed: Dimensions mismatch
> when adding new sample. Expecting 8 but got 14.
> at scala.Predef$.require(Predef.scala:233)
> at
> org.apache.spark.mllib.stat.MultivariateOnlineSummarizer.add(MultivariateOnlineSummarizer.scala:70)
> at
> com.aeris.analytics.machinelearning.statistics.DailyDeviceStatisticsAnalyzer$$anonfun$4.apply(DailyDeviceStatisticsAnalyzer.scala:41)
> at
> com.aeris.analytics.machinelearning.statistics.DailyDeviceStatisticsAnalyzer$$anonfun$4.apply(DailyDeviceStatisticsAnalyzer.scala:41)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
> at
> scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at
> scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
> at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
> at
> scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:201)
> at scala.collection.AbstractIterator.aggregate(Iterator.scala:1157)
> at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:966)
> at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:966)
> at
> org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1533)
> at
> org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1533)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:722)
>
> Can’t tell where exactly I went wrong. Also, how do I take the
> MultivariateOnlineSummary object and write it to HDFS? I have the
> MultivariateOnlineSummary object with me, but I really need an RDD to call
> saveAsTextFile() on it.
>
> Anupam Bagchi
> (c) 408.431.0780 (h) 408-873-7909
>
> On Jul 13, 2015, at 4:52 PM, Feynman Liang  wrote:
>
> A good example is RegressionMetrics's
> use of OnlineMultivariateSummarizer to aggregate statistics across
> labels and residuals; take a look at how aggregateByKey is used there.
>
> On Mon, Jul 13, 2015 at 4:50 PM, Anupam Bagchi <
> anupam_bag...@rocketmail.com> wrote:
>
>> Thank you Feynman for your response. Since I am very new to Scala I may
>> need a bit more hand-holding at this stage.
>>
>> I have been able to incorporate your suggestion about sorting - and it
>> now works perfectly. Thanks again for that.
>>
>> I tried to use your suggestion of using MultiVariateOnlineSummarizer, but
>> could not proceed further. For each deviceid (the key) my goal is to get a
>> vector of doubles on which I can query the mean and standard deviation. Now
>> because RDDs are immutabl

Re: Finding moving average using Spark and Scala

2015-07-13 Thread Anupam Bagchi
Thank you Feynman for the lead.

I was able to modify the code using clues from the RegressionMetrics example. 
Here is what I got now.

val deviceAggregateLogs = 
sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()

// Calculate statistics based on bytes-transferred
val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
println(deviceIdsMap.collect().deep.mkString("\n"))

val summary: MultivariateStatisticalSummary = {
  val summary: MultivariateStatisticalSummary = deviceIdsMap.map {
case (deviceId, allaggregates) => Vectors.dense({
  val sortedAggregates = allaggregates.toArray
  Sorting.quickSort(sortedAggregates)
  sortedAggregates.map(dda => dda.bytes.toDouble)
})
  }.aggregate(new MultivariateOnlineSummarizer())(
  (summary, v) => summary.add(v),  // Not sure if this is really what I 
want, it just came from the example
  (sum1, sum2) => sum1.merge(sum2) // Same doubt here as well
)
  summary
}
It compiles fine. But I am now getting an exception as follows at Runtime.

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to 
stage failure: Task 1 in stage 3.0 failed 1 times, most recent failure: Lost 
task 1.0 in stage 3.0 (TID 5, localhost): java.lang.IllegalArgumentException: 
requirement failed: Dimensions mismatch when adding new sample. Expecting 8 but 
got 14.
at scala.Predef$.require(Predef.scala:233)
at 
org.apache.spark.mllib.stat.MultivariateOnlineSummarizer.add(MultivariateOnlineSummarizer.scala:70)
at 
com.aeris.analytics.machinelearning.statistics.DailyDeviceStatisticsAnalyzer$$anonfun$4.apply(DailyDeviceStatisticsAnalyzer.scala:41)
at 
com.aeris.analytics.machinelearning.statistics.DailyDeviceStatisticsAnalyzer$$anonfun$4.apply(DailyDeviceStatisticsAnalyzer.scala:41)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
at 
scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
at 
scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:201)
at scala.collection.AbstractIterator.aggregate(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:966)
at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:966)
at 
org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1533)
at 
org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1533)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)

Can’t tell where exactly I went wrong. Also, how do I take the 
MultivariateOnlineSummary object and write it to HDFS? I have the 
MultivariateOnlineSummary object with me, but I really need an RDD to call 
saveAsTextFile() on it.

Anupam Bagchi
(c) 408.431.0780 (h) 408-873-7909

> On Jul 13, 2015, at 4:52 PM, Feynman Liang  wrote:
> 
> A good example is RegressionMetrics's
>  use of OnlineMultivariateSummarizer to aggregate statistics across labels 
> and residuals; take a look at how aggregateByKey is used there.
> 
> On Mon, Jul 13, 2015 at 4:50 PM, Anupam Bagchi  > wrote:
> Thank you Feynman for your response. Since I am very new to Scala I may need 
> a bit more hand-holding at this stage.
> 
> I have been able to incorporate your suggestion about sorting - and it now 
> works perfectly. Thanks again for that.
> 
> I tried to use your suggestion of using MultiVariateOnlineSummarizer, but 
> could not proceed further. For each deviceid (the key) my goal is to get a 
> vector of doubles on which I can query the mean and standard deviation. Now 
> because RDDs are immutable, I cannot use a foreach loop to iterate through 
> the groupby results and individually add the values in an RDD - Spark does 
> not allow that. I need to apply the RDD functions directly on the entire set 
> to achieve the transformations I need. This is where I am faltering since I 
> am not used to the lambda expressions that Scala uses.
> 
> object DeviceAnalyzer {
>   def main(args: Array[String]) {
> val sparkConf = new SparkConf().setAppName("Device Analyzer")
>

Re: Finding moving average using Spark and Scala

2015-07-13 Thread Anupam Bagchi
Thank you Feynman for your response. Since I am very new to Scala I may need a 
bit more hand-holding at this stage.

I have been able to incorporate your suggestion about sorting - and it now 
works perfectly. Thanks again for that.

I tried to use your suggestion of using MultiVariateOnlineSummarizer, but could 
not proceed further. For each deviceid (the key) my goal is to get a vector of 
doubles on which I can query the mean and standard deviation. Now because RDDs 
are immutable, I cannot use a foreach loop to iterate through the groupby 
results and individually add the values in an RDD - Spark does not allow that. 
I need to apply the RDD functions directly on the entire set to achieve the 
transformations I need. This is where I am faltering since I am not used to the 
lambda expressions that Scala uses.

object DeviceAnalyzer {
  def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("Device Analyzer")
val sc = new SparkContext(sparkConf)

val logFile = args(0)

val deviceAggregateLogs = 
sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()

// Calculate statistics based on bytes
val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
// Question: Can we not write the line above as 
deviceAggregateLogs.groupBy(_.device_id).sortBy(c => c_.2, true) // Anything 
wrong?
// All I need to do below is collect the vector of bytes for each device 
and store it in the RDD
// The problem with the ‘foreach' approach below, is that it generates the 
vector values one at a time, which I cannot 
// add individually to an immutable RDD
deviceIdsMap.foreach(a => {
  val device_id = a._1  // This is the device ID
  val allaggregates = a._2  // This is an array of all device-aggregates 
for this device

  val sortedaggregates = allaggregates.toArray
  Sorting.quickSort(sortedaggregates)

  val byteValues = sortedaggregates.map(dda => dda.bytes.toDouble).toArray 
  val count = byteValues.count(A => true)
  val sum = byteValues.sum
  val xbar = sum / count
  val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
  val stddev = math.sqrt(sum_x_minus_x_bar_square / count)

  val vector: Vector = Vectors.dense(byteValues)
  println(vector)
  println(device_id + "," + xbar + "," + stddev)

})
  //val vector: Vector = Vectors.dense(byteValues)
  //println(vector)
  //val summary: MultivariateStatisticalSummary = 
Statistics.colStats(vector)


sc.stop()
  }
}
Can you show me how to write the ‘foreach’ loop in a Spark-friendly way? Thanks 
a lot for your help.
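
For reference, one Spark-friendly shape is to replace foreach with map so the per-device results stay inside an RDD. This is only a sketch reusing the names from the code above; the output path argument args(1) is an assumption, not something from the original program.

val results = deviceIdsMap.map { case (deviceId, allaggregates) =>
  val bytes  = allaggregates.toArray.sorted.map(_.bytes.toDouble) // sorted via the Ordered trait (eventdate)
  val mean   = bytes.sum / bytes.length
  val stddev = math.sqrt(bytes.map(x => (x - mean) * (x - mean)).sum / bytes.length)
  (deviceId, mean, stddev)
}
results.map { case (id, m, s) => id + "," + m + "," + s }.saveAsTextFile(args(1))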

Anupam Bagchi


> On Jul 13, 2015, at 12:21 PM, Feynman Liang  wrote:
> 
> The call to Sorting.quicksort is not working. Perhaps I am calling it the 
> wrong way.
> allaggregates.toArray allocates and creates a new array separate from 
> allaggregates which is sorted by Sorting.quickSort; allaggregates. Try:
> val sortedAggregates = allaggregates.toArray
> Sorting.quickSort(sortedAggregates)
> I would like to use the Spark mllib class MultivariateStatisticalSummary to 
> calculate the statistical values.
> MultivariateStatisticalSummary is a trait (similar to a Java interface); you 
> probably want to use MultivariateOnlineSummarizer. 
> For that I would need to keep all my intermediate values as RDD so that I can 
> directly use the RDD methods to do the job.
> Correct; you would do an aggregate using the add and merge functions provided 
> by MultivariateOnlineSummarizer 
> At the end I also need to write the results to HDFS for which there is a 
> method provided on the RDD class to do so, which is another reason I would 
> like to retain everything as RDD.
> You can write the RDD[(device_id, MultivariateOnlineSummarizer)] to HDFS, or 
> you could unpack the relevant statistics from MultivariateOnlineSummarizer 
> into an array/tuple using a mapValues first and then write.
> 
> On Mon, Jul 13, 2015 at 10:07 AM, Anupam Bagchi  > wrote:
> I have to do the following tasks on a dataset using Apache Spark with Scala 
> as the programming language:
> Read the dataset from HDFS. A few sample lines look like this:
> deviceid,bytes,eventdate
> 15590657,246620,20150630
> 14066921,1907,20150621
> 14066921,1906,20150626
> 6522013,2349,20150626
> 6522013,2525,20150613
> Group the data by device id. Thus we now have a map of deviceid => 
> (bytes,eventdate)
> For each device, sort the set by eventdate. We now have an ordered set of 
> bytes based on eventdate for each device.
> Pick the last 30 days of bytes from this ordered set.
> Find the moving average of bytes for the last date using a time period of 30.
> Find the standard deviation of the bytes for the final date using a time 
> period of 30.
> Return two values in the result (mean - kstddev) and (mean + kstddev) [Assume 
> k = 3]
> I am using Apache Spark 1.3.0. The actual dataset is wider, and it has to run 
>

Re: Finding moving average using Spark and Scala

2015-07-13 Thread Feynman Liang
A good example is RegressionMetrics's
use of OnlineMultivariateSummarizer to aggregate statistics across
labels and residuals; take a look at how aggregateByKey is used there.
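
A minimal sketch of that aggregateByKey pattern applied to this thread's data (reusing deviceAggregateLogs from the quoted code below; each day's bytes goes in as a 1-dimensional sample so every add() sees the same dimension):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer

val perDeviceSummary = deviceAggregateLogs
  .map(dda => (dda.device_id, dda.bytes.toDouble))
  .aggregateByKey(new MultivariateOnlineSummarizer())(
    (summ, bytes) => summ.add(Vectors.dense(bytes)),  // seqOp: add one 1-dimensional sample
    (s1, s2) => s1.merge(s2))                         // combOp: merge per-partition summarizers

Each value in perDeviceSummary is then a per-device summarizer, and mean(0) / variance(0) give the statistics over that device's bytes.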

On Mon, Jul 13, 2015 at 4:50 PM, Anupam Bagchi  wrote:

> Thank you Feynman for your response. Since I am very new to Scala I may
> need a bit more hand-holding at this stage.
>
> I have been able to incorporate your suggestion about sorting - and it now
> works perfectly. Thanks again for that.
>
> I tried to use your suggestion of using MultiVariateOnlineSummarizer, but
> could not proceed further. For each deviceid (the key) my goal is to get a
> vector of doubles on which I can query the mean and standard deviation. Now
> because RDDs are immutable, I cannot use a foreach loop to iterate through
> the groupby results and individually add the values in an RDD - Spark does
> not allow that. I need to apply the RDD functions directly on the entire
> set to achieve the transformations I need. This is where I am faltering
> since I am not used to the lambda expressions that Scala uses.
>
> object DeviceAnalyzer {
>   def main(args: Array[String]) {
> val sparkConf = new SparkConf().setAppName("Device Analyzer")
> val sc = new SparkContext(sparkConf)
>
> val logFile = args(0)
>
> val deviceAggregateLogs = 
> sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>
> // Calculate statistics based on bytes
> val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>
> // Question: Can we not write the line above as 
> deviceAggregateLogs.groupBy(_.device_id).sortBy(c => c_.2, true) // Anything 
> wrong?
>
> // All I need to do below is collect the vector of bytes for each device 
> and store it in the RDD
>
> // The problem with the ‘foreach' approach below, is that it generates 
> the vector values one at a time, which I cannot
>
> // add individually to an immutable RDD
>
> deviceIdsMap.foreach(a => {
>   val device_id = a._1  // This is the device ID
>   val allaggregates = a._2  // This is an array of all device-aggregates 
> for this device
>
>   val sortedaggregates = allaggregates.toArray  
> Sorting.quickSort(sortedaggregates)
>
>   val byteValues = sortedaggregates.map(dda => dda.bytes.toDouble).toArray
>   val count = byteValues.count(A => true)
>   val sum = byteValues.sum
>   val xbar = sum / count
>   val sum_x_minus_x_bar_square = byteValues.map(x => 
> (x-xbar)*(x-xbar)).sum
>   val stddev = math.sqrt(sum_x_minus_x_bar_square / count)
>
>   val vector: Vector = Vectors.dense(byteValues)
>   println(vector)
>   println(device_id + "," + xbar + "," + stddev)
> })
>
>   //val vector: Vector = Vectors.dense(byteValues)
>   //println(vector)
>   //val summary: MultivariateStatisticalSummary = 
> Statistics.colStats(vector)
>
>
> sc.stop() } }
>
> Can you show me how to write the ‘foreach’ loop in a Spark-friendly way?
> Thanks a lot for your help.
>
> Anupam Bagchi
>
>
> On Jul 13, 2015, at 12:21 PM, Feynman Liang  wrote:
>
> The call to Sorting.quicksort is not working. Perhaps I am calling it the
>> wrong way.
>
> allaggregates.toArray allocates and creates a new array separate from
> allaggregates which is sorted by Sorting.quickSort; allaggregates. Try:
> val sortedAggregates = allaggregates.toArray
> Sorting.quickSort(sortedAggregates)
>
>> I would like to use the Spark mllib class MultivariateStatisticalSummary
>> to calculate the statistical values.
>
> MultivariateStatisticalSummary is a trait (similar to a Java interface);
> you probably want to use MultivariateOnlineSummarizer.
>
>> For that I would need to keep all my intermediate values as RDD so that I
>> can directly use the RDD methods to do the job.
>
> Correct; you would do an aggregate using the add and merge functions
> provided by MultivariateOnlineSummarizer
>
>> At the end I also need to write the results to HDFS for which there is a
>> method provided on the RDD class to do so, which is another reason I would
>> like to retain everything as RDD.
>
> You can write the RDD[(device_id, MultivariateOnlineSummarizer)] to HDFS,
> or you could unpack the relevant statistics from
> MultivariateOnlineSummarizer into an array/tuple using a mapValues first
> and then write.
>
> On Mon, Jul 13, 2015 at 10:07 AM, Anupam Bagchi <
> anupam_bag...@rocketmail.com> wrote:
>
>> I have to do the following tasks on a dataset using Apache Spark with
>> Scala as the programming language:
>>
>>1. Read the dataset from HDFS. A few sample lines look like this:
>>
>>  
>>  deviceid,bytes,eventdate
>>  15590657,246620,20150630
>>  14066921,1907,20150621
>>  14066921,1906,20150626
>>  6522013,2349,20150626
>>  6522013,2525,20150613
>>
>>
>>1. Group the data by device id. Thus we now have a map of deviceid =>
>>(bytes,eventdate)
>>2. Fo

Re: Finding moving average using Spark and Scala

2015-07-13 Thread Feynman Liang
>
> The call to Sorting.quicksort is not working. Perhaps I am calling it the
> wrong way.

allaggregates.toArray allocates and creates a new array separate from
allaggregates which is sorted by Sorting.quickSort; allaggregates. Try:
val sortedAggregates = allaggregates.toArray
Sorting.quickSort(sortedAggregates)

> I would like to use the Spark mllib class MultivariateStatisticalSummary
> to calculate the statistical values.

MultivariateStatisticalSummary is a trait (similar to a Java interface);
you probably want to use MultivariateOnlineSummarizer.

> For that I would need to keep all my intermediate values as RDD so that I
> can directly use the RDD methods to do the job.

Correct; you would do an aggregate using the add and merge functions
provided by MultivariateOnlineSummarizer

> At the end I also need to write the results to HDFS for which there is a
> method provided on the RDD class to do so, which is another reason I would
> like to retain everything as RDD.

You can write the RDD[(device_id, MultivariateOnlineSummarizer)] to HDFS,
or you could unpack the relevant statistics from
MultivariateOnlineSummarizer into an array/tuple using a mapValues first
and then write.
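
A minimal sketch of that second option, assuming an RDD[(device_id, MultivariateOnlineSummarizer)] named perDeviceSummary (a hypothetical name) and an output path passed as a second program argument:

val flattened = perDeviceSummary.mapValues { s =>
  (s.count, s.mean(0), math.sqrt(s.variance(0)))     // unpack to a few plain numbers per device
}
flattened
  .map { case (id, (n, mean, stddev)) => Seq(id, n, mean, stddev).mkString(",") }
  .saveAsTextFile(args(1))                           // assumed output path argument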

On Mon, Jul 13, 2015 at 10:07 AM, Anupam Bagchi <
anupam_bag...@rocketmail.com> wrote:

> I have to do the following tasks on a dataset using Apache Spark with
> Scala as the programming language:
>
>1. Read the dataset from HDFS. A few sample lines look like this:
>
>  
>  deviceid,bytes,eventdate
>  15590657,246620,20150630
>  14066921,1907,20150621
>  14066921,1906,20150626
>  6522013,2349,20150626
>  6522013,2525,20150613
>
>
>1. Group the data by device id. Thus we now have a map of deviceid =>
>(bytes,eventdate)
>2. For each device, sort the set by eventdate. We now have an ordered
>set of bytes based on eventdate for each device.
>3. Pick the last 30 days of bytes from this ordered set.
>4. Find the moving average of bytes for the last date using a time
>period of 30.
>5. Find the standard deviation of the bytes for the final date using a
>time period of 30.
>6. Return two values in the result (mean - k*stddev) and (mean + k*stddev)
>[Assume k = 3]
>
> I am using Apache Spark 1.3.0. The actual dataset is wider, and it has to
> run on a billion rows finally.
> Here is the data structure for the dataset.
>
> package com.testing
> case class DeviceAggregates (
> device_id: Integer,
> bytes: Long,
> eventdate: Integer
>) extends Ordered[DailyDeviceAggregates] {
>   def compare(that: DailyDeviceAggregates): Int = {
> eventdate - that.eventdate
>   }
> }
> object DeviceAggregates {
>   def parseLogLine(logline: String): DailyDeviceAggregates = {
> val c = logline.split(",")
> DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
>   }
> }
>
> The DeviceAnalyzer class looks like this:
> I have a very crude implementation that does the job, but it is not up to
> the mark. Sorry, I am very new to Scala/Spark, so my questions are quite
> basic. Here is what I have now:
>
> import com.testing.DailyDeviceAggregates
> import org.apache.spark.{SparkContext, SparkConf}
> import org.apache.spark.mllib.linalg.Vector
> import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
> import org.apache.spark.mllib.linalg.{Vector, Vectors}
> import scala.util.Sorting
> object DeviceAnalyzer {
>   def main(args: Array[String]) {
> val sparkConf = new SparkConf().setAppName("Device Analyzer")
> val sc = new SparkContext(sparkConf)
>
> val logFile = args(0)
>
> val deviceAggregateLogs = 
> sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>
> // Calculate statistics based on bytes
> val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>
> deviceIdsMap.foreach(a => {
>   val device_id = a._1  // This is the device ID
>   val allaggregates = a._2  // This is an array of all device-aggregates 
> for this device
>
>   println(allaggregates)
>   Sorting.quickSort(allaggregates.toArray) // Sort the CompactBuffer of 
> DailyDeviceAggregates based on eventdate
>   println(allaggregates) // This does not work - results are not sorted !!
>
>   val byteValues = allaggregates.map(dda => dda.bytes.toDouble).toArray
>   val count = byteValues.count(A => true)
>   val sum = byteValues.sum
>   val xbar = sum / count
>   val sum_x_minus_x_bar_square = byteValues.map(x => 
> (x-xbar)*(x-xbar)).sum
>   val stddev = math.sqrt(sum_x_minus_x_bar_square / count)
>
>   val vector: Vector = Vectors.dense(byteValues)
>   println(vector)
>   println(device_id + "," + xbar + "," + stddev)
>
>   //val vector: Vector = Vectors.dense(byteValues)
>   //println(vector)
>   //val summary: MultivariateStatisticalSummary = 
> Statistics.colStats(vector)
> })
>
> sc.stop()
>   }}
>
> I would really appreciate if some

Finding moving average using Spark and Scala

2015-07-13 Thread Anupam Bagchi
I have to do the following tasks on a dataset using Apache Spark with Scala as 
the programming language:   
   - Read the dataset from HDFS. A few sample lines look like this:

deviceid,bytes,eventdate
15590657,246620,20150630
14066921,1907,20150621
14066921,1906,20150626
6522013,2349,20150626
6522013,2525,20150613
   
   - Group the data by device id. Thus we now have a map of deviceid => 
(bytes,eventdate)
   - For each device, sort the set by eventdate. We now have an ordered set of 
bytes based on eventdate for each device.
   - Pick the last 30 days of bytes from this ordered set.
   - Find the moving average of bytes for the last date using a time period of 
30.
   - Find the standard deviation of the bytes for the final date using a time 
period of 30.
   - Return two values in the result (mean - k*stddev) and (mean + k*stddev) 
[Assume k = 3]
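
The per-device arithmetic in the last four steps boils down to something like this, a sketch with made-up sample values, once a single device's records are in hand as (eventdate, bytes) pairs:

val k = 3
val daily: Seq[(Int, Long)] =
  Seq((20150601, 1800L), (20150602, 2100L), (20150603, 1950L))   // made-up records for one device
val last30 = daily.sortBy(_._1).takeRight(30).map(_._2.toDouble) // sort by eventdate, keep last 30 days
val mean   = last30.sum / last30.length
val stddev = math.sqrt(last30.map(x => (x - mean) * (x - mean)).sum / last30.length)
val band   = (mean - k * stddev, mean + k * stddev)              // the two returned values
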
I am using Apache Spark 1.3.0. The actual dataset is wider, and it has to run 
on a billion rows finally.
Here is the data structure for the dataset.
package com.testing
case class DeviceAggregates (
device_id: Integer,
bytes: Long,
eventdate: Integer
   ) extends Ordered[DailyDeviceAggregates] {
  def compare(that: DailyDeviceAggregates): Int = {
eventdate - that.eventdate
  }
}
object DeviceAggregates {
  def parseLogLine(logline: String): DailyDeviceAggregates = {
val c = logline.split(",")
DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
  }
}
The DeviceAnalyzer class looks like this:
I have a very crude implementation 
that does the job, but it is not up to the mark. Sorry, I am very new to 
Scala/Spark, so my questions are quite basic. Here is what I have now:
import com.testing.DailyDeviceAggregates
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
import org.apache.spark.mllib.linalg.{Vector, Vectors}

import scala.util.Sorting

object DeviceAnalyzer {
  def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("Device Analyzer")
val sc = new SparkContext(sparkConf)

val logFile = args(0)

val deviceAggregateLogs = 
sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()

// Calculate statistics based on bytes
val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)

deviceIdsMap.foreach(a => {
  val device_id = a._1  // This is the device ID
  val allaggregates = a._2  // This is an array of all device-aggregates 
for this device

  println(allaggregates)
  Sorting.quickSort(allaggregates.toArray) // Sort the CompactBuffer of 
DailyDeviceAggregates based on eventdate
  println(allaggregates) // This does not work - results are not sorted !!

  val byteValues = allaggregates.map(dda => dda.bytes.toDouble).toArray 
  val count = byteValues.count(A => true)
  val sum = byteValues.sum
  val xbar = sum / count
  val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
  val stddev = math.sqrt(sum_x_minus_x_bar_square / count)

  val vector: Vector = Vectors.dense(byteValues)
  println(vector)
  println(device_id + "," + xbar + "," + stddev)

  //val vector: Vector = Vectors.dense(byteValues)
  //println(vector)
  //val summary: MultivariateStatisticalSummary = 
Statistics.colStats(vector)
})

sc.stop()
  }
}
I would really appreciate if someone can suggest improvements for the 
following:   
   - The call to Sorting.quicksort is not working. Perhaps I am calling it the 
wrong way.
   - I would like to use the Spark mllib class MultivariateStatisticalSummary 
to calculate the statistical values.
   - For that I would need to keep all my intermediate values as RDD so that I 
can directly use the RDD methods to do the job.
   - At the end I also need to write the results to HDFS for which there is a 
method provided on the RDD class to do so, which is another reason I would like 
to retain everything as RDD.

Thanks in advance for your help.
Anupam Bagchi 

Moving average using Spark and Scala

2015-07-11 Thread Anupam Bagchi
I have to do the following tasks on a dataset using Apache Spark with Scala as 
the programming language:

Read the dataset from HDFS. A few sample lines look like this:
deviceid,bytes,eventdate
15590657,246620,20150630
14066921,1907,20150621
14066921,1906,20150626
6522013,2349,20150626
6522013,2525,20150613
Group the data by device id. Thus we now have a map of deviceid => 
(bytes,eventdate)

For each device, sort the set by eventdate. We now have an ordered set of bytes 
based on eventdate for each device.

Pick the last 30 days of bytes from this ordered set.

Find the moving average of bytes for the last date using a time period of 30.

Find the standard deviation of the bytes for the final date using a time period 
of 30.

Return two values in the result (mean - k*stddev) and (mean + k*stddev) [Assume k 
= 3]

I am using Apache Spark 1.3.0. The actual dataset is wider, and it has to run 
on a billion rows finally.

Here is the data structure for the dataset.

package com.testing
case class DeviceAggregates (
device_id: Integer,
bytes: Long,
eventdate: Integer
   ) extends Ordered[DailyDeviceAggregates] {
  def compare(that: DailyDeviceAggregates): Int = {
eventdate - that.eventdate
  }
}
object DeviceAggregates {
  def parseLogLine(logline: String): DailyDeviceAggregates = {
val c = logline.split(",")
DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
  }
}
The DeviceAnalyzer class looks like this:

I have a very crude implementation that does the job, but it is not up to the 
mark. Sorry, I am very new to Scala/Spark, so my questions are quite basic. 
Here is what I have now:

import com.testing.DailyDeviceAggregates
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
import org.apache.spark.mllib.linalg.{Vector, Vectors}

import scala.util.Sorting

object DeviceAnalyzer {
  def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("Device Analyzer")
val sc = new SparkContext(sparkConf)

val logFile = args(0)

val deviceAggregateLogs = 
sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()

// Calculate statistics based on bytes
val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)

deviceIdsMap.foreach(a => {
  val device_id = a._1  // This is the device ID
  val allaggregates = a._2  // This is an array of all device-aggregates 
for this device

  println(allaggregates)
  Sorting.quickSort(allaggregates.toArray) // Sort the CompactBuffer of 
DailyDeviceAggregates based on eventdate
  println(allaggregates) // This does not work - results are not sorted !!

  val byteValues = allaggregates.map(dda => dda.bytes.toDouble).toArray 
  val count = byteValues.count(A => true)
  val sum = byteValues.sum
  val xbar = sum / count
  val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
  val stddev = math.sqrt(sum_x_minus_x_bar_square / count)

  val vector: Vector = Vectors.dense(byteValues)
  println(vector)
  println(device_id + "," + xbar + "," + stddev)

  //val vector: Vector = Vectors.dense(byteValues)
  //println(vector)
  //val summary: MultivariateStatisticalSummary = 
Statistics.colStats(vector)
})

sc.stop()
  }
}
I would really appreciate if someone can suggest improvements for the 
following:

The call to Sorting.quicksort is not working. Perhaps I am calling it the wrong 
way.
I would like to use the Spark mllib class MultivariateStatisticalSummary to 
calculate the statistical values.
For that I would need to keep all my intermediate values as RDD so that I can 
directly use the RDD methods to do the job.
At the end I also need to write the results to HDFS for which there is a method 
provided on the RDD class to do so, which is another reason I would like to 
retain everything as RDD.

Thanks in advance for your help.

Anupam Bagchi



Calculating moving average of dataset in Apache Spark and Scala

2015-07-11 Thread Anupam Bagchi
I have to do the following tasks on a dataset using Apache Spark with Scala as 
the programming language:

Read the dataset from HDFS. A few sample lines look like this:
deviceid,bytes,eventdate
15590657,246620,20150630
14066921,1907,20150621
14066921,1906,20150626
6522013,2349,20150626
6522013,2525,20150613
Group the data by device id. Thus we now have a map of deviceid => 
(bytes,eventdate)

For each device, sort the set by eventdate. We now have an ordered set of bytes 
based on eventdate for each device.

Pick the last 30 days of bytes from this ordered set.

Find the moving average of bytes for the last date using a time period of 30.

Find the standard deviation of the bytes for the final date using a time period 
of 30.

Return two values in the result (mean - k * stddev) and (mean + k * stddev) 
[Assume k = 3]

I am using Apache Spark 1.3.0. The actual dataset is wider, and it has to run 
on a billion rows finally.


Here is the data structure for the dataset.

package com.testing
case class DeviceAggregates (
device_id: Integer,
bytes: Long,
eventdate: Integer
   ) extends Ordered[DailyDeviceAggregates] {
  def compare(that: DailyDeviceAggregates): Int = {
eventdate - that.eventdate
  }
}
object DeviceAggregates {
  def parseLogLine(logline: String): DailyDeviceAggregates = {
val c = logline.split(",")
DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
  }
}

I have a very crude implementation that does the job, but it is not up to the 
mark. Sorry, I am very new to Scala/Spark, so my questions are quite basic. 
Here is what I have now:

import com.testing.DailyDeviceAggregates
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
import org.apache.spark.mllib.linalg.{Vector, Vectors}

import scala.util.Sorting

object DeviceAnalyzer {
  def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("Device Analyzer")
val sc = new SparkContext(sparkConf)

val logFile = args(0)

val deviceAggregateLogs = 
sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()

// Calculate statistics based on bytes
val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)

deviceIdsMap.foreach(a => {
  val device_id = a._1  // This is the device ID
  val allaggregates = a._2  // This is an array of all device-aggregates 
for this device

  println(allaggregates)
  Sorting.quickSort(allaggregates.toArray) // Sort the CompactBuffer of 
DailyDeviceAggregates based on eventdate
  println(allaggregates) // This does not work - I can see that values are 
not sorted

  val byteValues = allaggregates.map(dda => dda.bytes.toDouble).toArray
  val count = byteValues.count(A => true)
  val sum = byteValues.sum
  val xbar = sum / count
  val sum_x_minus_x_bar_square = byteValues.map(x => (x-xbar)*(x-xbar)).sum
  val stddev = math.sqrt(sum_x_minus_x_bar_square / count)

  val vector: Vector = Vectors.dense(byteValues)
  println(vector)
  println(device_id + "," + xbar + "," + stddev)

  //val vector: Vector = Vectors.dense(byteValues)
  //println(vector)
  //val summary: MultivariateStatisticalSummary = 
Statistics.colStats(vector)  // The way I should be calling it
})

sc.stop()
  }
}

I would really appreciate if someone can suggest improvements for the 
following:

The call to Sorting.quicksort is not working. Perhaps I am calling it the wrong 
way.
I would like to use the Spark mllib class MultivariateStatisticalSummary to 
calculate the statistical values.
For that I would need to keep all my intermediate values as RDD so that I can 
directly use the RDD methods to do the job.
At the end I also need to write the results to HDFS for which there is a method 
provided on the RDD class to do so, which is another reason I would like to 
retain everything as RDD.


Anupam Bagchi




Re: Spark and Scala

2014-09-13 Thread Mark Hamstra
Sorry, posting too late at night.  That should be "...transformations, that
produce further RDDs; and actions, that return values to the driver
program."

On Sat, Sep 13, 2014 at 12:45 AM, Mark Hamstra 
wrote:

> Again, RDD operations are of two basic varieties: transformations, that
> produce further RDDs; and operations, that return values to the driver
> program.  You've used several RDD transformations and then finally the
> top(1) action, which returns an array of one element to your driver
> program.  That is exactly what you should expect from the description of
> RDD#top in the API.
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD
>
> On Sat, Sep 13, 2014 at 12:34 AM, Deep Pradhan 
> wrote:
>
>> Take for example this:
>>
>>
>> *val lines = sc.textFile(args(0))*
>> *val nodes = lines.map(s =>{  *
>> *val fields = s.split("\\s+")*
>> *(fields(0),fields(1))*
>> *}).distinct().groupByKey().cache() *
>>
>> *val nodeSizeTuple = nodes.map(node => (node._1.toInt, node._2.size))*
>> *val rootNode = nodeSizeTuple.top(1)(Ordering.by(f => f._2))*
>>
>> The nodeSizeTuple is an RDD,but rootNode is an array. Here I have used
>> all RDD operations, but I am getting an array.
>> What about this case?
>>
>> On Sat, Sep 13, 2014 at 11:45 AM, Deep Pradhan > > wrote:
>>
>>> Is it always true that whenever we apply operations on an RDD, we get
>>> another RDD?
>>> Or does it depend on the return type of the operation?
>>>
>>> On Sat, Sep 13, 2014 at 9:45 AM, Soumya Simanta <
>>> soumya.sima...@gmail.com> wrote:
>>>

 An RDD is a fault-tolerant distributed structure. It is the primary
 abstraction in Spark.

 I would strongly suggest that you have a look at the following to get a
 basic idea.

 http://www.cs.berkeley.edu/~pwendell/strataconf/api/core/spark/RDD.html
 http://spark.apache.org/docs/latest/quick-start.html#basics

 https://www.usenix.org/conference/nsdi12/technical-sessions/presentation/zaharia

 On Sat, Sep 13, 2014 at 12:06 AM, Deep Pradhan <
 pradhandeep1...@gmail.com> wrote:

> Take for example this:
> I have declared one queue *val queue = Queue.empty[Int]*, which is a
> pure scala line in the program. I actually want the queue to be an RDD but
> there are no direct methods to create RDD which is a queue right? What say
> do you have on this?
> Does there exist something like: *Create and RDD which is a queue *?
>
> On Sat, Sep 13, 2014 at 8:43 AM, Hari Shreedharan <
> hshreedha...@cloudera.com> wrote:
>
>> No, Scala primitives remain primitives. Unless you create an RDD
>> using one of the many methods - you would not be able to access any of 
>> the
>> RDD methods. There is no automatic porting. Spark is an application as 
>> far
>> as scala is concerned - there is no compilation (except of course, the
>> scala, JIT compilation etc).
>>
>> On Fri, Sep 12, 2014 at 8:04 PM, Deep Pradhan <
>> pradhandeep1...@gmail.com> wrote:
>>
>>> I know that unpersist is a method on RDD.
>>> But my confusion is that, when we port our Scala programs to Spark,
>>> doesn't everything change to RDDs?
>>>
>>> On Fri, Sep 12, 2014 at 10:16 PM, Nicholas Chammas <
>>> nicholas.cham...@gmail.com> wrote:
>>>
 unpersist is a method on RDDs. RDDs are abstractions introduced by
 Spark.

 An Int is just a Scala Int. You can't call unpersist on Int in
 Scala, and that doesn't change in Spark.

 On Fri, Sep 12, 2014 at 12:33 PM, Deep Pradhan <
 pradhandeep1...@gmail.com> wrote:

> There is one thing that I am confused about.
> Spark has codes that have been implemented in Scala. Now, can we
> run any Scala code on the Spark framework? What will be the 
> difference in
> the execution of the scala code in normal systems and on Spark?
> The reason for my question is the following:
> I had a variable
> *val temp = *
> This temp was being created inside the loop, so as to manually
> throw it out of the cache, every time the loop ends I was calling
> *temp.unpersist()*, this was returning an error saying that *value
> unpersist is not a method of Int*, which means that temp is an
> Int.
> Can some one explain to me why I was not able to call *unpersist*
> on *temp*?
>
> Thank You
>


>>>
>>
>

>>>
>>
>


Re: Spark and Scala

2014-09-13 Thread Mark Hamstra
Again, RDD operations are of two basic varieties: transformations, that
produce further RDDs; and operations, that return values to the driver
program.  You've used several RDD transformations and then finally the
top(1) action, which returns an array of one element to your driver
program.  That is exactly what you should expect from the description of
RDD#top in the API.
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD

On Sat, Sep 13, 2014 at 12:34 AM, Deep Pradhan 
wrote:

> Take for example this:
>
>
> *val lines = sc.textFile(args(0))*
> *val nodes = lines.map(s =>{  *
> *val fields = s.split("\\s+")*
> *(fields(0),fields(1))*
> *}).distinct().groupByKey().cache() *
>
> *val nodeSizeTuple = nodes.map(node => (node._1.toInt, node._2.size))*
> *val rootNode = nodeSizeTuple.top(1)(Ordering.by(f => f._2))*
>
> The nodeSizeTuple is an RDD,but rootNode is an array. Here I have used all
> RDD operations, but I am getting an array.
> What about this case?
>
> On Sat, Sep 13, 2014 at 11:45 AM, Deep Pradhan 
> wrote:
>
>> Is it always true that whenever we apply operations on an RDD, we get
>> another RDD?
>> Or does it depend on the return type of the operation?
>>
>> On Sat, Sep 13, 2014 at 9:45 AM, Soumya Simanta > > wrote:
>>
>>>
>>> An RDD is a fault-tolerant distributed structure. It is the primary
>>> abstraction in Spark.
>>>
>>> I would strongly suggest that you have a look at the following to get a
>>> basic idea.
>>>
>>> http://www.cs.berkeley.edu/~pwendell/strataconf/api/core/spark/RDD.html
>>> http://spark.apache.org/docs/latest/quick-start.html#basics
>>>
>>> https://www.usenix.org/conference/nsdi12/technical-sessions/presentation/zaharia
>>>
>>> On Sat, Sep 13, 2014 at 12:06 AM, Deep Pradhan <
>>> pradhandeep1...@gmail.com> wrote:
>>>
 Take for example this:
 I have declared one queue *val queue = Queue.empty[Int]*, which is a
 pure scala line in the program. I actually want the queue to be an RDD but
 there are no direct methods to create RDD which is a queue right? What say
 do you have on this?
 Does there exist something like: *Create and RDD which is a queue *?

 On Sat, Sep 13, 2014 at 8:43 AM, Hari Shreedharan <
 hshreedha...@cloudera.com> wrote:

> No, Scala primitives remain primitives. Unless you create an RDD using
> one of the many methods - you would not be able to access any of the RDD
> methods. There is no automatic porting. Spark is an application as far as
> scala is concerned - there is no compilation (except of course, the scala,
> JIT compilation etc).
>
> On Fri, Sep 12, 2014 at 8:04 PM, Deep Pradhan <
> pradhandeep1...@gmail.com> wrote:
>
>> I know that unpersist is a method on RDD.
>> But my confusion is that, when we port our Scala programs to Spark,
>> doesn't everything change to RDDs?
>>
>> On Fri, Sep 12, 2014 at 10:16 PM, Nicholas Chammas <
>> nicholas.cham...@gmail.com> wrote:
>>
>>> unpersist is a method on RDDs. RDDs are abstractions introduced by
>>> Spark.
>>>
>>> An Int is just a Scala Int. You can't call unpersist on Int in
>>> Scala, and that doesn't change in Spark.
>>>
>>> On Fri, Sep 12, 2014 at 12:33 PM, Deep Pradhan <
>>> pradhandeep1...@gmail.com> wrote:
>>>
 There is one thing that I am confused about.
 Spark has codes that have been implemented in Scala. Now, can we
 run any Scala code on the Spark framework? What will be the difference 
 in
 the execution of the scala code in normal systems and on Spark?
 The reason for my question is the following:
 I had a variable
 *val temp = *
 This temp was being created inside the loop, so as to manually
 throw it out of the cache, every time the loop ends I was calling
 *temp.unpersist()*, this was returning an error saying that *value
 unpersist is not a method of Int*, which means that temp is an
 Int.
 Can some one explain to me why I was not able to call *unpersist*
 on *temp*?

 Thank You

>>>
>>>
>>
>

>>>
>>
>


Re: Spark and Scala

2014-09-13 Thread Deep Pradhan
Take for example this:


*val lines = sc.textFile(args(0))*
*val nodes = lines.map(s =>{  *
*val fields = s.split("\\s+")*
*(fields(0),fields(1))*
*}).distinct().groupByKey().cache() *

*val nodeSizeTuple = nodes.map(node => (node._1.toInt, node._2.size))*
*val rootNode = nodeSizeTuple.top(1)(Ordering.by(f => f._2))*

The nodeSizeTuple is an RDD, but rootNode is an array. Here I have used all
RDD operations, but I am getting an array.
What about this case?

On Sat, Sep 13, 2014 at 11:45 AM, Deep Pradhan 
wrote:

> Is it always true that whenever we apply operations on an RDD, we get
> another RDD?
> Or does it depend on the return type of the operation?
>
> On Sat, Sep 13, 2014 at 9:45 AM, Soumya Simanta 
> wrote:
>
>>
>> An RDD is a fault-tolerant distributed structure. It is the primary
>> abstraction in Spark.
>>
>> I would strongly suggest that you have a look at the following to get a
>> basic idea.
>>
>> http://www.cs.berkeley.edu/~pwendell/strataconf/api/core/spark/RDD.html
>> http://spark.apache.org/docs/latest/quick-start.html#basics
>>
>> https://www.usenix.org/conference/nsdi12/technical-sessions/presentation/zaharia
>>
>> On Sat, Sep 13, 2014 at 12:06 AM, Deep Pradhan > > wrote:
>>
>>> Take for example this:
>>> I have declared one queue *val queue = Queue.empty[Int]*, which is a
>>> pure scala line in the program. I actually want the queue to be an RDD but
>>> there are no direct methods to create RDD which is a queue right? What say
>>> do you have on this?
>>> Does there exist something like: *Create and RDD which is a queue *?
>>>
>>> On Sat, Sep 13, 2014 at 8:43 AM, Hari Shreedharan <
>>> hshreedha...@cloudera.com> wrote:
>>>
 No, Scala primitives remain primitives. Unless you create an RDD using
 one of the many methods - you would not be able to access any of the RDD
 methods. There is no automatic porting. Spark is an application as far as
 scala is concerned - there is no compilation (except of course, the scala,
 JIT compilation etc).

 On Fri, Sep 12, 2014 at 8:04 PM, Deep Pradhan <
 pradhandeep1...@gmail.com> wrote:

> I know that unpersist is a method on RDD.
> But my confusion is that, when we port our Scala programs to Spark,
> doesn't everything change to RDDs?
>
> On Fri, Sep 12, 2014 at 10:16 PM, Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
>> unpersist is a method on RDDs. RDDs are abstractions introduced by
>> Spark.
>>
>> An Int is just a Scala Int. You can't call unpersist on Int in Scala,
>> and that doesn't change in Spark.
>>
>> On Fri, Sep 12, 2014 at 12:33 PM, Deep Pradhan <
>> pradhandeep1...@gmail.com> wrote:
>>
>>> There is one thing that I am confused about.
>>> Spark has codes that have been implemented in Scala. Now, can we run
>>> any Scala code on the Spark framework? What will be the difference in 
>>> the
>>> execution of the scala code in normal systems and on Spark?
>>> The reason for my question is the following:
>>> I had a variable
>>> *val temp = *
>>> This temp was being created inside the loop, so as to manually throw
>>> it out of the cache, every time the loop ends I was calling
>>> *temp.unpersist()*, this was returning an error saying that *value
>>> unpersist is not a method of Int*, which means that temp is an Int.
>>> Can some one explain to me why I was not able to call *unpersist*
>>> on *temp*?
>>>
>>> Thank You
>>>
>>
>>
>

>>>
>>
>


Re: Spark and Scala

2014-09-13 Thread Mark Hamstra
This is all covered in
http://spark.apache.org/docs/latest/programming-guide.html#rdd-operations

By definition, RDD transformations take an RDD to another RDD; actions
produce some other type as a value on the driver program.
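
A minimal illustration of the two kinds of operations, with made-up data and assuming a SparkContext named sc:

val pairs  = sc.parallelize(Seq(("a", 3), ("b", 1), ("c", 2)))
val scaled = pairs.map { case (k, v) => (k, v * 10) }                // transformation: returns another RDD
val top1   = scaled.top(1)(Ordering.by((p: (String, Int)) => p._2))  // action: returns an Array on the driver
val n      = scaled.count()                                          // action: returns a Long on the driver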

On Fri, Sep 12, 2014 at 11:15 PM, Deep Pradhan 
wrote:

> Is it always true that whenever we apply operations on an RDD, we get
> another RDD?
> Or does it depend on the return type of the operation?
>
> On Sat, Sep 13, 2014 at 9:45 AM, Soumya Simanta 
> wrote:
>
>>
>> An RDD is a fault-tolerant distributed structure. It is the primary
>> abstraction in Spark.
>>
>> I would strongly suggest that you have a look at the following to get a
>> basic idea.
>>
>> http://www.cs.berkeley.edu/~pwendell/strataconf/api/core/spark/RDD.html
>> http://spark.apache.org/docs/latest/quick-start.html#basics
>>
>> https://www.usenix.org/conference/nsdi12/technical-sessions/presentation/zaharia
>>
>> On Sat, Sep 13, 2014 at 12:06 AM, Deep Pradhan > > wrote:
>>
>>> Take for example this:
>>> I have declared one queue *val queue = Queue.empty[Int]*, which is a
>>> pure scala line in the program. I actually want the queue to be an RDD but
>>> there are no direct methods to create RDD which is a queue right? What say
>>> do you have on this?
>>> Does there exist something like: *Create and RDD which is a queue *?
>>>
>>> On Sat, Sep 13, 2014 at 8:43 AM, Hari Shreedharan <
>>> hshreedha...@cloudera.com> wrote:
>>>
 No, Scala primitives remain primitives. Unless you create an RDD using
 one of the many methods - you would not be able to access any of the RDD
 methods. There is no automatic porting. Spark is an application as far as
 scala is concerned - there is no compilation (except of course, the scala,
 JIT compilation etc).

 On Fri, Sep 12, 2014 at 8:04 PM, Deep Pradhan <
 pradhandeep1...@gmail.com> wrote:

> I know that unpersist is a method on RDD.
> But my confusion is that, when we port our Scala programs to Spark,
> doesn't everything change to RDDs?
>
> On Fri, Sep 12, 2014 at 10:16 PM, Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
>> unpersist is a method on RDDs. RDDs are abstractions introduced by
>> Spark.
>>
>> An Int is just a Scala Int. You can't call unpersist on Int in Scala,
>> and that doesn't change in Spark.
>>
>> On Fri, Sep 12, 2014 at 12:33 PM, Deep Pradhan <
>> pradhandeep1...@gmail.com> wrote:
>>
>>> There is one thing that I am confused about.
>>> Spark has codes that have been implemented in Scala. Now, can we run
>>> any Scala code on the Spark framework? What will be the difference in 
>>> the
>>> execution of the scala code in normal systems and on Spark?
>>> The reason for my question is the following:
>>> I had a variable
>>> *val temp = *
>>> This temp was being created inside the loop, so as to manually throw
>>> it out of the cache, every time the loop ends I was calling
>>> *temp.unpersist()*, this was returning an error saying that *value
>>> unpersist is not a method of Int*, which means that temp is an Int.
>>> Can some one explain to me why I was not able to call *unpersist*
>>> on *temp*?
>>>
>>> Thank You
>>>
>>
>>
>

>>>
>>
>


Re: Spark and Scala

2014-09-12 Thread Deep Pradhan
Is it always true that whenever we apply operations on an RDD, we get
another RDD?
Or does it depend on the return type of the operation?

On Sat, Sep 13, 2014 at 9:45 AM, Soumya Simanta 
wrote:

>
> An RDD is a fault-tolerant distributed structure. It is the primary
> abstraction in Spark.
>
> I would strongly suggest that you have a look at the following to get a
> basic idea.
>
> http://www.cs.berkeley.edu/~pwendell/strataconf/api/core/spark/RDD.html
> http://spark.apache.org/docs/latest/quick-start.html#basics
>
> https://www.usenix.org/conference/nsdi12/technical-sessions/presentation/zaharia
>
> On Sat, Sep 13, 2014 at 12:06 AM, Deep Pradhan 
> wrote:
>
>> Take for example this:
>> I have declared one queue *val queue = Queue.empty[Int]*, which is a
>> pure scala line in the program. I actually want the queue to be an RDD but
>> there are no direct methods to create RDD which is a queue right? What say
>> do you have on this?
>> Does there exist something like: *Create and RDD which is a queue *?
>>
>> On Sat, Sep 13, 2014 at 8:43 AM, Hari Shreedharan <
>> hshreedha...@cloudera.com> wrote:
>>
>>> No, Scala primitives remain primitives. Unless you create an RDD using
>>> one of the many methods - you would not be able to access any of the RDD
>>> methods. There is no automatic porting. Spark is an application as far as
>>> scala is concerned - there is no compilation (except of course, the scala,
>>> JIT compilation etc).
>>>
>>> On Fri, Sep 12, 2014 at 8:04 PM, Deep Pradhan >> > wrote:
>>>
 I know that unpersist is a method on RDD.
 But my confusion is that, when we port our Scala programs to Spark,
 doesn't everything change to RDDs?

 On Fri, Sep 12, 2014 at 10:16 PM, Nicholas Chammas <
 nicholas.cham...@gmail.com> wrote:

> unpersist is a method on RDDs. RDDs are abstractions introduced by
> Spark.
>
> An Int is just a Scala Int. You can't call unpersist on Int in Scala,
> and that doesn't change in Spark.
>
> On Fri, Sep 12, 2014 at 12:33 PM, Deep Pradhan <
> pradhandeep1...@gmail.com> wrote:
>
>> There is one thing that I am confused about.
>> Spark has codes that have been implemented in Scala. Now, can we run
>> any Scala code on the Spark framework? What will be the difference in the
>> execution of the scala code in normal systems and on Spark?
>> The reason for my question is the following:
>> I had a variable
>> *val temp = *
>> This temp was being created inside the loop, so as to manually throw
>> it out of the cache, every time the loop ends I was calling
>> *temp.unpersist()*, this was returning an error saying that *value
>> unpersist is not a method of Int*, which means that temp is an Int.
>> Can some one explain to me why I was not able to call *unpersist* on
>> *temp*?
>>
>> Thank You
>>
>
>

>>>
>>
>


Re: Spark and Scala

2014-09-12 Thread Soumya Simanta
An RDD is a fault-tolerant distributed structure. It is the primary
abstraction in Spark.

I would strongly suggest that you have a look at the following to get a
basic idea.

http://www.cs.berkeley.edu/~pwendell/strataconf/api/core/spark/RDD.html
http://spark.apache.org/docs/latest/quick-start.html#basics
https://www.usenix.org/conference/nsdi12/technical-sessions/presentation/zaharia
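
A minimal sketch of that idea, assuming a SparkContext named sc: a local Scala collection (the Queue from the quoted question) only becomes an RDD once it is handed to Spark explicitly.

import scala.collection.immutable.Queue

val queue   = Queue(1, 2, 3, 4)       // a plain Scala value: no RDD methods here
val rdd     = sc.parallelize(queue)   // Queue is a Seq, so parallelize accepts it directly
val doubled = rdd.map(_ * 2)          // RDD methods (map, cache, unpersist, ...) now apply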

On Sat, Sep 13, 2014 at 12:06 AM, Deep Pradhan 
wrote:

> Take for example this:
> I have declared one queue *val queue = Queue.empty[Int]*, which is a pure
> scala line in the program. I actually want the queue to be an RDD but there
> are no direct methods to create RDD which is a queue right? What say do you
> have on this?
> Does there exist something like: *Create and RDD which is a queue *?
>
> On Sat, Sep 13, 2014 at 8:43 AM, Hari Shreedharan <
> hshreedha...@cloudera.com> wrote:
>
>> No, Scala primitives remain primitives. Unless you create an RDD using
>> one of the many methods - you would not be able to access any of the RDD
>> methods. There is no automatic porting. Spark is an application as far as
>> scala is concerned - there is no compilation (except of course, the scala,
>> JIT compilation etc).
>>
>> On Fri, Sep 12, 2014 at 8:04 PM, Deep Pradhan 
>> wrote:
>>
>>> I know that unpersist is a method on RDD.
>>> But my confusion is that, when we port our Scala programs to Spark,
>>> doesn't everything change to RDDs?
>>>
>>> On Fri, Sep 12, 2014 at 10:16 PM, Nicholas Chammas <
>>> nicholas.cham...@gmail.com> wrote:
>>>
 unpersist is a method on RDDs. RDDs are abstractions introduced by
 Spark.

 An Int is just a Scala Int. You can't call unpersist on Int in Scala,
 and that doesn't change in Spark.

 On Fri, Sep 12, 2014 at 12:33 PM, Deep Pradhan <
 pradhandeep1...@gmail.com> wrote:

> There is one thing that I am confused about.
> Spark has codes that have been implemented in Scala. Now, can we run
> any Scala code on the Spark framework? What will be the difference in the
> execution of the scala code in normal systems and on Spark?
> The reason for my question is the following:
> I had a variable
> *val temp = *
> This temp was being created inside the loop, so as to manually throw
> it out of the cache, every time the loop ends I was calling
> *temp.unpersist()*, this was returning an error saying that *value
> unpersist is not a method of Int*, which means that temp is an Int.
> Can some one explain to me why I was not able to call *unpersist* on
> *temp*?
>
> Thank You
>


>>>
>>
>


Re: Spark and Scala

2014-09-12 Thread Deep Pradhan
Take this for example:
I have declared a queue, *val queue = Queue.empty[Int]*, which is a pure
Scala line in the program. I actually want the queue to be an RDD, but there
is no direct method to create an RDD that is a queue, right? What do you say
about this?
Does something like *create an RDD which is a queue* exist?
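
The closest workaround I can think of is to snapshot the queue's contents
into an RDD (a sketch; sc here is the SparkContext), but that gives an
immutable RDD rather than a real queue:

import scala.collection.mutable.Queue

val queue = Queue.empty[Int]    // pure Scala, lives on the driver
queue.enqueue(1)
queue.enqueue(2)
queue.enqueue(3)

// Snapshot of the queue's current contents as an immutable, distributed RDD:
val queueRDD = sc.parallelize(queue.toSeq)    // RDD[Int]
println(queueRDD.count())                     // prints 3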

On Sat, Sep 13, 2014 at 8:43 AM, Hari Shreedharan  wrote:

> No, Scala primitives remain primitives. Unless you create an RDD using one
> of the many methods - you would not be able to access any of the RDD
> methods. There is no automatic porting. Spark is an application as far as
> scala is concerned - there is no compilation (except of course, the scala,
> JIT compilation etc).
>
> On Fri, Sep 12, 2014 at 8:04 PM, Deep Pradhan 
> wrote:
>
>> I know that unpersist is a method on RDD.
>> But my confusion is that, when we port our Scala programs to Spark,
>> doesn't everything change to RDDs?
>>
>> On Fri, Sep 12, 2014 at 10:16 PM, Nicholas Chammas <
>> nicholas.cham...@gmail.com> wrote:
>>
>>> unpersist is a method on RDDs. RDDs are abstractions introduced by Spark.
>>>
>>> An Int is just a Scala Int. You can't call unpersist on Int in Scala,
>>> and that doesn't change in Spark.
>>>
>>> On Fri, Sep 12, 2014 at 12:33 PM, Deep Pradhan <
>>> pradhandeep1...@gmail.com> wrote:
>>>
 There is one thing that I am confused about.
 Spark has codes that have been implemented in Scala. Now, can we run
 any Scala code on the Spark framework? What will be the difference in the
 execution of the scala code in normal systems and on Spark?
 The reason for my question is the following:
 I had a variable
 *val temp = *
 This temp was being created inside the loop, so as to manually throw it
 out of the cache, every time the loop ends I was calling
 *temp.unpersist()*, this was returning an error saying that *value
 unpersist is not a method of Int*, which means that temp is an Int.
 Can some one explain to me why I was not able to call *unpersist* on
 *temp*?

 Thank You

>>>
>>>
>>
>


Re: Spark and Scala

2014-09-12 Thread Hari Shreedharan
No, Scala primitives remain primitives. Unless you create an RDD using one
of the many methods provided for that, you will not be able to access any of
the RDD methods. There is no automatic porting. Spark is just an application
as far as Scala is concerned - there is no special compilation step (except,
of course, the usual Scala and JIT compilation).
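
A rough sketch of that difference (assuming the spark-shell, where sc is
predefined):

// Ordinary Scala - evaluated entirely on the driver; Spark is not involved:
val localSquares = (1 to 10).map(x => x * x)

// Handing the same data to Spark has to be done explicitly; only then do
// RDD methods such as persist/unpersist become available:
val distributedSquares = sc.parallelize(1 to 10).map(x => x * x)   // RDD[Int]
distributedSquares.persist()
println(distributedSquares.sum())    // prints 385.0
distributedSquares.unpersist()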

On Fri, Sep 12, 2014 at 8:04 PM, Deep Pradhan 
wrote:

> I know that unpersist is a method on RDD.
> But my confusion is that, when we port our Scala programs to Spark,
> doesn't everything change to RDDs?
>
> On Fri, Sep 12, 2014 at 10:16 PM, Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
>> unpersist is a method on RDDs. RDDs are abstractions introduced by Spark.
>>
>> An Int is just a Scala Int. You can't call unpersist on Int in Scala, and
>> that doesn't change in Spark.
>>
>> On Fri, Sep 12, 2014 at 12:33 PM, Deep Pradhan  wrote:
>>
>>> There is one thing that I am confused about.
>>> Spark has codes that have been implemented in Scala. Now, can we run any
>>> Scala code on the Spark framework? What will be the difference in the
>>> execution of the scala code in normal systems and on Spark?
>>> The reason for my question is the following:
>>> I had a variable
>>> *val temp = *
>>> This temp was being created inside the loop, so as to manually throw it
>>> out of the cache, every time the loop ends I was calling
>>> *temp.unpersist()*, this was returning an error saying that *value
>>> unpersist is not a method of Int*, which means that temp is an Int.
>>> Can some one explain to me why I was not able to call *unpersist* on
>>> *temp*?
>>>
>>> Thank You
>>>
>>
>>
>


Re: Spark and Scala

2014-09-12 Thread Deep Pradhan
I know that unpersist is a method on RDD.
But my confusion is this: when we port our Scala programs to Spark, doesn't
everything change to RDDs?

On Fri, Sep 12, 2014 at 10:16 PM, Nicholas Chammas <
nicholas.cham...@gmail.com> wrote:

> unpersist is a method on RDDs. RDDs are abstractions introduced by Spark.
>
> An Int is just a Scala Int. You can't call unpersist on Int in Scala, and
> that doesn't change in Spark.
>
> On Fri, Sep 12, 2014 at 12:33 PM, Deep Pradhan 
> wrote:
>
>> There is one thing that I am confused about.
>> Spark has codes that have been implemented in Scala. Now, can we run any
>> Scala code on the Spark framework? What will be the difference in the
>> execution of the scala code in normal systems and on Spark?
>> The reason for my question is the following:
>> I had a variable
>> *val temp = *
>> This temp was being created inside the loop, so as to manually throw it
>> out of the cache, every time the loop ends I was calling
>> *temp.unpersist()*, this was returning an error saying that *value
>> unpersist is not a method of Int*, which means that temp is an Int.
>> Can some one explain to me why I was not able to call *unpersist* on
>> *temp*?
>>
>> Thank You
>>
>
>


Re: Spark and Scala

2014-09-12 Thread Nicholas Chammas
unpersist is a method on RDDs. RDDs are abstractions introduced by Spark.

An Int is just a Scala Int. You can't call unpersist on Int in Scala, and
that doesn't change in Spark.
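
In spark-shell terms (a sketch; sc is the predefined SparkContext):

val n = 5                           // a plain Scala Int
// n.unpersist()                    // does not compile: Int has no such method

val rdd = sc.parallelize(Seq(5))    // RDD[Int], a Spark abstraction
rdd.cache()
rdd.unpersist()                     // fine: unpersist is defined on RDD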

On Fri, Sep 12, 2014 at 12:33 PM, Deep Pradhan 
wrote:

> There is one thing that I am confused about.
> Spark has codes that have been implemented in Scala. Now, can we run any
> Scala code on the Spark framework? What will be the difference in the
> execution of the scala code in normal systems and on Spark?
> The reason for my question is the following:
> I had a variable
> *val temp = *
> This temp was being created inside the loop, so as to manually throw it
> out of the cache, every time the loop ends I was calling
> *temp.unpersist()*, this was returning an error saying that *value
> unpersist is not a method of Int*, which means that temp is an Int.
> Can some one explain to me why I was not able to call *unpersist* on
> *temp*?
>
> Thank You
>


Spark and Scala

2014-09-12 Thread Deep Pradhan
There is one thing that I am confused about.
Spark itself is implemented in Scala. Now, can we run any Scala code on the
Spark framework? What will be the difference in the execution of Scala code
on normal systems and on Spark?
The reason for my question is the following:
I had a variable
*val temp = *
This temp was being created inside a loop. To manually throw it out of the
cache every time the loop ends, I was calling *temp.unpersist()*, but this
returned an error saying that *value unpersist is not a method of Int*,
which means that temp is an Int.
Can someone explain to me why I was not able to call *unpersist* on *temp*?

Thank You
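
P.S. The shape of my code is roughly this (a sketch, since the actual value
assigned to temp is not shown above):

for (i <- 1 to 3) {
  val temp = 42 * i     // in my program temp ends up being a plain Int
  // ... use temp ...
  // temp.unpersist()   // this is the call that fails to compile
}

Is something like the following, where temp is explicitly created as an RDD
through the SparkContext sc, what I should be doing instead?

for (i <- 1 to 3) {
  val temp = sc.parallelize(Seq.fill(100)(i))   // RDD[Int]
  temp.persist()
  println(temp.sum())
  temp.unpersist()   // drop it from the cache at the end of each iteration
}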