Strange behavior of collectNeighbors API in GraphX

2016-03-11 Thread Zhaokang Wang

Hi all,

These days I have met a problem of GraphX’s strange behavior on 
|collectNeighbors| API. It seems that this API has side-effects on the 
Pregel API. It makes Pregel API not work as expected. The following is a 
small code demo to reproduce this strange behavior. You can get the 
whole source code in the attachment.


The steps to reproduce the side-effects:

1. Create a little toy graph with a simple vertex property type:

   |class VertexProperty(val inNeighbor:ArrayBuffer[Long] =
   ArrayBuffer[Long]()) extends Serializable { } // Create a data
   graph. Vertices:1,2,3; Edges:2 -> 1, 3 -> 1, 2 -> 3. .. val
   purGraph = Graph(dataVertex, dataEdge).persist() |

2.

   Call |collectNeighbors| method to get both inNeighbor graph and
   outNeighbor graph of the |purGraph|. Then outer join the
   |inNeighborGraph| with |purGraph| to get the |dataGraph|:

   |// Get inNeighbor and outNeighbor graph from purGraph val
   inNeighborGraph = purGraph.collectNeighbors(EdgeDirection.In) //
   !!May be BUG here!! If we don't collect outneighbors here, the bug
   will disappear. val outNeighborGraph =
   purGraph.collectNeighbors(EdgeDirection.Out) // Now join the in
   neighbor vertex id list to every vertex's property val dataGraph =
   purGraph.outerJoinVertices(inNeighborGraph)((vid, property,
   inNeighborList) => { val inNeighborVertexIds =
   inNeighborList.getOrElse(Array[(VertexId, VertexProperty)]()).map(t
   => t._1) property.inNeighbor ++= inNeighborVertexIds.toBuffer
   property }) |

3.

   Conduct a simple Pregel computation on |dataGraph|. In the Pregel
   vertex program phase, we do nothing but just print some debug info.
   However, in the send message phase, we find that the |inNeighbor|
   property of vertex 1 has changed! *The |inNeighbor| property values
   of vertex 1 are inconsistent between the vertex program phase and
   the send message phase!*

   |val result = dataGraph.pregel(Array[Long](), maxIterations = 1,
   EdgeDirection.Both)( // vertex program (id, property, msg) =>
   vertexProgram(id, property, msg), // send messages triplet =>
   sendMessage(triplet), // combine messages (a, b) =>
   messageCombiner(a, b) ) // In the vertex program, we change
   nothing... def vertexProgram(id: VertexId, property: VertexProperty,
   msgSum: Array[Long]):VertexProperty = { if(id == 1L)
   println("[Vertex Program]Vertex 1's inNeighbor property length is:"
   + property.inNeighbor.length) property } // In the send message
   phase, we just check the vertex property of the same vertex. // We
   should get the same results in the two phases. def
   sendMessage(edge:EdgeTriplet[VertexProperty,
   Null]):Iterator[(VertexId, Array[Long])]={ // Print vertex 1's
   inNeighbor length if(edge.srcId == 1L) println("[Send Message]
   Vertex 1's inNeighbor property length is:" +
   edge.srcAttr.inNeighbor.length) if(edge.dstId == 1L) println("[Send
   Message] Vertex 1's inNeighbor property length is:" +
   edge.dstAttr.inNeighbor.length) val sIn = edge.srcAttr.inNeighbor
   val dIn = edge.dstAttr.inNeighbor //send empty message
   ArrayBuffer[(VertexId,Array[Long])]().toIterator } def
   messageCombiner(a:Array[Long], b:Array[Long]):Array[Long]={
   Array.concat(a,b) } |

   In the program output, we get:

   |[Vertex Program]Vertex 1's inNeighbor property length is:2 [Send
   Message] Vertex 1's inNeighbor property length is:0 [Send Message]
   Vertex 1's inNeighbor property length is:0 |

4.

   More weirdly, if we comment out one of the |collectNeighbors| method
   call, everything will be OK! As you may notice, actually we do not
   use |outNeighborGraph| in our program, so we can comment the
   following statement in the program:

   |// val outNeighborGraph = purGraph.collectNeighbors(EdgeDirection.Out) |

   If we comment that statement out, you can find that everything is
   Okay now.

   |[Vertex Program]Vertex 1's inNeighbor property length is:2 [Send
   Message] Vertex 1's inNeighbor property length is:2 [Send Message]
   Vertex 1's inNeighbor property length is:2 |

The behavior of |collectNeighbors| is strange. Maybe it is a bug of 
GraphX or I call this API improperly or my vertex property is improper. 
Could you please give some comments on this? Thanks a lot.


Regards,

Zhaokang Wang

PS. We have tested the code on Spark 1.5.1 and 1.6.0.

​
import org.apache.spark.{graphx, SparkConf, SparkContext}
import org.apache.spark.graphx.Edge
import org.apache.spark.graphx.EdgeDirection
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

import scala.collection.mutable.ArrayBuffer


class VertexProperty(val inNeighbor:ArrayBuffer[Long] = ArrayBuffer[Long]()) 
extends Serializable { }

object Main {

  def main(args: Array[String]): Unit = {
// begin Spark job
val conf = new SparkConf().setAppName("finding pregel API 
bug").setMaster("local[2]")
val sc = new SparkContext(conf)


// Create a data graph. Vertices:1,2,3; Edges

Re: Installing Spark on Mac

2016-03-11 Thread Jakob Odersky
Some more diagnostics/suggestions:

1) are other services listening to ports in the 4000 range (run
"netstat -plunt")? Maybe there is an issue with the error message
itself.

2) are you sure the correct java version is used? java -version

3) can you revert all installation attempts you have done so far,
including files installed by brew/macports or maven and try again?

4) are there any special firewall rules in place, forbidding
connections on localhost?

This is very weird behavior you're seeing. Spark is supposed to work
out-of-the-box with ZERO configuration necessary for running a local
shell. Again, my prime suspect is a previous, failed Spark
installation messing up your config.

On Thu, Mar 10, 2016 at 12:24 PM, Tristan Nixon  wrote:
> If you type ‘whoami’ in the terminal, and it responds with ‘root’ then you’re 
> the superuser.
> However, as mentioned below, I don’t think its a relevant factor.
>
>> On Mar 10, 2016, at 12:02 PM, Aida Tefera  wrote:
>>
>> Hi Tristan,
>>
>> I'm afraid I wouldn't know whether I'm running it as super user.
>>
>> I have java version 1.8.0_73 and SCALA version 2.11.7
>>
>> Sent from my iPhone
>>
>>> On 9 Mar 2016, at 21:58, Tristan Nixon  wrote:
>>>
>>> That’s very strange. I just un-set my SPARK_HOME env param, downloaded a 
>>> fresh 1.6.0 tarball,
>>> unzipped it to local dir (~/Downloads), and it ran just fine - the driver 
>>> port is some randomly generated large number.
>>> So SPARK_HOME is definitely not needed to run this.
>>>
>>> Aida, you are not running this as the super-user, are you?  What versions 
>>> of Java & Scala do you have installed?
>>>
 On Mar 9, 2016, at 3:53 PM, Aida Tefera  wrote:

 Hi Jakob,

 Tried running the command env|grep SPARK; nothing comes back

 Tried env|grep Spark; which is the directory I created for Spark once I 
 downloaded the tgz file; comes back with PWD=/Users/aidatefera/Spark

 Tried running ./bin/spark-shell ; comes back with same error as below; i.e 
 could not bind to port 0 etc.

 Sent from my iPhone

> On 9 Mar 2016, at 21:42, Jakob Odersky  wrote:
>
> As Tristan mentioned, it looks as though Spark is trying to bind on
> port 0 and then 1 (which is not allowed). Could it be that some
> environment variables from you previous installation attempts are
> polluting your configuration?
> What does running "env | grep SPARK" show you?
>
> Also, try running just "/bin/spark-shell" (without the --master
> argument), maybe your shell is doing some funky stuff with the
> brackets.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>

On Thu, Mar 10, 2016 at 12:24 PM, Tristan Nixon  wrote:
> If you type ‘whoami’ in the terminal, and it responds with ‘root’ then you’re 
> the superuser.
> However, as mentioned below, I don’t think its a relevant factor.
>
>> On Mar 10, 2016, at 12:02 PM, Aida Tefera  wrote:
>>
>> Hi Tristan,
>>
>> I'm afraid I wouldn't know whether I'm running it as super user.
>>
>> I have java version 1.8.0_73 and SCALA version 2.11.7
>>
>> Sent from my iPhone
>>
>>> On 9 Mar 2016, at 21:58, Tristan Nixon  wrote:
>>>
>>> That’s very strange. I just un-set my SPARK_HOME env param, downloaded a 
>>> fresh 1.6.0 tarball,
>>> unzipped it to local dir (~/Downloads), and it ran just fine - the driver 
>>> port is some randomly generated large number.
>>> So SPARK_HOME is definitely not needed to run this.
>>>
>>> Aida, you are not running this as the super-user, are you?  What versions 
>>> of Java & Scala do you have installed?
>>>
 On Mar 9, 2016, at 3:53 PM, Aida Tefera  wrote:

 Hi Jakob,

 Tried running the command env|grep SPARK; nothing comes back

 Tried env|grep Spark; which is the directory I created for Spark once I 
 downloaded the tgz file; comes back with PWD=/Users/aidatefera/Spark

 Tried running ./bin/spark-shell ; comes back with same error as below; i.e 
 could not bind to port 0 etc.

 Sent from my iPhone

> On 9 Mar 2016, at 21:42, Jakob Odersky  wrote:
>
> As Tristan mentioned, it looks as though Spark is trying to bind on
> port 0 and then 1 (which is not allowed). Could it be that some
> environment variables from you previous installation attempts are
> polluting your configuration?
> What does running "env | grep SPARK" show you?
>
> Also, try running just "/bin/spark-shell" (without the --master
> argument), maybe your shell is doing some funky stuff with the
> brackets.

 --

Re: Zeppelin Integration

2016-03-11 Thread Mich Talebzadeh
BTW, when the daemon is stopped on the host, the notebook just hangs if it
was running, without  any errors. The only way is to tail the last log in
$ZEPPELIN_HOME/logs. So I would say a cron type job is required to scan the
log for errors.

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 10 March 2016 at 11:32, ayan guha  wrote:

> Thanks guys for reply. Yes, Zeppelin with Spark is pretty compelling
> choice, for single user. Any pointers for using Zeppelin for multi user
> scenario? In essence, can we either (a) Use Zeppelin to connect to a long
> running Spark Application which has some pre-cached Dataframes? (b) Can
> Zeppelin user be passed down and use Ranger to implement Hive RBAC?
>
> I know I am sounding a little vague, but such is the problem state in my
> mind :) Any help will be appreciated.
>
> Best
> Ayan
>
> On Thu, Mar 10, 2016 at 9:51 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Zeppelin is pretty a good choice for Spark. It has a UI that allows you
>> to run your code. It has Interpreter where you change the connection
>> configuration. I made mine run on port 21999 (a deamon process on Linux
>> host where your spark master is running). It is pretty easy to set up and
>> run.
>>
>> HTH
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 10 March 2016 at 10:26, Sabarish Sasidharan <
>> sabarish.sasidha...@manthan.com> wrote:
>>
>>> I believe you need to co-locate your Zeppelin on the same node where
>>> Spark is installed. You need to specify the SPARK HOME. The master I used
>>> was YARN.
>>>
>>> Zeppelin exposes a notebook interface. A notebook can have many
>>> paragraphs. You run the paragraphs. You can mix multiple contexts in the
>>> same notebook. So first paragraph can be scala, second can be sql that uses
>>> DF from first paragraph etc. If you use a select query, the output is
>>> automatically displayed as a chart.
>>>
>>> As RDDs are bound to the context that creates them, I don't think
>>> Zeppelin can use those RDDs.
>>>
>>> I don't know if notebooks can be reused within other notebooks. It would
>>> be a nice way of doing some common preparatory work (like building these
>>> RDDs).
>>>
>>> Regards
>>> Sab
>>>
>>> On Thu, Mar 10, 2016 at 2:28 PM, ayan guha  wrote:
>>>
 Hi All

 I am writing this in order to get a fair understanding of how zeppelin
 can be integrated with Spark.

 Our use case is to load few tables from a DB to Spark, run some
 transformation. Once done, we want to expose data through Zeppelin for
 analytics. I have few question around that to sound off any gross
 architectural flaws.

 Questions:

 1. How Zeppelin connects to Spark? Thriftserver? Thrift JDBC?

 2. What is the scope of Spark application when it is used from
 Zeppelin? For example, if I have few subsequent actions in zeppelin like
 map,filter,reduceByKey, filter,collect. I assume this will translate to an
 application and get submitted to Spark. However, If I want to use reuse
 some part of the data (for example) after first map transformation in
 earlier application. Can I do it? Or will it be another application and
 another spark submit?

  In our use case data will already be loaded in RDDs. So how Zeppelin
 can access it?

 3. How can I control access on specific rdds to specific users in
 Zeppelin (assuming we have implemented some way of login mechanism in
 Zeppelin and we have a mapping between Zeppelin users and their LDAP
 accounts). Is it even possible?

 4. If Zeppelin is not a good choice, yet, for the use case, what are
 the other alternatives?

 appreciate any help/pointers/guidance.


 --
 Best Regards,
 Ayan Guha

>>>
>>>
>>>
>>> --
>>>
>>> Architect - Big Data
>>> Ph: +91 99805 99458
>>>
>>> Manthan Systems | *Company of the year - Analytics (2014 Frost and
>>> Sullivan India ICT)*
>>> +++
>>>
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: Installing Spark on Mac

2016-03-11 Thread Jakob Odersky
regarding my previous message, I forgot to mention to run netstat as
root (sudo netstat -plunt)
sorry for the noise

On Fri, Mar 11, 2016 at 12:29 AM, Jakob Odersky  wrote:
> Some more diagnostics/suggestions:
>
> 1) are other services listening to ports in the 4000 range (run
> "netstat -plunt")? Maybe there is an issue with the error message
> itself.
>
> 2) are you sure the correct java version is used? java -version
>
> 3) can you revert all installation attempts you have done so far,
> including files installed by brew/macports or maven and try again?
>
> 4) are there any special firewall rules in place, forbidding
> connections on localhost?
>
> This is very weird behavior you're seeing. Spark is supposed to work
> out-of-the-box with ZERO configuration necessary for running a local
> shell. Again, my prime suspect is a previous, failed Spark
> installation messing up your config.
>
> On Thu, Mar 10, 2016 at 12:24 PM, Tristan Nixon  wrote:
>> If you type ‘whoami’ in the terminal, and it responds with ‘root’ then 
>> you’re the superuser.
>> However, as mentioned below, I don’t think its a relevant factor.
>>
>>> On Mar 10, 2016, at 12:02 PM, Aida Tefera  wrote:
>>>
>>> Hi Tristan,
>>>
>>> I'm afraid I wouldn't know whether I'm running it as super user.
>>>
>>> I have java version 1.8.0_73 and SCALA version 2.11.7
>>>
>>> Sent from my iPhone
>>>
 On 9 Mar 2016, at 21:58, Tristan Nixon  wrote:

 That’s very strange. I just un-set my SPARK_HOME env param, downloaded a 
 fresh 1.6.0 tarball,
 unzipped it to local dir (~/Downloads), and it ran just fine - the driver 
 port is some randomly generated large number.
 So SPARK_HOME is definitely not needed to run this.

 Aida, you are not running this as the super-user, are you?  What versions 
 of Java & Scala do you have installed?

> On Mar 9, 2016, at 3:53 PM, Aida Tefera  wrote:
>
> Hi Jakob,
>
> Tried running the command env|grep SPARK; nothing comes back
>
> Tried env|grep Spark; which is the directory I created for Spark once I 
> downloaded the tgz file; comes back with PWD=/Users/aidatefera/Spark
>
> Tried running ./bin/spark-shell ; comes back with same error as below; 
> i.e could not bind to port 0 etc.
>
> Sent from my iPhone
>
>> On 9 Mar 2016, at 21:42, Jakob Odersky  wrote:
>>
>> As Tristan mentioned, it looks as though Spark is trying to bind on
>> port 0 and then 1 (which is not allowed). Could it be that some
>> environment variables from you previous installation attempts are
>> polluting your configuration?
>> What does running "env | grep SPARK" show you?
>>
>> Also, try running just "/bin/spark-shell" (without the --master
>> argument), maybe your shell is doing some funky stuff with the
>> brackets.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org

>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>
>
> On Thu, Mar 10, 2016 at 12:24 PM, Tristan Nixon  wrote:
>> If you type ‘whoami’ in the terminal, and it responds with ‘root’ then 
>> you’re the superuser.
>> However, as mentioned below, I don’t think its a relevant factor.
>>
>>> On Mar 10, 2016, at 12:02 PM, Aida Tefera  wrote:
>>>
>>> Hi Tristan,
>>>
>>> I'm afraid I wouldn't know whether I'm running it as super user.
>>>
>>> I have java version 1.8.0_73 and SCALA version 2.11.7
>>>
>>> Sent from my iPhone
>>>
 On 9 Mar 2016, at 21:58, Tristan Nixon  wrote:

 That’s very strange. I just un-set my SPARK_HOME env param, downloaded a 
 fresh 1.6.0 tarball,
 unzipped it to local dir (~/Downloads), and it ran just fine - the driver 
 port is some randomly generated large number.
 So SPARK_HOME is definitely not needed to run this.

 Aida, you are not running this as the super-user, are you?  What versions 
 of Java & Scala do you have installed?

> On Mar 9, 2016, at 3:53 PM, Aida Tefera  wrote:
>
> Hi Jakob,
>
> Tried running the command env|grep SPARK; nothing comes back
>
> Tried env|grep Spark; which is the directory I created for Spark once I 
> downloaded the tgz file; comes back with PWD=/Users/aidatefera/Spark
>
> Tried running ./bin/spark-shell ; comes back with same error as below; 
> i.e could not bind to port 0 etc.
>
> Sent from my iPhone
>
>> On 9 Mar 2016, at 21:42, Jakob Odersky  wrote:
>>
>> As Tristan mentioned, it looks as though Spark is trying to bind on
>> port 0 and then 1 (which is not allowed). Could it be that some
>> environment variables from you previous installation attempts are

Re: Running ALS on comparitively large RDD

2016-03-11 Thread Nick Pentreath
Hmmm, something else is going on there. What data source are you reading
from? How much driver and executor memory have you provided to Spark?



On Fri, 11 Mar 2016 at 09:21 Deepak Gopalakrishnan  wrote:

> 1. I'm using about 1 million users against few thousand products. I
> basically have around a million ratings
> 2. Spark 1.6 on Amazon EMR
>
> On Fri, Mar 11, 2016 at 12:46 PM, Nick Pentreath  > wrote:
>
>> Could you provide more details about:
>> 1. Data set size (# ratings, # users and # products)
>> 2. Spark cluster set up and version
>>
>> Thanks
>>
>> On Fri, 11 Mar 2016 at 05:53 Deepak Gopalakrishnan 
>> wrote:
>>
>>> Hello All,
>>>
>>> I've been running Spark's ALS on a dataset of users and rated items. I
>>> first encode my users to integers by using an auto increment function (
>>> just like zipWithIndex), I do the same for my items. I then create an RDD
>>> of the ratings and feed it to ALS.
>>>
>>> My issue is that the ALS algorithm never completes. Attached is a
>>> screenshot of the stages window.
>>>
>>> Any help will be greatly appreciated
>>>
>>> --
>>> Regards,
>>> *Deepak Gopalakrishnan*
>>> *Mobile*:+918891509774
>>> *Skype* : deepakgk87
>>> http://myexps.blogspot.com
>>>
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
> --
> Regards,
> *Deepak Gopalakrishnan*
> *Mobile*:+918891509774
> *Skype* : deepakgk87
> http://myexps.blogspot.com
>
>


can checkpoint and write ahead log save the data in queued batch?

2016-03-11 Thread Yu Xie
Hi spark user

  I am running an spark streaming app that use receiver from a pubsub
system, and the pubsub system does NOT support ack.

  And I don't want the data to be lost if there is a driver failure, and by
accident, the batches queue up at that time.

  I tested by generating some queued batches with some input (see the pic),
and then quit the application.
  When I restart the application again, I saw there are no input for these
batches.

  Is it as expected?


Before restart
[image: Inline image 1]


How to efficiently query a large table with multiple dimensional table?

2016-03-11 Thread ashokkumar rajendran
Hi All,

I have a large table with few billions of rows and have a very small table
with 4 dimensional values. I would like to get rows that match any of these
dimensions. For example,

Select field1, field2 from A, B where A.dimension1 = B.dimension1 OR
A.dimension2 = B.dimension2 OR A.dimension3 = B.dimension3 OR A.dimension4
= B.dimension4.

The query plan takes this as nestedLoop and executes for very long time.

If I execute this as Union queries, it takes around 1.5mins for each
dimension.

Select field1, field2 from A, B where A.dimension1 = B.dimension1
UNION ALL
Select field1, field2 from A, B where A.dimension2 = B.dimension2
UNION ALL
Select field1, field2 from A, B where  A.dimension3 = B.dimension3
UNION ALL
Select field1, field2 from A, B where  A.dimension4 = B.dimension4.

This is obviously not an optimal solution as it makes multiple scanning at
same table. Is there any other optimal solution for it?

I tried to do this with plain Spark (without SQL) using broadcast map join
but the performance was bad than this.


Regards
Ashok


Re: Can we use spark inside a web service?

2016-03-11 Thread Hemant Bhanawat
Spark-jobserver is an elegant product that builds concurrency on top of
Spark. But, the current design of DAGScheduler prevents Spark to become a
truly concurrent solution for low latency queries. DagScheduler will turn
out to be a bottleneck for low latency queries. Sparrow project was an
effort to make Spark more suitable for such scenarios but it never made it
to the Spark codebase. If Spark has to become a highly concurrent solution,
scheduling has to be distributed.

Hemant Bhanawat 
www.snappydata.io

On Fri, Mar 11, 2016 at 7:02 AM, Chris Fregly  wrote:

> great discussion, indeed.
>
> Mark Hamstra and i spoke offline just now.
>
> Below is a quick recap of our discussion on how they've achieved
> acceptable performance from Spark on the user request/response path (@mark-
> feel free to correct/comment).
>
> 1) there is a big difference in request/response latency between
> submitting a full Spark Application (heavy weight) versus having a
> long-running Spark Application (like Spark Job Server) that submits
> lighter-weight Jobs using a shared SparkContext.  mark is obviously using
> the latter - a long-running Spark App.
>
> 2) there are some enhancements to Spark that are required to achieve
> acceptable user request/response times.  some links that Mark provided are
> as follows:
>
>- https://issues.apache.org/jira/browse/SPARK-11838
>- https://github.com/apache/spark/pull/11036
>- https://github.com/apache/spark/pull/11403
>- https://issues.apache.org/jira/browse/SPARK-13523
>- https://issues.apache.org/jira/browse/SPARK-13756
>
> Essentially, a deeper level of caching at the shuffle file layer to reduce
> compute and memory between queries.
>
> Note that Mark is running a slightly-modified version of stock Spark.
>  (He's mentioned this in prior posts, as well.)
>
> And I have to say that I'm, personally, seeing more and more
> slightly-modified versions of Spark being deployed to production to
> workaround outstanding PR's and Jiras.
>
> this may not be what people want to hear, but it's a trend that i'm seeing
> lately as more and more customize Spark to their specific use cases.
>
> Anyway, thanks for the good discussion, everyone!  This is why we have
> these lists, right!  :)
>
>
> On Thu, Mar 10, 2016 at 7:51 PM, Evan Chan 
> wrote:
>
>> One of the premises here is that if you can restrict your workload to
>> fewer cores - which is easier with FiloDB and careful data modeling -
>> you can make this work for much higher concurrency and lower latency
>> than most typical Spark use cases.
>>
>> The reason why it typically does not work in production is that most
>> people are using HDFS and files.  These data sources are designed for
>> running queries and workloads on all your cores across many workers,
>> and not for filtering your workload down to only one or two cores.
>>
>> There is actually nothing inherent in Spark that prevents people from
>> using it as an app server.   However, the insistence on using it with
>> HDFS is what kills concurrency.   This is why FiloDB is important.
>>
>> I agree there are more optimized stacks for running app servers, but
>> the choices that you mentioned:  ES is targeted at text search;  Cass
>> and HBase by themselves are not fast enough for analytical queries
>> that the OP wants;  and MySQL is great but not scalable.   Probably
>> something like VectorWise, HANA, Vertica would work well, but those
>> are mostly not free solutions.   Druid could work too if the use case
>> is right.
>>
>> Anyways, great discussion!
>>
>> On Thu, Mar 10, 2016 at 2:46 PM, Chris Fregly  wrote:
>> > you are correct, mark.  i misspoke.  apologies for the confusion.
>> >
>> > so the problem is even worse given that a typical job requires multiple
>> > tasks/cores.
>> >
>> > i have yet to see this particular architecture work in production.  i
>> would
>> > love for someone to prove otherwise.
>> >
>> > On Thu, Mar 10, 2016 at 5:44 PM, Mark Hamstra 
>> > wrote:
>> >>>
>> >>> For example, if you're looking to scale out to 1000 concurrent
>> requests,
>> >>> this is 1000 concurrent Spark jobs.  This would require a cluster
>> with 1000
>> >>> cores.
>> >>
>> >>
>> >> This doesn't make sense.  A Spark Job is a driver/DAGScheduler concept
>> >> without any 1:1 correspondence between Worker cores and Jobs.  Cores
>> are
>> >> used to run Tasks, not Jobs.  So, yes, a 1000 core cluster can run at
>> most
>> >> 1000 simultaneous Tasks, but that doesn't really tell you anything
>> about how
>> >> many Jobs are or can be concurrently tracked by the DAGScheduler,
>> which will
>> >> be apportioning the Tasks from those concurrent Jobs across the
>> available
>> >> Executor cores.
>> >>
>> >> On Thu, Mar 10, 2016 at 2:00 PM, Chris Fregly 
>> wrote:
>> >>>
>> >>> Good stuff, Evan.  Looks like this is utilizing the in-memory
>> >>> capabilities of FiloDB which is pretty cool.  looking forward to the
>> webcast
>> 

Re: Running ALS on comparitively large RDD

2016-03-11 Thread Deepak Gopalakrishnan
Executor memory : 45g X 4 executors , 1 Driver with 45g memory
Data Source is from S3 and I've logs that tells me the Rating objects are
loaded fine.

On Fri, Mar 11, 2016 at 2:13 PM, Nick Pentreath 
wrote:

> Hmmm, something else is going on there. What data source are you reading
> from? How much driver and executor memory have you provided to Spark?
>
>
>
> On Fri, 11 Mar 2016 at 09:21 Deepak Gopalakrishnan 
> wrote:
>
>> 1. I'm using about 1 million users against few thousand products. I
>> basically have around a million ratings
>> 2. Spark 1.6 on Amazon EMR
>>
>> On Fri, Mar 11, 2016 at 12:46 PM, Nick Pentreath <
>> nick.pentre...@gmail.com> wrote:
>>
>>> Could you provide more details about:
>>> 1. Data set size (# ratings, # users and # products)
>>> 2. Spark cluster set up and version
>>>
>>> Thanks
>>>
>>> On Fri, 11 Mar 2016 at 05:53 Deepak Gopalakrishnan 
>>> wrote:
>>>
 Hello All,

 I've been running Spark's ALS on a dataset of users and rated items. I
 first encode my users to integers by using an auto increment function (
 just like zipWithIndex), I do the same for my items. I then create an RDD
 of the ratings and feed it to ALS.

 My issue is that the ALS algorithm never completes. Attached is a
 screenshot of the stages window.

 Any help will be greatly appreciated

 --
 Regards,
 *Deepak Gopalakrishnan*
 *Mobile*:+918891509774
 *Skype* : deepakgk87
 http://myexps.blogspot.com


 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>>
>> --
>> Regards,
>> *Deepak Gopalakrishnan*
>> *Mobile*:+918891509774
>> *Skype* : deepakgk87
>> http://myexps.blogspot.com
>>
>>


-- 
Regards,
*Deepak Gopalakrishnan*
*Mobile*:+918891509774
*Skype* : deepakgk87
http://myexps.blogspot.com


kill Spark Streaming job gracefully

2016-03-11 Thread Shams ul Haque
Hi,

I want to kill a Spark Streaming job gracefully, so that whatever Spark has
picked from Kafka have processed. My Spark version is: 1.6.0

When i tried killing a Spark Streaming Job from Spark UI dosen't stop app
completely. In Spark-UI job is moved to COMPLETED section, but in log it
continuously gives error: http://pastebin.com/TbGrdzA2
and process is still visible with *ps* command.


I also tried to stop by using below command:
*bin/spark-submit --master spark://shams-cashcare:7077 --kill
app-20160311121141-0002*
but it gives me error as:
Unable to connect to server spark://shams-cashcare:7077

I have confirmed the Spark master host:port and they are OK. I also added
ShutdownHook in code.
What am i missing? Or if i am doing something wrong then please guide me.


Strange behavior of collectNeighbors API in GraphX

2016-03-11 Thread Zhaokang Wang
  
  
Hi all,
  
These days I havemet a problem of GraphX鈥檚 strange behavior on
collectNeighborsAPI. It seems that this API has side-effects on the
Pregel API.It makes Pregel API not work as expected. The following
is asmall code demo to reproduce this strange behavior. You can get 
  
the whole source code in the attachment.
  
The steps toreproduce the side-effects:
  

Create a little toy graph with a  simple vertex property type:  
 class VertexProperty(val inNeighbor:ArrayBuffer[Long] =
ArrayBuffer[Long]()) extends Serializable { } // Create a data graph.
Vertices:1,2,3; Edges:2 -> 1, 3 -> 1, 2 -> 3... val purGraph =
Graph(dataVertex, dataEdge).persist()

  
Call collectNeighborsmethod to get both inNeighbor graph and
outNeighbor graph ofthe purGraph.Then outer join the
inNeighborGraphwith purGraphto get the dataGraph:
  
// Get inNeighbor and outNeighbor graph from purGraph val inNeighborGraph =
purGraph.collectNeighbors(EdgeDirection.In) // !!May be BUG here!! If we
don't collect outneighbors here, the bug will disappear. val
outNeighborGraph = purGraph.collectNeighbors(EdgeDirection.Out) // Now join
the in neighbor vertex id list to every vertex's property val dataGraph =
purGraph.outerJoinVertices(inNeighborGraph)((vid, property, inNeighborList)
=> {   val inNeighborVertexIds =
inNeighborList.getOrElse(Array[(VertexId, VertexProperty)]()).map(t =>
t._1)   property.inNeighbor ++= inNeighborVertexIds.toBuffer   property })

  
Conduct a simple Pregel computation on dataGraph.In the Pregel
vertex program phase, we do nothing but justprint some debug
info. However, in the send message phase,we find that the
inNeighborproperty of vertex 1 has changed! The inNeighbor  
   
property values of vertex 1 are inconsistent between the  vertex
program phase and the send message phase!
  
val result = dataGraph.pregel(Array[Long](), maxIterations = 1,
EdgeDirection.Both)(  // vertex program  (id, property, msg) =>
vertexProgram(id, property, msg),  // send messages  triplet =>
sendMessage(triplet),  // combine messages  (a, b) => messageCombiner(a,
b))// In the vertex program, we change nothing...def vertexProgram(id:
VertexId, property: VertexProperty, msgSum: Array[Long]):VertexProperty = { 
if(id == 1L) println("[Vertex Program]Vertex 1's inNeighbor property
length is:" + property.inNeighbor.length)  property}// In the send message
phase, we just check the vertex property of the same vertex. // We should
get the same results in the two phases.def
sendMessage(edge:EdgeTriplet[VertexProperty, Null]):Iterator[(VertexId,
Array[Long])]={// Print vertex 1's inNeighbor lengthif(edge.srcId == 1L) 
println("[Send Message] Vertex 1's inNeighbor property length is:" +
edge.srcAttr.inNeighbor.length)if(edge.dstId == 1L)  println("[Send Message]
Vertex 1's inNeighbor property length is:" +
edge.dstAttr.inNeighbor.length)val sIn = edge.srcAttr.inNeighborval dIn =
edge.dstAttr.inNeighbor//send empty
messageArrayBuffer[(VertexId,Array[Long])]().toIterator}def
messageCombiner(a:Array[Long], b:Array[Long]):Array[Long]={
Array.concat(a,b) }
  
In the program output, we get:
  
[Vertex Program]Vertex 1's inNeighbor property length is:2[Send Message]
Vertex 1's inNeighbor property length is:0[Send Message] Vertex 1's
inNeighbor property length is:0

  
More weirdly, if we comment out one of the collectNeighbors   
method call, everything will be OK! As you may notice,actually
we do not use outNeighborGraphin our program, so we can comment
the following statement inthe program:
  
//  val outNeighborGraph = purGraph.collectNeighbors(EdgeDirection.Out)
  
 If we comment that statement out, you canfind that everything
is Okay now.
  
[Vertex Program]Vertex 1's inNeighbor property length is:2[Send Message]
Vertex 1's inNeighbor property length is:2[Send Message] Vertex 1's
inNeighbor property length is:2
  
  
The behavior of collectNeighborsis strange. Maybe it is a bug of
GraphX or I call this APIimproperly or my vertex property is
improper. Could you pleasegive some comments on this? Thanks a lot.
  
Regards,
  
Zhaokang Wang
  
PS. We have testedthe code on Spark 1.5.1 and 1.6.0.
  
鈥�

The source code mentioned in the post: Main2.scala
  



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Strange-behavior-of-collectNeighbors-API-in-GraphX-tp26459.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Spark ML - Scaling logistic regression for many features

2016-03-11 Thread Nick Pentreath
Would you mind letting us know the # training examples in the datasets?
Also, what do your features look like? Are they text, categorical etc? You
mention that most rows only have a few features, and all rows together have
a few 10,000s features, yet your max feature value is 20 million. How are
your constructing your feature vectors to get a 20 million size? The only
realistic way I can see this situation occurring in practice is with
feature hashing (HashingTF).

MultivariateOnlineSummarizer uses dense arrays, but it should be possible
to enable sparse data. Though in theory, the result will tend to be dense
anyway, unless you have very many entries in the input feature vector that
never occur and are actually zero throughout the data set (which it seems
is the case with your data?). So I doubt whether using sparse vectors for
the summarizer would improve performance in general.

LR doesn't accept a sparse weight vector, as it uses dense vectors for
coefficients and gradients currently. When using L1 regularization, it
could support sparse weight vectors, but the current implementation doesn't
do that yet.

On Thu, 10 Mar 2016 at 23:45 Daniel Siegmann 
wrote:

> Hi Nick,
>
> Thanks for the feedback and the pointers. I tried coalescing to fewer
> partitions and improved the situation dramatically. As you suggested, it is
> communication overhead dominating the overall runtime.
>
> The training run I mentioned originally had 900 partitions. Each tree
> aggregation has two stages, one for the original partitions, and then one
> with the aggregation into a smaller number (at 900 partitions the second
> stage was 30). The first tree aggregation job (the longer one) uses the
> MultivariateOnlineSummarizer you mentioned, while the subsequent
> aggregation jobs use LogisticAggregator (similar problem, though smaller).
>
> I've run some tests with fewer partitions on a very similar data set. 400
> partitions took 8 hours, 100 partitions took 4 hours, and 10 partitions
> took 1.4 hours. I put some screenshots from the Spark UI here:
> http://imgur.com/a/trRJU
>
> Still, these numbers seem oddly high. With 10 partitions it's shuffling
> only some 200 MB per job, but the median "Getting Result Time" is 2.1
> minutes. I would expected it to take *seconds* to transfer that data.
>
> Anyway, the MultivariateOnlineSummarizer creates several arrays of
> doubles equal to the size of the vector - arrays of course are inherently
> dense. While this is only one iteration it is the longest, taking a
> significant portion of the time by itself. LogisticAggregator meanwhile
> has fewer arrays, but if you try to pass coefficients as anything other
> than a dense vector it actually throws an error! Any idea why? Anyone know
> a reason these aggregators *must* store their data densely, or is just an
> implementation choice? Perhaps refactoring these classes to store data
> sparsely would fix the issue.
>
> On Wed, Mar 9, 2016 at 7:57 AM, Nick Pentreath 
> wrote:
>
>> Hi Daniel
>>
>> The bottleneck in Spark ML is most likely (a) the fact that the weight
>> vector itself is dense, and (b) the related communication via the driver. A
>> tree aggregation mechanism is used for computing gradient sums (see
>> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala#L1080
>>  and
>> https://databricks.com/blog/2014/09/22/spark-1-1-mllib-performance-improvements.html),
>> which helps efficiency, but ultimately the driver must collect that
>> gradient vector and re-broadcast the updated weight vector on every
>> iteration.
>>
>> From a quick glance, MultivariateOnlineSummarizer doesn't seem optimized
>> to sparse data either (though that is only one pass of the dataset so
>> doubtful it adds too much overhead).
>>
>> It would be helpful to understand some further details:
>> 1. Some more exact timing numbers - if you could provide screenshots /
>> output from the UI to indicate the stages and call sites where the time is
>> being spent that would be really useful
>> 2. Is this part of a pipeline, and if so, what is the contribution of
>> other parts of that pipeline to overall runtime?
>> 3. Some stats on input / output data sizes from the critical stages
>> (again from the UI)
>> 4. The dataset size (# examples, avg sparsity % per example, etc)
>> 5. Related to (4), the number of partitions of your dataset
>> 6. Cluster details (# nodes and spec), as well as Spark version
>>
>> If you have a lot of partitions, you could find performance will be
>> better with fewer partitions because the communication overhead will tend
>> to dominate the overall runtime.
>>
>> Still, 10 hours and >100GB of driver memory seems extreme for a 20
>> million size dense weight vector (which should only be a few 100MB memory),
>> so perhaps something else is going on.
>>
>> Nick
>>
>> On Tue, 8 Mar 2016 at 22:55 Daniel Siegmann 
>> wrote:
>>
>>> Just for the heck of it I tried the old MLlib impl

Re: Spark configuration with 5 nodes

2016-03-11 Thread Steve Loughran

On 10 Mar 2016, at 22:15, Ashok Kumar 
mailto:ashok34...@yahoo.com.invalid>> wrote:


Hi,

We intend  to use 5 servers which will be utilized for building Bigdata Hadoop 
data warehouse system (not using any propriety distribution like Hortonworks or 
Cloudera or others).

I'd argue that life is if simpler with either of these, or bigtop+ambari built 
up yourself, for the management and monitoring tools more than anything else. 
Life is simpler if there's a web page of cluster status. But: DIY teaches you 
the internals of how things work, which is good for getting your hands dirty 
later on. Just start to automate things from the outset, keep configs under 
SCM, etc. And decide whether or not you want to go with Kerberos (==secure 
HDFS) from the outset. If you don't, put your cluster on a separate isolated 
subnet. You ought to have the boxes on a separate switch anyway if you can, 
just to avoid network traffic hurting anyone else on the switch.

All servers configurations are 512GB RAM, 30TB storage and 16 cores, Ubuntu 
Linux servers. Hadoop will be installed on all the servers/nodes. Server 1 will 
be used for NameNode plus DataNode as well. Server 2 will be  used for standby 
NameNode & DataNode. The rest of the servers will be used as DataNodes..


1. Make sure you've got the HDFS/NN space allocation on the two NNs set up so 
that HDFS blocks don't get into the way of the NN's needs (which ideally should 
be on a separate disk with RAID turned on);
2. Same for the worker nodes; temp space matters
3. On a small cluster, the cost of a DN failure is more significant: a bigger 
fraction of the data will go offline, recovery bandwidth limited to the 4 
remaining boxes, etc, etc. Just be aware of that: in a bigger cluster, a single 
server is usually less traumatic. Though HDFS-599 shows that even facebook can 
lose a cluster or two.

Now we would like to install Spark on each servers to create Spark cluster. Is 
that the good thing to do or we should buy additional hardware for Spark 
(minding cost here) or simply do we require additional memory to accommodate 
Spark as well please. In that case how much memory for each Spark node would 
you recommend?


You should be running your compute work on the same systems as the data, as its 
the "hadoop cluster way"; locality of data ==> performance. If you were to buy 
more hardware, go for more store+compute, rather than just compute.

Spark likes RAM for sharing results; less RAM == more problems. but: you can 
buy extra RAM if you need it, provided you've got space in the servers to put 
it in. Same for storage.

Do make sure that you have ECC memory; there are some papers from google and 
microsoft on that topic if you want links to the details. Without ECC your data 
will be corrupted *and you won't even know*

-Steve




ALS update without re-computing everything

2016-03-11 Thread Roberto Pagliari
In the current implementation of ALS with implicit feedback, when new date come 
in, it is not possible to update user/product matrices without re-computing 
everything.

Is this feature in planning or any known work around?

Thank you,



unsubscribe

2016-03-11 Thread ????/??????


Re: ALS update without re-computing everything

2016-03-11 Thread Nick Pentreath
Currently this is not supported. If you want to do incremental fold-in of
new data you would need to do it outside of Spark (e.g. see this
discussion:
https://mail-archives.apache.org/mod_mbox/spark-user/201603.mbox/browser,
which also mentions a streaming on-line MF implementation with SGD).

In general, for serving situations MF models are stored in some other
serving system, so that system may be better suited to do the actual
fold-in. Sean's Oryx project does that, though I'm not sure offhand if that
part is done in Spark or not.

I know Sean's old Myrrix project also used to support computing ALS with an
initial set of input factors, so you could in theory incrementally compute
on new data. I'm not sure if the newer Oryx project supports it though.

@Sean, what are your thoughts on supporting an initial model (factors) in
ALS? I personally have always just recomputed the model, but for very large
scale stuff it can make a lot of sense obviously. What I'm not sure on is
whether it gives good solutions (relative to recomputing) - I'd imagine it
will tend to find a slightly better local minimum given a previous local
minimum starting point... with the advantage that new users / items are
incorporated. But of course users can do a full recompute periodically.


On Fri, 11 Mar 2016 at 13:04 Roberto Pagliari 
wrote:

> In the current implementation of ALS with implicit feedback, when new date
> come in, it is not possible to update user/product matrices without
> re-computing everything.
>
> Is this feature in planning or any known work around?
>
> Thank you,
>
>


Re: ALS update without re-computing everything

2016-03-11 Thread Sean Owen
On Fri, Mar 11, 2016 at 12:18 PM, Nick Pentreath
 wrote:
> In general, for serving situations MF models are stored in some other
> serving system, so that system may be better suited to do the actual
> fold-in. Sean's Oryx project does that, though I'm not sure offhand if that
> part is done in Spark or not.

(No this part isn't Spark; it's just manipulating arrays in memory.
Making the model is done in Spark, as is marshalling the input from a
Kafka topic.)


> I know Sean's old Myrrix project also used to support computing ALS with an
> initial set of input factors, so you could in theory incrementally compute
> on new data. I'm not sure if the newer Oryx project supports it though.

(Yes, exactly the same thing exists in oryx)


> @Sean, what are your thoughts on supporting an initial model (factors) in
> ALS? I personally have always just recomputed the model, but for very large
> scale stuff it can make a lot of sense obviously. What I'm not sure on is
> whether it gives good solutions (relative to recomputing) - I'd imagine it
> will tend to find a slightly better local minimum given a previous local
> minimum starting point... with the advantage that new users / items are
> incorporated. But of course users can do a full recompute periodically.

I'd prefer to be able to specify a model, since typically the initial
model takes 20-40 iterations to converge to a reasonable state, and
only needs a few more to converge to the same threshold given a
relatively small number of additional inputs. The difference can be a
lot of compute time.

This is one of the few things that got worse when I moved to Spark
since this capability was lost.

I had been too lazy to actually implement it though. But that'd be cool.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: ALS update without re-computing everything

2016-03-11 Thread Nick Pentreath
There is a general movement to allowing initial models to be specified for
Spark ML algorithms, so I'll add a JIRA to that task set. I should be able
to work on this as well as other ALS improvements.

Oh, another reason fold-in is typically not done in Spark is that for
models of any reasonable size, it is not really possible (or very
inefficient) to update a row (or a few rows) of a DF easily, so it's better
done in the serving layer, either in memory and/or using some database
(often a NoSQL store of some kind). Though I've often thought about doing
this in a Streaming job, and with the new state management it could work
much better.

On Fri, 11 Mar 2016 at 14:21 Sean Owen  wrote:

> On Fri, Mar 11, 2016 at 12:18 PM, Nick Pentreath
>  wrote:
> > In general, for serving situations MF models are stored in some other
> > serving system, so that system may be better suited to do the actual
> > fold-in. Sean's Oryx project does that, though I'm not sure offhand if
> that
> > part is done in Spark or not.
>
> (No this part isn't Spark; it's just manipulating arrays in memory.
> Making the model is done in Spark, as is marshalling the input from a
> Kafka topic.)
>
>
> > I know Sean's old Myrrix project also used to support computing ALS with
> an
> > initial set of input factors, so you could in theory incrementally
> compute
> > on new data. I'm not sure if the newer Oryx project supports it though.
>
> (Yes, exactly the same thing exists in oryx)
>
>
> > @Sean, what are your thoughts on supporting an initial model (factors) in
> > ALS? I personally have always just recomputed the model, but for very
> large
> > scale stuff it can make a lot of sense obviously. What I'm not sure on is
> > whether it gives good solutions (relative to recomputing) - I'd imagine
> it
> > will tend to find a slightly better local minimum given a previous local
> > minimum starting point... with the advantage that new users / items are
> > incorporated. But of course users can do a full recompute periodically.
>
> I'd prefer to be able to specify a model, since typically the initial
> model takes 20-40 iterations to converge to a reasonable state, and
> only needs a few more to converge to the same threshold given a
> relatively small number of additional inputs. The difference can be a
> lot of compute time.
>
> This is one of the few things that got worse when I moved to Spark
> since this capability was lost.
>
> I had been too lazy to actually implement it though. But that'd be cool.
>


BlockFetchFailed Exception

2016-03-11 Thread Priya Ch
Hi All,

  I am trying to run spark k-means on a data set which is closely to 1 GB.
Most often I seen BlockFetchFailed Exception which I am suspecting because
of Out of memory.

Here the configuration details-
Total cores:12
Total workers:3
Memory per node: 6GB

When running the job, I an giving the following parameters as
 --num-executors 3 --executor-cores 3 --executor-memory 5G

As the hdfs file has 9 blocks, there would be 9 partitions i.e 9 tasks (one
task need 1 core each) hence total cores allocated across all the executors
is 9.

Here the details of the exception:
6/03/11 14:13:27 ERROR shuffle.OneForOneBlockFetcher: Failed while starting
block fetches
java.io.IOException: Connection from pg-poc-02/10.10.10.92:44231 closed
at
org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124)
at
org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at
io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at
io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at
io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
at
io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
at
io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739)
at
io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659)
at
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)


RE: Graphx

2016-03-11 Thread John Lilley
Andrew,

We conducted some tests for using Graphx to solve the connected-components 
problem and were disappointed.  On 8 nodes of 16GB each, we could not get above 
100M edges.  On 8 nodes of 60GB each, we could not process 1bn edges.  RDD 
serialization would take excessive time and then we would get failures.  By 
contrast, we have a C++ algorithm that solves 1bn edges using memory+disk on a 
single 16GB node in about an hour.  I think that a very large cluster will do 
better, but we did not explore that.

John Lilley
Chief Architect, RedPoint Global Inc.
T: +1 303 541 1516  | M: +1 720 938 5761 | F: +1 781-705-2077
Skype: jlilley.redpoint | 
john.lil...@redpoint.net | 
www.redpoint.net

From: Andrew A [mailto:andrew.a...@gmail.com]
Sent: Thursday, March 10, 2016 2:44 PM
To: u...@spark.incubator.apache.org
Subject: Graphx

Hi, is there anyone who use graphx in production? What maximum size of graphs 
did you process by spark and what cluster are you use for it?

i tried calculate pagerank for 1 Gb edges LJ - dataset for LiveJournalPageRank 
from spark examples and i faced with large volume shuffles produced by spark 
which fail my spark job.

Thank you,
Andrew


Doubt on data frame

2016-03-11 Thread ram kumar
Hi,

I registered a dataframe as a table using registerTempTable
and I didn't close the Spark context.

Will the table be available for longer time?

Thanks


Re: Can we use spark inside a web service?

2016-03-11 Thread Andrés Ivaldi
nice discussion , I've a question about  Web Service with Spark.

What Could be the problem using Akka-http as web service (Like play does )
, with one SparkContext created , and the queries over -http akka using
only the instance of  that SparkContext ,

Also about Analytics , we are working on real- time Analytics and as Hemant
said Spark is not a solution for low latency queries. What about using
Ingite for that?


On Fri, Mar 11, 2016 at 6:52 AM, Hemant Bhanawat 
wrote:

> Spark-jobserver is an elegant product that builds concurrency on top of
> Spark. But, the current design of DAGScheduler prevents Spark to become a
> truly concurrent solution for low latency queries. DagScheduler will turn
> out to be a bottleneck for low latency queries. Sparrow project was an
> effort to make Spark more suitable for such scenarios but it never made it
> to the Spark codebase. If Spark has to become a highly concurrent solution,
> scheduling has to be distributed.
>
> Hemant Bhanawat 
> www.snappydata.io
>
> On Fri, Mar 11, 2016 at 7:02 AM, Chris Fregly  wrote:
>
>> great discussion, indeed.
>>
>> Mark Hamstra and i spoke offline just now.
>>
>> Below is a quick recap of our discussion on how they've achieved
>> acceptable performance from Spark on the user request/response path (@mark-
>> feel free to correct/comment).
>>
>> 1) there is a big difference in request/response latency between
>> submitting a full Spark Application (heavy weight) versus having a
>> long-running Spark Application (like Spark Job Server) that submits
>> lighter-weight Jobs using a shared SparkContext.  mark is obviously using
>> the latter - a long-running Spark App.
>>
>> 2) there are some enhancements to Spark that are required to achieve
>> acceptable user request/response times.  some links that Mark provided are
>> as follows:
>>
>>- https://issues.apache.org/jira/browse/SPARK-11838
>>- https://github.com/apache/spark/pull/11036
>>- https://github.com/apache/spark/pull/11403
>>- https://issues.apache.org/jira/browse/SPARK-13523
>>- https://issues.apache.org/jira/browse/SPARK-13756
>>
>> Essentially, a deeper level of caching at the shuffle file layer to
>> reduce compute and memory between queries.
>>
>> Note that Mark is running a slightly-modified version of stock Spark.
>>  (He's mentioned this in prior posts, as well.)
>>
>> And I have to say that I'm, personally, seeing more and more
>> slightly-modified versions of Spark being deployed to production to
>> workaround outstanding PR's and Jiras.
>>
>> this may not be what people want to hear, but it's a trend that i'm
>> seeing lately as more and more customize Spark to their specific use cases.
>>
>> Anyway, thanks for the good discussion, everyone!  This is why we have
>> these lists, right!  :)
>>
>>
>> On Thu, Mar 10, 2016 at 7:51 PM, Evan Chan 
>> wrote:
>>
>>> One of the premises here is that if you can restrict your workload to
>>> fewer cores - which is easier with FiloDB and careful data modeling -
>>> you can make this work for much higher concurrency and lower latency
>>> than most typical Spark use cases.
>>>
>>> The reason why it typically does not work in production is that most
>>> people are using HDFS and files.  These data sources are designed for
>>> running queries and workloads on all your cores across many workers,
>>> and not for filtering your workload down to only one or two cores.
>>>
>>> There is actually nothing inherent in Spark that prevents people from
>>> using it as an app server.   However, the insistence on using it with
>>> HDFS is what kills concurrency.   This is why FiloDB is important.
>>>
>>> I agree there are more optimized stacks for running app servers, but
>>> the choices that you mentioned:  ES is targeted at text search;  Cass
>>> and HBase by themselves are not fast enough for analytical queries
>>> that the OP wants;  and MySQL is great but not scalable.   Probably
>>> something like VectorWise, HANA, Vertica would work well, but those
>>> are mostly not free solutions.   Druid could work too if the use case
>>> is right.
>>>
>>> Anyways, great discussion!
>>>
>>> On Thu, Mar 10, 2016 at 2:46 PM, Chris Fregly  wrote:
>>> > you are correct, mark.  i misspoke.  apologies for the confusion.
>>> >
>>> > so the problem is even worse given that a typical job requires multiple
>>> > tasks/cores.
>>> >
>>> > i have yet to see this particular architecture work in production.  i
>>> would
>>> > love for someone to prove otherwise.
>>> >
>>> > On Thu, Mar 10, 2016 at 5:44 PM, Mark Hamstra >> >
>>> > wrote:
>>> >>>
>>> >>> For example, if you're looking to scale out to 1000 concurrent
>>> requests,
>>> >>> this is 1000 concurrent Spark jobs.  This would require a cluster
>>> with 1000
>>> >>> cores.
>>> >>
>>> >>
>>> >> This doesn't make sense.  A Spark Job is a driver/DAGScheduler concept
>>> >> without any 1:1 correspondence between Worker cores and Jobs.  Cores
>>> ar

udf StructField to JSON String

2016-03-11 Thread Caires Vinicius
I have one DataFrame with nested StructField and I want to convert to JSON
String. There is anyway to accomplish this?


Spark on YARN memory consumption

2016-03-11 Thread Jan Štěrba
Hello,

I am exprimenting with tuning an on demand spark-cluster on top of our
cloudera hadoop. I am running Cloudera 5.5.2 with Spark 1.5 right now
and I am running spark in yarn-client mode.

Right now my main experimentation is about spark.executor.memory
property and I have noticed a strange behaviour.

When I set spark.executor.memory=512M several things happen:
- per each executor a container with 1GB memory is requested and
assigned from YARN
- in Spark UI I can see that each executor has 256M memory

So what I am seeing is that spark requests 2x the memory but the
executor has only 1/4 of what has been requested. Why is that?

Thanks.

--
Jan Sterba
https://twitter.com/honzasterba | http://flickr.com/honzasterba |
http://500px.com/honzasterba

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Spark on YARN memory consumption

2016-03-11 Thread Silvio Fiorito
Hi Jan,



Yes what you’re seeing is due to YARN container memory overhead. Also, 
typically the memory increments for YARN containers is 1GB.



This gives a good overview: 
http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/



Thanks,

Silvio







From: Jan Štěrba
Sent: Friday, March 11, 2016 8:27 AM
To: User
Subject: Spark on YARN memory consumption



Hello,

I am exprimenting with tuning an on demand spark-cluster on top of our
cloudera hadoop. I am running Cloudera 5.5.2 with Spark 1.5 right now
and I am running spark in yarn-client mode.

Right now my main experimentation is about spark.executor.memory
property and I have noticed a strange behaviour.

When I set spark.executor.memory=512M several things happen:
- per each executor a container with 1GB memory is requested and
assigned from YARN
- in Spark UI I can see that each executor has 256M memory

So what I am seeing is that spark requests 2x the memory but the
executor has only 1/4 of what has been requested. Why is that?

Thanks.

--
Jan Sterba
https://twitter.com/honzasterba | http://flickr.com/honzasterba |
http://500px.com/honzasterba

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark on YARN memory consumption

2016-03-11 Thread Jan Štěrba
Thanks that explains a lot.
--
Jan Sterba
https://twitter.com/honzasterba | http://flickr.com/honzasterba |
http://500px.com/honzasterba


On Fri, Mar 11, 2016 at 2:36 PM, Silvio Fiorito
 wrote:
> Hi Jan,
>
>
>
> Yes what you’re seeing is due to YARN container memory overhead. Also,
> typically the memory increments for YARN containers is 1GB.
>
>
>
> This gives a good overview:
> http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/
>
>
>
> Thanks,
>
> Silvio
>
>
>
>
>
>
>
> From: Jan Štěrba
> Sent: Friday, March 11, 2016 8:27 AM
> To: User
> Subject: Spark on YARN memory consumption
>
>
>
> Hello,
>
> I am exprimenting with tuning an on demand spark-cluster on top of our
> cloudera hadoop. I am running Cloudera 5.5.2 with Spark 1.5 right now
> and I am running spark in yarn-client mode.
>
> Right now my main experimentation is about spark.executor.memory
> property and I have noticed a strange behaviour.
>
> When I set spark.executor.memory=512M several things happen:
> - per each executor a container with 1GB memory is requested and
> assigned from YARN
> - in Spark UI I can see that each executor has 256M memory
>
> So what I am seeing is that spark requests 2x the memory but the
> executor has only 1/4 of what has been requested. Why is that?
>
> Thanks.
>
> --
> Jan Sterba
> https://twitter.com/honzasterba | http://flickr.com/honzasterba |
> http://500px.com/honzasterba
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Does Spark support in-memory shuffling?

2016-03-11 Thread Xudong Zheng
Hi all,

Does Spark support in-memory shuffling now? If not, is there any
consideration for it?

Thanks!

-- 
Xudong Zheng


Re: Doubt on data frame

2016-03-11 Thread Ted Yu
temporary tables are associated with SessionState which is used
by SQLContext.

Did you keep the session ?

Cheers

On Fri, Mar 11, 2016 at 5:02 AM, ram kumar  wrote:

> Hi,
>
> I registered a dataframe as a table using registerTempTable
> and I didn't close the Spark context.
>
> Will the table be available for longer time?
>
> Thanks
>


RE: Graphx

2016-03-11 Thread John Lilley
A colleague did the experiments and I don’t know exactly how he observed that.  
I think it was indirect from the Spark diagnostics indicating the amount of I/O 
he deduced that this was RDD serialization.  Also when he added light 
compression to RDD serialization this improved matters.

John Lilley
Chief Architect, RedPoint Global Inc.
T: +1 303 541 1516  | M: +1 720 938 5761 | F: +1 781-705-2077
Skype: jlilley.redpoint | 
john.lil...@redpoint.net | 
www.redpoint.net

From: lihu [mailto:lihu...@gmail.com]
Sent: Friday, March 11, 2016 7:58 AM
To: John Lilley 
Cc: Andrew A ; u...@spark.incubator.apache.org
Subject: Re: Graphx

Hi, John:
   I am very intersting in your experiment, How can you get that RDD 
serialization cost lots of time, from the log or some other tools?

On Fri, Mar 11, 2016 at 8:46 PM, John Lilley 
mailto:john.lil...@redpoint.net>> wrote:
Andrew,

We conducted some tests for using Graphx to solve the connected-components 
problem and were disappointed.  On 8 nodes of 16GB each, we could not get above 
100M edges.  On 8 nodes of 60GB each, we could not process 1bn edges.  RDD 
serialization would take excessive time and then we would get failures.  By 
contrast, we have a C++ algorithm that solves 1bn edges using memory+disk on a 
single 16GB node in about an hour.  I think that a very large cluster will do 
better, but we did not explore that.

John Lilley
Chief Architect, RedPoint Global Inc.
T: +1 303 541 1516  | M: +1 720 938 
5761 | F: +1 781-705-2077
Skype: jlilley.redpoint | 
john.lil...@redpoint.net | 
www.redpoint.net

From: Andrew A [mailto:andrew.a...@gmail.com]
Sent: Thursday, March 10, 2016 2:44 PM
To: u...@spark.incubator.apache.org
Subject: Graphx

Hi, is there anyone who use graphx in production? What maximum size of graphs 
did you process by spark and what cluster are you use for it?

i tried calculate pagerank for 1 Gb edges LJ - dataset for LiveJournalPageRank 
from spark examples and i faced with large volume shuffles produced by spark 
which fail my spark job.
Thank you,
Andrew



Re: Graphx

2016-03-11 Thread lihu
Hi, John:
   I am very intersting in your experiment, How can you get that RDD
serialization cost lots of time, from the log or some other tools?

On Fri, Mar 11, 2016 at 8:46 PM, John Lilley 
wrote:

> Andrew,
>
>
>
> We conducted some tests for using Graphx to solve the connected-components
> problem and were disappointed.  On 8 nodes of 16GB each, we could not get
> above 100M edges.  On 8 nodes of 60GB each, we could not process 1bn
> edges.  RDD serialization would take excessive time and then we would get
> failures.  By contrast, we have a C++ algorithm that solves 1bn edges using
> memory+disk on a single 16GB node in about an hour.  I think that a very
> large cluster will do better, but we did not explore that.
>
>
>
> *John Lilley*
>
> Chief Architect, RedPoint Global Inc.
>
> T: +1 303 541 1516  *| *M: +1 720 938 5761 *|* F: +1 781-705-2077
>
> Skype: jlilley.redpoint *|* john.lil...@redpoint.net *|* www.redpoint.net
>
>
>
> *From:* Andrew A [mailto:andrew.a...@gmail.com]
> *Sent:* Thursday, March 10, 2016 2:44 PM
> *To:* u...@spark.incubator.apache.org
> *Subject:* Graphx
>
>
>
> Hi, is there anyone who use graphx in production? What maximum size of
> graphs did you process by spark and what cluster are you use for it?
>
> i tried calculate pagerank for 1 Gb edges LJ - dataset for
> LiveJournalPageRank from spark examples and i faced with large volume
> shuffles produced by spark which fail my spark job.
>
> Thank you,
>
> Andrew
>


Re: Kafka + Spark streaming, RDD partitions not processed in parallel

2016-03-11 Thread Cody Koeninger
Can you post your actual code?

On Thu, Mar 10, 2016 at 9:55 PM, Mukul Gupta  wrote:
> Hi All, I was running the following test: Setup 9 VM runing spark workers
> with 1 spark executor each. 1 VM running kafka and spark master. Spark
> version is 1.6.0 Kafka version is 0.9.0.1 Spark is using its own resource
> manager and is not running over YARN. Test I created a kafka topic with 3
> partition. next I used "KafkaUtils.createDirectStream" to get a DStream.
> JavaPairInputDStream stream =
> KafkaUtils.createDirectStream(…); JavaDStream stream1 = stream.map(func1);
> stream1.print(); where func1 just contains a sleep followed by returning of
> value. Observation First RDD partition corresponding to partition 1 of kafka
> was processed on one of the spark executor. Once processing is finished,
> then RDD partitions corresponding to remaining two kafka partitions were
> processed in parallel on different spark executors. I expected that all
> three RDD partitions should have been processed in parallel as there were
> spark executors available which were lying idle. I re-ran the test after
> increasing the partitions of kafka topic to 5. This time also RDD partition
> corresponding to partition 1 of kafka was processed on one of the spark
> executor. Once processing is finished for this RDD partition, then RDD
> partitions corresponding to remaining four kafka partitions were processed
> in parallel on different spark executors. I am not clear about why spark is
> waiting for operations on first RDD partition to finish, while it could
> process remaining partitions in parallel? Am I missing any configuration?
> Any help is appreciated. Thanks, Mukul
> 
> View this message in context: Kafka + Spark streaming, RDD partitions not
> processed in parallel
> Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Graphx

2016-03-11 Thread Ovidiu-Cristian MARCU
Hi,

I wonder what version of Spark and different parameter configuration you used.
I was able to run CC for 1.8bn edges in about 8 minutes (23 iterations) using 
16 nodes with around 80GB RAM each (Spark 1.5, default parameters)
John: I suppose your C++ app (algorithm) does not scale if you used only one 
node.
I don’t understand how RDD’s serialization is taking excessive time, compared 
to the total time or other expected time? 

For the different RDD times you have events and UI console and a bunch of 
papers describing how measure different things, lihu: did you used some 
incomplete tool or what are you looking for?

Best,
Ovidiu

> On 11 Mar 2016, at 16:02, John Lilley  wrote:
> 
> A colleague did the experiments and I don’t know exactly how he observed 
> that.  I think it was indirect from the Spark diagnostics indicating the 
> amount of I/O he deduced that this was RDD serialization.  Also when he added 
> light compression to RDD serialization this improved matters.
>  
> John Lilley
> Chief Architect, RedPoint Global Inc.
> T: +1 303 541 1516  | M: +1 720 938 5761 | F: +1 781-705-2077
> Skype: jlilley.redpoint | john.lil...@redpoint.net 
>  | www.redpoint.net 
> 
>  
> From: lihu [mailto:lihu...@gmail.com] 
> Sent: Friday, March 11, 2016 7:58 AM
> To: John Lilley 
> Cc: Andrew A ; u...@spark.incubator.apache.org
> Subject: Re: Graphx
>  
> Hi, John:
>I am very intersting in your experiment, How can you get that RDD 
> serialization cost lots of time, from the log or some other tools?
>  
> On Fri, Mar 11, 2016 at 8:46 PM, John Lilley  > wrote:
> Andrew,
>  
> We conducted some tests for using Graphx to solve the connected-components 
> problem and were disappointed.  On 8 nodes of 16GB each, we could not get 
> above 100M edges.  On 8 nodes of 60GB each, we could not process 1bn edges.  
> RDD serialization would take excessive time and then we would get failures.  
> By contrast, we have a C++ algorithm that solves 1bn edges using memory+disk 
> on a single 16GB node in about an hour.  I think that a very large cluster 
> will do better, but we did not explore that.
>  
> John Lilley
> Chief Architect, RedPoint Global Inc.
> T: +1 303 541 1516   | M: +1 720 938 5761 
>  | F: +1 781-705-2077 
> Skype: jlilley.redpoint | john.lil...@redpoint.net 
>  | www.redpoint.net 
> 
>  
> From: Andrew A [mailto:andrew.a...@gmail.com ] 
> Sent: Thursday, March 10, 2016 2:44 PM
> To: u...@spark.incubator.apache.org 
> Subject: Graphx
>  
> Hi, is there anyone who use graphx in production? What maximum size of graphs 
> did you process by spark and what cluster are you use for it?
> 
> i tried calculate pagerank for 1 Gb edges LJ - dataset for 
> LiveJournalPageRank from spark examples and i faced with large volume 
> shuffles produced by spark which fail my spark job.
> 
> Thank you,
> Andrew



Re: Doubt on data frame

2016-03-11 Thread ram kumar
No, I am not aware of it.

Can you provide me with the details regarding this.

Thanks

On Fri, Mar 11, 2016 at 8:25 PM, Ted Yu  wrote:

> temporary tables are associated with SessionState which is used
> by SQLContext.
>
> Did you keep the session ?
>
> Cheers
>
> On Fri, Mar 11, 2016 at 5:02 AM, ram kumar 
> wrote:
>
>> Hi,
>>
>> I registered a dataframe as a table using registerTempTable
>> and I didn't close the Spark context.
>>
>> Will the table be available for longer time?
>>
>> Thanks
>>
>
>


RE: Graphx

2016-03-11 Thread John Lilley
Ovidiu,

IMHO, this is one of the biggest issues facing GraphX and Spark.  There are a 
lot of knobs and levers to pull to affect performance, with very little 
guidance about which settings work in general.  We cannot ship software that 
requires end-user tuning; it just has to work.  Unfortunately GraphX seems very 
sensitive to working set size relative to available RAM and fails 
catastrophically as opposed to gracefully when working set is too large.  It is 
also very sensitive to the nature of the data.  For example, if we build a test 
file with input-edge representation like:
1 2
2 3
3 4
5 6
6 7
7 8
…
this represents a graph with connected components in groups of four.  We found 
experimentally that when this data in input in clustered order, the required 
memory is lower and runtime is much faster than when data is input in random 
order.  This makes intuitive sense because of the additional communication 
required for the random order.

Our 1bn-edge test case was of this same form, input in clustered order, with 
groups of 10 vertices per component.  It failed at 8 x 60GB.  This is the kind 
of data that our application processes, so it is a realistic test for us.  I’ve 
found that social media test data sets tend to follow power-law distributions, 
and that GraphX has much less problem with them.

A comparable test scaled to your cluster (16 x 80GB) would be 2.6bn edges in 
10-vertex components using the synthetic test input I describe above.  I would 
be curious to know if this works and what settings you use to succeed, and if 
it continues to succeed for random input order.

As for the C++ algorithm, it scales multi-core.  It exhibits O(N^2) behavior 
for large data sets, but it processes the 1bn-edge case on a single 60GB node 
in about 20 minutes.  It degrades gracefully along the O(N^2) curve and 
additional memory reduces time.

John Lilley

From: Ovidiu-Cristian MARCU [mailto:ovidiu-cristian.ma...@inria.fr]
Sent: Friday, March 11, 2016 8:14 AM
To: John Lilley 
Cc: lihu ; Andrew A ; 
u...@spark.incubator.apache.org
Subject: Re: Graphx

Hi,

I wonder what version of Spark and different parameter configuration you used.
I was able to run CC for 1.8bn edges in about 8 minutes (23 iterations) using 
16 nodes with around 80GB RAM each (Spark 1.5, default parameters)
John: I suppose your C++ app (algorithm) does not scale if you used only one 
node.
I don’t understand how RDD’s serialization is taking excessive time, compared 
to the total time or other expected time?

For the different RDD times you have events and UI console and a bunch of 
papers describing how measure different things, lihu: did you used some 
incomplete tool or what are you looking for?

Best,
Ovidiu

On 11 Mar 2016, at 16:02, John Lilley 
mailto:john.lil...@redpoint.net>> wrote:

A colleague did the experiments and I don’t know exactly how he observed that.  
I think it was indirect from the Spark diagnostics indicating the amount of I/O 
he deduced that this was RDD serialization.  Also when he added light 
compression to RDD serialization this improved matters.

John Lilley
Chief Architect, RedPoint Global Inc.
T: +1 303 541 1516  | M: +1 720 938 5761 | F: +1 781-705-2077
Skype: jlilley.redpoint | 
john.lil...@redpoint.net | 
www.redpoint.net

From: lihu [mailto:lihu...@gmail.com]
Sent: Friday, March 11, 2016 7:58 AM
To: John Lilley mailto:john.lil...@redpoint.net>>
Cc: Andrew A mailto:andrew.a...@gmail.com>>; 
u...@spark.incubator.apache.org
Subject: Re: Graphx

Hi, John:
   I am very intersting in your experiment, How can you get that RDD 
serialization cost lots of time, from the log or some other tools?

On Fri, Mar 11, 2016 at 8:46 PM, John Lilley 
mailto:john.lil...@redpoint.net>> wrote:
Andrew,

We conducted some tests for using Graphx to solve the connected-components 
problem and were disappointed.  On 8 nodes of 16GB each, we could not get above 
100M edges.  On 8 nodes of 60GB each, we could not process 1bn edges.  RDD 
serialization would take excessive time and then we would get failures.  By 
contrast, we have a C++ algorithm that solves 1bn edges using memory+disk on a 
single 16GB node in about an hour.  I think that a very large cluster will do 
better, but we did not explore that.

John Lilley
Chief Architect, RedPoint Global Inc.
T: +1 303 541 1516  | M: +1 720 938 
5761 | F: +1 781-705-2077
Skype: jlilley.redpoint | 
john.lil...@redpoint.net | 
www.redpoint.net

From: Andrew A [mailto:andrew.a...@gmail.com]
Sent: Thursday, March 10, 2016 2:44 PM
To: u...@spark.incubator.apache.org
Subject: Graphx

Hi, is there anyone who use graphx in production? What maximum size of graphs 
did you process by spark and what cluster are you use for it?

i tried calculate pagerank for 1 Gb 

RE: Graphx

2016-03-11 Thread John Lilley
PS: This is the code I use to generate clustered test dat:

public class GenCCInput {
  public static void main(String[] args) {
if (args.length != 2) {
  System.err.println("Usage: \njava GenCCInput  ");
  System.exit(-1);
}
int edges = Integer.parseInt(args[0]);
int groupSize = Integer.parseInt(args[1]);
int currentEdge = 1;
int currentGroupSize = 0;
for (int i = 0; i < edges; i++) {
  System.out.println(currentEdge + " " + (currentEdge + 1));
  if (currentGroupSize == 0) {
currentGroupSize = 2;
  } else {
currentGroupSize++;
  }
  if (currentGroupSize >= groupSize) {
currentGroupSize = 0;
currentEdge += 2;
  } else {
currentEdge++;
  }
}
  }
}

John Lilley
Chief Architect, RedPoint Global Inc.
T: +1 303 541 1516  | M: +1 720 938 5761 | F: +1 781-705-2077
Skype: jlilley.redpoint | 
john.lil...@redpoint.net | 
www.redpoint.net

From: Ovidiu-Cristian MARCU [mailto:ovidiu-cristian.ma...@inria.fr]
Sent: Friday, March 11, 2016 8:14 AM
To: John Lilley 
Cc: lihu ; Andrew A ; 
u...@spark.incubator.apache.org
Subject: Re: Graphx

Hi,

I wonder what version of Spark and different parameter configuration you used.
I was able to run CC for 1.8bn edges in about 8 minutes (23 iterations) using 
16 nodes with around 80GB RAM each (Spark 1.5, default parameters)
John: I suppose your C++ app (algorithm) does not scale if you used only one 
node.
I don’t understand how RDD’s serialization is taking excessive time, compared 
to the total time or other expected time?

For the different RDD times you have events and UI console and a bunch of 
papers describing how measure different things, lihu: did you used some 
incomplete tool or what are you looking for?

Best,
Ovidiu

On 11 Mar 2016, at 16:02, John Lilley 
mailto:john.lil...@redpoint.net>> wrote:

A colleague did the experiments and I don’t know exactly how he observed that.  
I think it was indirect from the Spark diagnostics indicating the amount of I/O 
he deduced that this was RDD serialization.  Also when he added light 
compression to RDD serialization this improved matters.

John Lilley
Chief Architect, RedPoint Global Inc.
T: +1 303 541 1516  | M: +1 720 938 5761 | F: +1 781-705-2077
Skype: jlilley.redpoint | 
john.lil...@redpoint.net | 
www.redpoint.net

From: lihu [mailto:lihu...@gmail.com]
Sent: Friday, March 11, 2016 7:58 AM
To: John Lilley mailto:john.lil...@redpoint.net>>
Cc: Andrew A mailto:andrew.a...@gmail.com>>; 
u...@spark.incubator.apache.org
Subject: Re: Graphx

Hi, John:
   I am very intersting in your experiment, How can you get that RDD 
serialization cost lots of time, from the log or some other tools?

On Fri, Mar 11, 2016 at 8:46 PM, John Lilley 
mailto:john.lil...@redpoint.net>> wrote:
Andrew,

We conducted some tests for using Graphx to solve the connected-components 
problem and were disappointed.  On 8 nodes of 16GB each, we could not get above 
100M edges.  On 8 nodes of 60GB each, we could not process 1bn edges.  RDD 
serialization would take excessive time and then we would get failures.  By 
contrast, we have a C++ algorithm that solves 1bn edges using memory+disk on a 
single 16GB node in about an hour.  I think that a very large cluster will do 
better, but we did not explore that.

John Lilley
Chief Architect, RedPoint Global Inc.
T: +1 303 541 1516  | M: +1 720 938 
5761 | F: +1 781-705-2077
Skype: jlilley.redpoint | 
john.lil...@redpoint.net | 
www.redpoint.net

From: Andrew A [mailto:andrew.a...@gmail.com]
Sent: Thursday, March 10, 2016 2:44 PM
To: u...@spark.incubator.apache.org
Subject: Graphx

Hi, is there anyone who use graphx in production? What maximum size of graphs 
did you process by spark and what cluster are you use for it?

i tried calculate pagerank for 1 Gb edges LJ - dataset for LiveJournalPageRank 
from spark examples and i faced with large volume shuffles produced by spark 
which fail my spark job.
Thank you,
Andrew



RE: Graphx

2016-03-11 Thread John Lilley
I suppose for a 2.6bn case we’d need Long:

public class GenCCInput {
  public static void main(String[] args) {
if (args.length != 2) {
  System.err.println("Usage: \njava GenCCInput  ");
  System.exit(-1);
}
long edges = Long.parseLong(args[0]);
long groupSize = Long.parseLong(args[1]);
long currentEdge = 1;
long currentGroupSize = 0;
for (long i = 0; i < edges; i++) {
  System.out.println(currentEdge + " " + (currentEdge + 1));
  if (currentGroupSize == 0) {
currentGroupSize = 2;
  } else {
currentGroupSize++;
  }
  if (currentGroupSize >= groupSize) {
currentGroupSize = 0;
currentEdge += 2;
  } else {
currentEdge++;
  }
}
  }
}

John Lilley
Chief Architect, RedPoint Global Inc.
T: +1 303 541 1516  | M: +1 720 938 5761 | F: +1 781-705-2077
Skype: jlilley.redpoint | 
john.lil...@redpoint.net | 
www.redpoint.net

From: John Lilley [mailto:john.lil...@redpoint.net]
Sent: Friday, March 11, 2016 8:46 AM
To: Ovidiu-Cristian MARCU 
Cc: lihu ; Andrew A ; 
u...@spark.incubator.apache.org; Geoff Thompson 
Subject: RE: Graphx

Ovidiu,

IMHO, this is one of the biggest issues facing GraphX and Spark.  There are a 
lot of knobs and levers to pull to affect performance, with very little 
guidance about which settings work in general.  We cannot ship software that 
requires end-user tuning; it just has to work.  Unfortunately GraphX seems very 
sensitive to working set size relative to available RAM and fails 
catastrophically as opposed to gracefully when working set is too large.  It is 
also very sensitive to the nature of the data.  For example, if we build a test 
file with input-edge representation like:
1 2
2 3
3 4
5 6
6 7
7 8
…
this represents a graph with connected components in groups of four.  We found 
experimentally that when this data in input in clustered order, the required 
memory is lower and runtime is much faster than when data is input in random 
order.  This makes intuitive sense because of the additional communication 
required for the random order.

Our 1bn-edge test case was of this same form, input in clustered order, with 
groups of 10 vertices per component.  It failed at 8 x 60GB.  This is the kind 
of data that our application processes, so it is a realistic test for us.  I’ve 
found that social media test data sets tend to follow power-law distributions, 
and that GraphX has much less problem with them.

A comparable test scaled to your cluster (16 x 80GB) would be 2.6bn edges in 
10-vertex components using the synthetic test input I describe above.  I would 
be curious to know if this works and what settings you use to succeed, and if 
it continues to succeed for random input order.

As for the C++ algorithm, it scales multi-core.  It exhibits O(N^2) behavior 
for large data sets, but it processes the 1bn-edge case on a single 60GB node 
in about 20 minutes.  It degrades gracefully along the O(N^2) curve and 
additional memory reduces time.

John Lilley

From: Ovidiu-Cristian MARCU [mailto:ovidiu-cristian.ma...@inria.fr]
Sent: Friday, March 11, 2016 8:14 AM
To: John Lilley mailto:john.lil...@redpoint.net>>
Cc: lihu mailto:lihu...@gmail.com>>; Andrew A 
mailto:andrew.a...@gmail.com>>; 
u...@spark.incubator.apache.org
Subject: Re: Graphx

Hi,

I wonder what version of Spark and different parameter configuration you used.
I was able to run CC for 1.8bn edges in about 8 minutes (23 iterations) using 
16 nodes with around 80GB RAM each (Spark 1.5, default parameters)
John: I suppose your C++ app (algorithm) does not scale if you used only one 
node.
I don’t understand how RDD’s serialization is taking excessive time, compared 
to the total time or other expected time?

For the different RDD times you have events and UI console and a bunch of 
papers describing how measure different things, lihu: did you used some 
incomplete tool or what are you looking for?

Best,
Ovidiu

On 11 Mar 2016, at 16:02, John Lilley 
mailto:john.lil...@redpoint.net>> wrote:

A colleague did the experiments and I don’t know exactly how he observed that.  
I think it was indirect from the Spark diagnostics indicating the amount of I/O 
he deduced that this was RDD serialization.  Also when he added light 
compression to RDD serialization this improved matters.

John Lilley
Chief Architect, RedPoint Global Inc.
T: +1 303 541 1516  | M: +1 720 938 5761 | F: +1 781-705-2077
Skype: jlilley.redpoint | 
john.lil...@redpoint.net | 
www.redpoint.net

From: lihu [mailto:lihu...@gmail.com]
Sent: Friday, March 11, 2016 7:58 AM
To: John Lilley mailto:john.lil...@redpoint.net>>
Cc: Andrew A mailto:andrew.a...@gmail.com>>; 
u...@spark.incubator.apache.org
Subject: Re: Graphx

Hi, John:
   I am very intersting in your experime

Re: Spark configuration with 5 nodes

2016-03-11 Thread Mich Talebzadeh
Hi Steve,

My argument has always been that if one is going to use Solid State Disks
(SSD), it makes sense to have it for NN disks start-up from fsimage etc.
Obviously NN lives in memory. Would you also rerommend RAID10 (mirroring &
striping) for NN disks?

Thanks





Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 11 March 2016 at 10:42, Steve Loughran  wrote:

>
> On 10 Mar 2016, at 22:15, Ashok Kumar  > wrote:
>
>
> Hi,
>
> We intend  to use 5 servers which will be utilized for building Bigdata
> Hadoop data warehouse system (not using any propriety distribution like
> Hortonworks or Cloudera or others).
>
>
> I'd argue that life is if simpler with either of these, or bigtop+ambari
> built up yourself, for the management and monitoring tools more than
> anything else. Life is simpler if there's a web page of cluster status.
> But: DIY teaches you the internals of how things work, which is good for
> getting your hands dirty later on. Just start to automate things from the
> outset, keep configs under SCM, etc. And decide whether or not you want to
> go with Kerberos (==secure HDFS) from the outset. If you don't, put your
> cluster on a separate isolated subnet. You ought to have the boxes on a
> separate switch anyway if you can, just to avoid network traffic hurting
> anyone else on the switch.
>
> All servers configurations are 512GB RAM, 30TB storage and 16 cores,
> Ubuntu Linux servers. Hadoop will be installed on all the servers/nodes.
> Server 1 will be used for NameNode plus DataNode as well. Server 2 will be
> used for standby NameNode & DataNode. The rest of the servers will be used
> as DataNodes..
>
>
>
> 1. Make sure you've got the HDFS/NN space allocation on the two NNs set up
> so that HDFS blocks don't get into the way of the NN's needs (which ideally
> should be on a separate disk with RAID turned on);
> 2. Same for the worker nodes; temp space matters
> 3. On a small cluster, the cost of a DN failure is more significant: a
> bigger fraction of the data will go offline, recovery bandwidth limited to
> the 4 remaining boxes, etc, etc. Just be aware of that: in a bigger
> cluster, a single server is usually less traumatic. Though HDFS-599 shows
> that even facebook can lose a cluster or two.
>
> Now we would like to install Spark on each servers to create Spark
> cluster. Is that the good thing to do or we should buy additional hardware
> for Spark (minding cost here) or simply do we require additional memory to
> accommodate Spark as well please. In that case how much memory for each
> Spark node would you recommend?
>
>
> You should be running your compute work on the same systems as the data,
> as its the "hadoop cluster way"; locality of data ==> performance. If you
> were to buy more hardware, go for more store+compute, rather than just
> compute.
>
> Spark likes RAM for sharing results; less RAM == more problems. but: you
> can buy extra RAM if you need it, provided you've got space in the servers
> to put it in. Same for storage.
>
> Do make sure that you have ECC memory; there are some papers from google
> and microsoft on that topic if you want links to the details. Without ECC
> your data will be corrupted *and you won't even know*
>
> -Steve
>
>
>


Re: How can I join two DataSet of same case class?

2016-03-11 Thread Xinh Huynh
I think you have to use an alias. To provide an alias to a Dataset:

val d1 = a.as("d1")
val d2 = b.as("d2")

Then join, using the alias in the column names:
d1.joinWith(d2, $"d1.edid" === $"d2.edid")

Finally, please doublecheck your column names. I did not see "edid" in your
case class.

Xinh

On Thu, Mar 10, 2016 at 9:09 PM, 박주형  wrote:

> Hi. I want to join two DataSet. but below stderr is shown
>
> 16/03/11 13:55:51 WARN ColumnName: Constructing trivially true equals
> predicate, ''edid = 'edid'. Perhaps you need to use aliases.
> Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot
> resolve 'edid' given input columns dataType, avg, sigma, countUnique,
> numRows, recentEdid, categoryId, accCount, statType, categoryId, max,
> accCount, firstQuarter, recentEdid, replicationRateAvg, numRows, min,
> countNotNull, countNotNull, dcid, numDistinctRows, max, firstQuarter, min,
> replicationRateAvg, dcid, statType, avg, sigma, dataType, median,
> thirdQuarter, numDistinctRows, median, countUnique, thirdQuarter;
>
>
> my case class is
> case class Stat(statType: Int, dataType: Int, dcid: Int,
> categoryId: Int, recentEdid: Int, countNotNull: Int, countUnique:
> Int, accCount: Int, replicationRateAvg: Double,
> numDistinctRows: Double, numRows: Double,
> min: Double, max: Double, sigma: Double, avg: Double,
> firstQuarter: Double, thirdQuarter: Double, median: Double)
>
> and my code is
> a.joinWith(b, $"edid" === $"edid").show()
>
> If i use DataFrame, renaming a’s column could solve it. How can I join two
> DataSet of same case class?
>


Re: Spark ML - Scaling logistic regression for many features

2016-03-11 Thread Daniel Siegmann
On Fri, Mar 11, 2016 at 5:29 AM, Nick Pentreath 
wrote:

> Would you mind letting us know the # training examples in the datasets?
> Also, what do your features look like? Are they text, categorical etc? You
> mention that most rows only have a few features, and all rows together have
> a few 10,000s features, yet your max feature value is 20 million. How are
> your constructing your feature vectors to get a 20 million size? The only
> realistic way I can see this situation occurring in practice is with
> feature hashing (HashingTF).
>

The sub-sample I'm currently training on is about 50K rows, so ... small.

The features causing this issue are numeric (int) IDs for ... lets call it
"Thing". For each Thing in the record, we set the feature Thing.id to a
value of 1.0 in our vector (which is of course a SparseVector). I'm not
sure how IDs are generated for Things, but they can be large numbers.

The largest Thing ID is around 20 million, so that ends up being the size
of the vector. But in fact there are fewer than 10,000 unique Thing IDs in
this data. The mean number of features per record in what I'm currently
training against is 41, while the maximum for any given record was 1754.

It is possible to map the features into a small set (just need to
zipWithIndex), but this is undesirable because of the added complexity (not
just for the training, but also anything wanting to score against the
model). It might be a little easier if this could be encapsulated within
the model object itself (perhaps via composition), though I'm not sure how
feasible that is.

But I'd rather not bother with dimensionality reduction at all - since we
can train using liblinear in just a few minutes, it doesn't seem necessary.


>
> MultivariateOnlineSummarizer uses dense arrays, but it should be possible
> to enable sparse data. Though in theory, the result will tend to be dense
> anyway, unless you have very many entries in the input feature vector that
> never occur and are actually zero throughout the data set (which it seems
> is the case with your data?). So I doubt whether using sparse vectors for
> the summarizer would improve performance in general.
>

Yes, that is exactly my case - the vast majority of entries in the input
feature vector will *never* occur. Presumably that means most of the values
in the aggregators' arrays will be zero.


>
> LR doesn't accept a sparse weight vector, as it uses dense vectors for
> coefficients and gradients currently. When using L1 regularization, it
> could support sparse weight vectors, but the current implementation doesn't
> do that yet.
>

Good to know it is theoretically possible to implement. I'll have to give
it some thought. In the meantime I guess I'll experiment with coalescing
the data to minimize the communication overhead.

Thanks again.


Re: udf StructField to JSON String

2016-03-11 Thread Tristan Nixon
Have you looked at DataFrame.write.json( path )?
https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter

> On Mar 11, 2016, at 7:15 AM, Caires Vinicius  wrote:
> 
> I have one DataFrame with nested StructField and I want to convert to JSON 
> String. There is anyway to accomplish this?


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Does Spark support in-memory shuffling?

2016-03-11 Thread Ted Yu
Please take a look at SPARK-3376 and discussion on
https://github.com/apache/spark/pull/5403

FYI

On Fri, Mar 11, 2016 at 6:37 AM, Xudong Zheng  wrote:

> Hi all,
>
> Does Spark support in-memory shuffling now? If not, is there any
> consideration for it?
>
> Thanks!
>
> --
> Xudong Zheng
>


Re: Get output of the ALS algorithm.

2016-03-11 Thread Bryan Cutler
Are you trying to save predictions on a dataset to a file, or the model
produced after training with ALS?

On Thu, Mar 10, 2016 at 7:57 PM, Shishir Anshuman  wrote:

> hello,
>
> I am new to Apache Spark and would like to get the Recommendation output
> of the ALS algorithm in a file.
> Please suggest me the solution.
>
> Thank you
>
>
>


Re: Doubt on data frame

2016-03-11 Thread Mich Talebzadeh
Temporary tables are created in temp file space within the session. Once
the session is closed then the temporary table goes

scala> rs.registerTempTable("mytemp")

And this is the temporary file created with the above command

drwx--   - hdusersupergroup  0 2016-03-11 17:09
/tmp/hive/hduser/a1a80c24-213a-41e0-bbcb-75faf38597c3
drwx--   - hdusersupergroup  0 2016-03-11 17:09
/tmp/hive/hduser/a1a80c24-213a-41e0-bbcb-75faf38597c3/_tmp_space.db


Now if I open a second session, run the same query and create another
temporary table as before

scala> rs.registerTempTable("mytemp")

 I would get

drwx--   - hdusersupergroup  0 2016-03-11 17:22
/tmp/hive/hduser/cfc7f73c-cfc9-4cc0-a09d-823b07051cbd
drwx--   - hdusersupergroup  0 2016-03-11 17:22
/tmp/hive/hduser/cfc7f73c-cfc9-4cc0-a09d-823b07051cbd/_tmp_space.db


These two tables reference different files so they never collide and they
are private to the session.

If you are familiar with #tables in Sybase or SQL Server, these temporary
tables in Spark play the same role. They are valid within that session and
private to that session

HTH






Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 11 March 2016 at 15:42, ram kumar  wrote:

> No, I am not aware of it.
>
> Can you provide me with the details regarding this.
>
> Thanks
>
> On Fri, Mar 11, 2016 at 8:25 PM, Ted Yu  wrote:
>
>> temporary tables are associated with SessionState which is used
>> by SQLContext.
>>
>> Did you keep the session ?
>>
>> Cheers
>>
>> On Fri, Mar 11, 2016 at 5:02 AM, ram kumar 
>> wrote:
>>
>>> Hi,
>>>
>>> I registered a dataframe as a table using registerTempTable
>>> and I didn't close the Spark context.
>>>
>>> Will the table be available for longer time?
>>>
>>> Thanks
>>>
>>
>>
>


Re: Get output of the ALS algorithm.

2016-03-11 Thread Jacek Laskowski
What about write.save(file)?

P.s. I'm new to Spark MLlib.
11.03.2016 4:57 AM "Shishir Anshuman" 
napisał(a):

> hello,
>
> I am new to Apache Spark and would like to get the Recommendation output
> of the ALS algorithm in a file.
> Please suggest me the solution.
>
> Thank you
>
>
>


Re: Kafka + Spark streaming, RDD partitions not processed in parallel

2016-03-11 Thread Mukul Gupta
Please note that while building jar of code below, i used spark 1.6.0 + kafka 
0.9.0.0 libraries
I also tried spark 1.5.0 + kafka 0.9.0.1 combination, but encountered the same 
issue.

I could not use the ideal combination spark 1.6.0 + kafka 0.9.0.1 (which 
matches with spark and kafka versions installed on my machine) because while 
doing so, i get the following error at run time:
Exception in thread "main" java.lang.ClassCastException: 
kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker

package sparktest;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import kafka.serializer.StringDecoder;
import scala.Tuple2;

package sparktest;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import kafka.serializer.StringDecoder;
import scala.Tuple2;

public class SparkTest {

public static void main(String[] args) {

if (args.length < 5) {
System.err.println("Usage: SparkTest
 ");
System.exit(1);
}

String kafkaBroker = args[0];
String sparkMaster = args[1];
String topics = args[2];
String consumerGroupID = args[3];
String durationSec = args[4];

int duration = 0;

try {
duration = Integer.parseInt(durationSec);
} catch (Exception e) {
System.err.println("Illegal duration");
System.exit(1);
}

HashSet topicsSet = new 
HashSet(Arrays.asList(topics.split(",")));

SparkConf  conf = new 
SparkConf().setMaster(sparkMaster).setAppName("DirectStreamDemo");

JavaStreamingContext jssc = new JavaStreamingContext(conf, 
Durations.seconds(duration));

HashMap kafkaParams = new HashMap();
kafkaParams.put("metadata.broker.list", kafkaBroker);
kafkaParams.put("group.id", consumerGroupID);

JavaPairInputDStream messages = 
KafkaUtils.createDirectStream(jssc, String.class, String.class,
StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);

JavaDStream processed = messages.map(new Function, String>() {

@Override
public String call(Tuple2 arg0) throws Exception {

Thread.sleep(7000);
return arg0._2;
}
});

processed.print(90);

try {
jssc.start();
jssc.awaitTermination();
} catch (Exception e) {

} finally {
jssc.close();
}
}
}



From: Cody Koeninger 
Sent: 11 March 2016 20:42
To: Mukul Gupta
Cc: user@spark.apache.org
Subject: Re: Kafka + Spark streaming, RDD partitions not processed in parallel

Can you post your actual code?

On Thu, Mar 10, 2016 at 9:55 PM, Mukul Gupta  wrote:
> Hi All, I was running the following test: Setup 9 VM runing spark workers
> with 1 spark executor each. 1 VM running kafka and spark master. Spark
> version is 1.6.0 Kafka version is 0.9.0.1 Spark is using its own resource
> manager and is not running over YARN. Test I created a kafka topic with 3
> partition. next I used "KafkaUtils.createDirectStream" to get a DStream.
> JavaPairInputDStream stream =
> KafkaUtils.createDirectStream(…); JavaDStream stream1 = stream.map(func1);
> stream1.print(); where func1 just contains a sleep followed by returning of
> value. Observation First RDD partition corresponding to partition 1 of kafka
> was processed on one of the spark executor. Once processing is finished,
> then RDD partitions corresponding to remaining two kafka partitions were
> processed in parallel on different spark executors. I expected that all
> three RDD partitions should have been processed in parallel as there were
> spark executors available which were lying idle. I re-ran the test after
> increasing the partitions of kafka topic to 5. This time also RDD partition
> corresponding to partition 1 of kafka was processed on one of the spark
> executor. Once processing is finished for this RDD partition, then RDD
> partitions corresponding to remaining four kafka partitions were processed
> in parallel on different spark executors. I am not clear about why spark is
> waiting for operations on first RDD partition to finish, while it could
> process remaining partitions in parallel? Am I missing any configuration?
> Any help is appreciated. Thanks, Mukul
> 
> View this message in context: Kafka + Spark streaming, RDD partitions not
> processed in parallel
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
"DISCLAIMER: This message

Re: Kafka + Spark streaming, RDD partitions not processed in parallel

2016-03-11 Thread Cody Koeninger
Why are you including a specific dependency on Kafka?  Spark's
external streaming kafka module already depends on kafka.

Can you link to an actual repo with build file etc?

On Fri, Mar 11, 2016 at 11:21 AM, Mukul Gupta  wrote:
> Please note that while building jar of code below, i used spark 1.6.0 + kafka 
> 0.9.0.0 libraries
> I also tried spark 1.5.0 + kafka 0.9.0.1 combination, but encountered the 
> same issue.
>
> I could not use the ideal combination spark 1.6.0 + kafka 0.9.0.1 (which 
> matches with spark and kafka versions installed on my machine) because while 
> doing so, i get the following error at run time:
> Exception in thread "main" java.lang.ClassCastException: 
> kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
>
> package sparktest;
>
> import java.util.Arrays;
> import java.util.HashMap;
> import java.util.HashSet;
>
> import org.apache.spark.SparkConf;
> import org.apache.spark.streaming.api.java.JavaDStream;
> import org.apache.spark.api.java.function.Function;
> import org.apache.spark.streaming.Durations;
> import org.apache.spark.streaming.api.java.JavaPairInputDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
> import org.apache.spark.streaming.kafka.KafkaUtils;
> import kafka.serializer.StringDecoder;
> import scala.Tuple2;
>
> package sparktest;
>
> import java.util.Arrays;
> import java.util.HashMap;
> import java.util.HashSet;
>
> import org.apache.spark.SparkConf;
> import org.apache.spark.streaming.api.java.JavaDStream;
> import org.apache.spark.api.java.function.Function;
> import org.apache.spark.streaming.Durations;
> import org.apache.spark.streaming.api.java.JavaPairInputDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
> import org.apache.spark.streaming.kafka.KafkaUtils;
> import kafka.serializer.StringDecoder;
> import scala.Tuple2;
>
> public class SparkTest {
>
> public static void main(String[] args) {
>
> if (args.length < 5) {
> System.err.println("Usage: SparkTest
>  ");
> System.exit(1);
> }
>
> String kafkaBroker = args[0];
> String sparkMaster = args[1];
> String topics = args[2];
> String consumerGroupID = args[3];
> String durationSec = args[4];
>
> int duration = 0;
>
> try {
> duration = Integer.parseInt(durationSec);
> } catch (Exception e) {
> System.err.println("Illegal duration");
> System.exit(1);
> }
>
> HashSet topicsSet = new 
> HashSet(Arrays.asList(topics.split(",")));
>
> SparkConf  conf = new 
> SparkConf().setMaster(sparkMaster).setAppName("DirectStreamDemo");
>
> JavaStreamingContext jssc = new JavaStreamingContext(conf, 
> Durations.seconds(duration));
>
> HashMap kafkaParams = new HashMap();
> kafkaParams.put("metadata.broker.list", kafkaBroker);
> kafkaParams.put("group.id", consumerGroupID);
>
> JavaPairInputDStream messages = 
> KafkaUtils.createDirectStream(jssc, String.class, String.class,
> StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);
>
> JavaDStream processed = messages.map(new Function String>, String>() {
>
> @Override
> public String call(Tuple2 arg0) throws Exception {
>
> Thread.sleep(7000);
> return arg0._2;
> }
> });
>
> processed.print(90);
>
> try {
> jssc.start();
> jssc.awaitTermination();
> } catch (Exception e) {
>
> } finally {
> jssc.close();
> }
> }
> }
>
>
> 
> From: Cody Koeninger 
> Sent: 11 March 2016 20:42
> To: Mukul Gupta
> Cc: user@spark.apache.org
> Subject: Re: Kafka + Spark streaming, RDD partitions not processed in parallel
>
> Can you post your actual code?
>
> On Thu, Mar 10, 2016 at 9:55 PM, Mukul Gupta  wrote:
>> Hi All, I was running the following test: Setup 9 VM runing spark workers
>> with 1 spark executor each. 1 VM running kafka and spark master. Spark
>> version is 1.6.0 Kafka version is 0.9.0.1 Spark is using its own resource
>> manager and is not running over YARN. Test I created a kafka topic with 3
>> partition. next I used "KafkaUtils.createDirectStream" to get a DStream.
>> JavaPairInputDStream stream =
>> KafkaUtils.createDirectStream(…); JavaDStream stream1 = stream.map(func1);
>> stream1.print(); where func1 just contains a sleep followed by returning of
>> value. Observation First RDD partition corresponding to partition 1 of kafka
>> was processed on one of the spark executor. Once processing is finished,
>> then RDD partitions corresponding to remaining two kafka partitions were
>> processed in parallel on different spark executors. I expected that all
>> three RDD partitions should have been processed in parallel as there were
>> spark executors available which were lying idle. I re-ran the test after
>> increasing the partitions of kafka topic to 5. This time also RDD partition
>> corresponding to partition 1 of kafka was processed on one of the spark
>> executor. Once processing is finished for this RDD partition, then RDD
>> partitions corresponding to remaining four kafka partitions were processed
>> in parallel on different spark execu

Re: How can I join two DataSet of same case class?

2016-03-11 Thread Jacek Laskowski
Hi,

Use the names of the datasets not $, i. e. a("edid").

Jacek
11.03.2016 6:09 AM "박주형"  napisał(a):

> Hi. I want to join two DataSet. but below stderr is shown
>
> 16/03/11 13:55:51 WARN ColumnName: Constructing trivially true equals
> predicate, ''edid = 'edid'. Perhaps you need to use aliases.
> Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot
> resolve 'edid' given input columns dataType, avg, sigma, countUnique,
> numRows, recentEdid, categoryId, accCount, statType, categoryId, max,
> accCount, firstQuarter, recentEdid, replicationRateAvg, numRows, min,
> countNotNull, countNotNull, dcid, numDistinctRows, max, firstQuarter, min,
> replicationRateAvg, dcid, statType, avg, sigma, dataType, median,
> thirdQuarter, numDistinctRows, median, countUnique, thirdQuarter;
>
>
> my case class is
> case class Stat(statType: Int, dataType: Int, dcid: Int,
> categoryId: Int, recentEdid: Int, countNotNull: Int, countUnique:
> Int, accCount: Int, replicationRateAvg: Double,
> numDistinctRows: Double, numRows: Double,
> min: Double, max: Double, sigma: Double, avg: Double,
> firstQuarter: Double, thirdQuarter: Double, median: Double)
>
> and my code is
> a.joinWith(b, $"edid" === $"edid").show()
>
> If i use DataFrame, renaming a’s column could solve it. How can I join two
> DataSet of same case class?
>


Re: Spark ML - Scaling logistic regression for many features

2016-03-11 Thread Nick Pentreath
Ok, I think I understand things better now.

For Spark's current implementation, you would need to map those features as
you mention. You could also use say StringIndexer -> OneHotEncoder or
VectorIndexer. You could create a Pipeline to deal with the mapping and
training (e.g.
http://spark.apache.org/docs/latest/ml-guide.html#example-pipeline).
Pipeline supports persistence.

But it depends on your scoring use case too - a Spark pipeline can be saved
and then reloaded, but you need all of Spark dependencies in your serving
app which is often not ideal. If you're doing bulk scoring offline, then it
may suit.

Honestly though, for that data size I'd certainly go with something like
Liblinear :) Spark will ultimately scale better with # training examples
for very large scale problems. However there are definitely limitations on
model dimension and sparse weight vectors currently. There are potential
solutions to these but they haven't been implemented as yet.

On Fri, 11 Mar 2016 at 18:35 Daniel Siegmann 
wrote:

> On Fri, Mar 11, 2016 at 5:29 AM, Nick Pentreath 
> wrote:
>
>> Would you mind letting us know the # training examples in the datasets?
>> Also, what do your features look like? Are they text, categorical etc? You
>> mention that most rows only have a few features, and all rows together have
>> a few 10,000s features, yet your max feature value is 20 million. How are
>> your constructing your feature vectors to get a 20 million size? The only
>> realistic way I can see this situation occurring in practice is with
>> feature hashing (HashingTF).
>>
>
> The sub-sample I'm currently training on is about 50K rows, so ... small.
>
> The features causing this issue are numeric (int) IDs for ... lets call
> it "Thing". For each Thing in the record, we set the feature Thing.id to
> a value of 1.0 in our vector (which is of course a SparseVector). I'm not
> sure how IDs are generated for Things, but they can be large numbers.
>
> The largest Thing ID is around 20 million, so that ends up being the size
> of the vector. But in fact there are fewer than 10,000 unique Thing IDs in
> this data. The mean number of features per record in what I'm currently
> training against is 41, while the maximum for any given record was 1754.
>
> It is possible to map the features into a small set (just need to
> zipWithIndex), but this is undesirable because of the added complexity (not
> just for the training, but also anything wanting to score against the
> model). It might be a little easier if this could be encapsulated within
> the model object itself (perhaps via composition), though I'm not sure how
> feasible that is.
>
> But I'd rather not bother with dimensionality reduction at all - since we
> can train using liblinear in just a few minutes, it doesn't seem necessary.
>
>
>>
>> MultivariateOnlineSummarizer uses dense arrays, but it should be
>> possible to enable sparse data. Though in theory, the result will tend to
>> be dense anyway, unless you have very many entries in the input feature
>> vector that never occur and are actually zero throughout the data set
>> (which it seems is the case with your data?). So I doubt whether using
>> sparse vectors for the summarizer would improve performance in general.
>>
>
> Yes, that is exactly my case - the vast majority of entries in the input
> feature vector will *never* occur. Presumably that means most of the
> values in the aggregators' arrays will be zero.
>
>
>>
>> LR doesn't accept a sparse weight vector, as it uses dense vectors for
>> coefficients and gradients currently. When using L1 regularization, it
>> could support sparse weight vectors, but the current implementation doesn't
>> do that yet.
>>
>
> Good to know it is theoretically possible to implement. I'll have to give
> it some thought. In the meantime I guess I'll experiment with coalescing
> the data to minimize the communication overhead.
>
> Thanks again.
>


Re: udf StructField to JSON String

2016-03-11 Thread Caires Vinicius
Hmm. I think my problem is a little more complex. I'm using
https://github.com/databricks/spark-redshift and when I read from JSON file
I got this schema.

root

|-- app: string (nullable = true)

 |-- ct: long (nullable = true)

 |-- event: struct (nullable = true)

||-- attributes: struct (nullable = true)

 |||-- account: string (nullable = true)

 |||-- accountEmail: string (nullable = true)

 |||-- accountId: string (nullable = true)


I want to transform the Column *event* into String (formatted as JSON).

I was trying to use udf but without success.

On Fri, Mar 11, 2016 at 1:53 PM Tristan Nixon  wrote:

> Have you looked at DataFrame.write.json( path )?
>
> https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter
>
> > On Mar 11, 2016, at 7:15 AM, Caires Vinicius  wrote:
> >
> > I have one DataFrame with nested StructField and I want to convert to
> JSON String. There is anyway to accomplish this?
>
>


spark 1.6 foreachPartition only appears to be running on one executor

2016-03-11 Thread Darin McBeath
I've run into a situation where it would appear that foreachPartition is only 
running on one of my executors.

I have a small cluster (2 executors with 8 cores each).

When I run a job with a small file (with 16 partitions) I can see that the 16 
partitions are initialized but they all appear to be initialized on only one 
executor.  All of the work then runs on this  one executor (even though the 
number of partitions is 16). This seems odd, but at least it works.  Not sure 
why the other executor was not used.

However, when I run a larger file (once again with 16 partitions) I can see 
that the 16 partitions are initialized once again (but all on the same 
executor).  But, this time subsequent work is now spread across the 2 
executors.  This of course results in problems because the other executor was 
not initialized as all of the partitions were only initialized on the other 
executor.

Does anyone have any suggestions for where I might want to investigate?  Has 
anyone else seen something like this before?  Any thoughts/insights would be 
appreciated.  I'm using the Stand Alone Cluster manager, cluster started with 
the spark ec2 scripts  and submitting my job using spark-submit.

Thanks.

Darin.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: udf StructField to JSON String

2016-03-11 Thread Tristan Nixon
I have a similar situation in an app of mine. I implemented a custom ML 
Transformer that wraps the Jackson ObjectMapper - this gives you full control 
over how your custom entities / structs are serialized.

> On Mar 11, 2016, at 11:53 AM, Caires Vinicius  wrote:
> 
> Hmm. I think my problem is a little more complex. I'm using 
> https://github.com/databricks/spark-redshift 
>  and when I read from JSON file 
> I got this schema.
> 
> root
> |-- app: string (nullable = true)
> 
>  |-- ct: long (nullable = true)
> 
>  |-- event: struct (nullable = true)
> 
> ||-- attributes: struct (nullable = true)
> 
>  |||-- account: string (nullable = true)
> 
>  |||-- accountEmail: string (nullable = true)
> 
> 
>  |||-- accountId: string (nullable = true)
> 
> 
> 
> I want to transform the Column event into String (formatted as JSON). 
> 
> I was trying to use udf but without success.
> 
> 
> On Fri, Mar 11, 2016 at 1:53 PM Tristan Nixon  > wrote:
> Have you looked at DataFrame.write.json( path )?
> https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter
>  
> 
> 
> > On Mar 11, 2016, at 7:15 AM, Caires Vinicius  > > wrote:
> >
> > I have one DataFrame with nested StructField and I want to convert to JSON 
> > String. There is anyway to accomplish this?
> 



Re: Python unit tests - Unable to ru it with Python 2.6 or 2.7

2016-03-11 Thread Davies Liu
Spark 2.0 is dropping the support for Python 2.6, it only work with
Python 2.7, and 3.4+

On Thu, Mar 10, 2016 at 11:17 PM, Gayathri Murali
 wrote:
> Hi all,
>
> I am trying to run python unit tests.
>
> I currently have Python 2.6 and 2.7 installed. I installed unittest2 against 
> both of them.
>
> When I try to run /python/run-tests with Python 2.7 I get the following error 
> :
>
> Please install unittest2 to test with Python 2.6 or earlier
> Had test failures in pyspark.sql.tests with python2.6; see logs.
>
> When I try to run /python/run-tests with Python 2.6 I get the following error:
>
> Traceback (most recent call last):
>   File "./python/run-tests.py", line 42, in 
> from sparktestsupport.modules import all_modules  # noqa
>   File "/Users/gayathri/spark/python/../dev/sparktestsupport/modules.py", 
> line 18, in 
> from functools import total_ordering
> ImportError: cannot import name total_ordering
>
> total_ordering is a package that is available in 2.7.
>
> Can someone help?
>
> Thanks
> Gayathri
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Graphx

2016-03-11 Thread Alexander Pivovarov
we use it in prod

70 boxes, 61GB RAM each

GraphX Connected Components works fine on 250M Vertices and 1B Edges (takes
about 5-10 min)

Spark likes memory, so use r3.2xlarge boxes (61GB)
For example 10 x r3.2xlarge (61GB) work much faster than 20 x r3.xlarge
(30.5 GB) (especially if you have skewed data)

Also, use checkpoints before and after Connected Components to reduce DAG
delays

You can also try to enable Kryo and register classes used in RDD


On Fri, Mar 11, 2016 at 8:07 AM, John Lilley 
wrote:

> I suppose for a 2.6bn case we’d need Long:
>
>
>
> public class GenCCInput {
>
>   public static void main(String[] args) {
>
> if (args.length != 2) {
>
>   System.err.println("Usage: \njava GenCCInput  ");
>
>   System.exit(-1);
>
> }
>
> long edges = Long.parseLong(args[0]);
>
> long groupSize = Long.parseLong(args[1]);
>
> long currentEdge = 1;
>
> long currentGroupSize = 0;
>
> for (long i = 0; i < edges; i++) {
>
>   System.out.println(currentEdge + " " + (currentEdge + 1));
>
>   if (currentGroupSize == 0) {
>
> currentGroupSize = 2;
>
>   } else {
>
> currentGroupSize++;
>
>   }
>
>   if (currentGroupSize >= groupSize) {
>
> currentGroupSize = 0;
>
> currentEdge += 2;
>
>   } else {
>
> currentEdge++;
>
>   }
>
> }
>
>   }
>
> }
>
>
>
> *John Lilley*
>
> Chief Architect, RedPoint Global Inc.
>
> T: +1 303 541 1516  *| *M: +1 720 938 5761 *|* F: +1 781-705-2077
>
> Skype: jlilley.redpoint *|* john.lil...@redpoint.net *|* www.redpoint.net
>
>
>
> *From:* John Lilley [mailto:john.lil...@redpoint.net]
> *Sent:* Friday, March 11, 2016 8:46 AM
> *To:* Ovidiu-Cristian MARCU 
> *Cc:* lihu ; Andrew A ;
> u...@spark.incubator.apache.org; Geoff Thompson <
> geoff.thomp...@redpoint.net>
> *Subject:* RE: Graphx
>
>
>
> Ovidiu,
>
>
>
> IMHO, this is one of the biggest issues facing GraphX and Spark.  There
> are a lot of knobs and levers to pull to affect performance, with very
> little guidance about which settings work in general.  We cannot ship
> software that requires end-user tuning; it just has to work.  Unfortunately
> GraphX seems very sensitive to working set size relative to available RAM
> and fails catastrophically as opposed to gracefully when working set is too
> large.  It is also very sensitive to the nature of the data.  For example,
> if we build a test file with input-edge representation like:
>
> 1 2
>
> 2 3
>
> 3 4
>
> 5 6
>
> 6 7
>
> 7 8
>
> …
>
> this represents a graph with connected components in groups of four.  We
> found experimentally that when this data in input in clustered order, the
> required memory is lower and runtime is much faster than when data is input
> in random order.  This makes intuitive sense because of the additional
> communication required for the random order.
>
>
>
> Our 1bn-edge test case was of this same form, input in clustered order,
> with groups of 10 vertices per component.  It failed at 8 x 60GB.  This is
> the kind of data that our application processes, so it is a realistic test
> for us.  I’ve found that social media test data sets tend to follow
> power-law distributions, and that GraphX has much less problem with them.
>
>
>
> A comparable test scaled to your cluster (16 x 80GB) would be 2.6bn edges
> in 10-vertex components using the synthetic test input I describe above.  I
> would be curious to know if this works and what settings you use to
> succeed, and if it continues to succeed for random input order.
>
>
>
> As for the C++ algorithm, it scales multi-core.  It exhibits O(N^2)
> behavior for large data sets, but it processes the 1bn-edge case on a
> single 60GB node in about 20 minutes.  It degrades gracefully along the
> O(N^2) curve and additional memory reduces time.
>
>
>
> *John Lilley*
>
>
>
> *From:* Ovidiu-Cristian MARCU [mailto:ovidiu-cristian.ma...@inria.fr
> ]
> *Sent:* Friday, March 11, 2016 8:14 AM
> *To:* John Lilley 
> *Cc:* lihu ; Andrew A ;
> u...@spark.incubator.apache.org
> *Subject:* Re: Graphx
>
>
>
> Hi,
>
>
>
> I wonder what version of Spark and different parameter configuration you
> used.
>
> I was able to run CC for 1.8bn edges in about 8 minutes (23 iterations)
> using 16 nodes with around 80GB RAM each (Spark 1.5, default parameters)
>
> John: I suppose your C++ app (algorithm) does not scale if you used only
> one node.
>
> I don’t understand how RDD’s serialization is taking excessive time,
> compared to the total time or other expected time?
>
>
>
> For the different RDD times you have events and UI console and a bunch of
> papers describing how measure different things, lihu: did you used some
> incomplete tool or what are you looking for?
>
>
>
> Best,
>
> Ovidiu
>
>
>
> On 11 Mar 2016, at 16:02, John Lilley  wrote:
>
>
>
> A colleague did the experiments and I don’t know exactly how he observed
> that.  I think it was indirect from the Spark diagnostics indicating the
> amount of I/O he deduced that

Re: Python unit tests - Unable to ru it with Python 2.6 or 2.7

2016-03-11 Thread Gayathri Murali
I do have 2.7 installed and unittest2 package available. I still see this
error :

Please install unittest2 to test with Python 2.6 or earlier
Had test failures in pyspark.sql.tests with python2.6; see logs.

Thanks
Gayathri



On Fri, Mar 11, 2016 at 10:07 AM, Davies Liu  wrote:

> Spark 2.0 is dropping the support for Python 2.6, it only work with
> Python 2.7, and 3.4+
>
> On Thu, Mar 10, 2016 at 11:17 PM, Gayathri Murali
>  wrote:
> > Hi all,
> >
> > I am trying to run python unit tests.
> >
> > I currently have Python 2.6 and 2.7 installed. I installed unittest2
> against both of them.
> >
> > When I try to run /python/run-tests with Python 2.7 I get the following
> error :
> >
> > Please install unittest2 to test with Python 2.6 or earlier
> > Had test failures in pyspark.sql.tests with python2.6; see logs.
> >
> > When I try to run /python/run-tests with Python 2.6 I get the following
> error:
> >
> > Traceback (most recent call last):
> >   File "./python/run-tests.py", line 42, in 
> > from sparktestsupport.modules import all_modules  # noqa
> >   File
> "/Users/gayathri/spark/python/../dev/sparktestsupport/modules.py", line 18,
> in 
> > from functools import total_ordering
> > ImportError: cannot import name total_ordering
> >
> > total_ordering is a package that is available in 2.7.
> >
> > Can someone help?
> >
> > Thanks
> > Gayathri
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>


Re: Is there Graph Partitioning impl for Scala/Spark?

2016-03-11 Thread Alexander Pivovarov
JUNG library has 4 Community Detection (Community Structure) algorithms
implemented including Girvan–Newman algorithm
(EdgeBetweennessClusterer.java)

https://github.com/jrtom/jung/tree/master/jung-algorithms/src/main/java/edu/uci/ics/jung/algorithms/cluster

Girvan–Newman algorithm paper
http://www.santafe.edu/media/workingpapers/01-12-077.pdf
---

iGraph has 7 algorithms implemented including InfoMap and Louvain but lic
is written in C/C++
http://www.r-bloggers.com/summary-of-community-detection-algorithms-in-igraph-0-6/

On Wed, Mar 9, 2016 at 10:40 PM, Alexander Pivovarov 
wrote:

> Is there Graph Partitioning impl (e.g. Spectral ) which can be used in
> Spark?
> I guess it should be at least java/scala lib
> Maybe even tuned to work with GraphX
>


RE: Graphx

2016-03-11 Thread John Lilley
Thanks Alexander, this is really good information.  However it reinforces that 
we cannot use GraphX, because our customers typically have on-prem clusters in 
the 10-node range.  Very few have the kind of horsepower you are talking about. 
 We can’t just tell them to quadruple their cluster size to run our software on 
1bn edges.

John Lilley

From: Alexander Pivovarov [mailto:apivova...@gmail.com]
Sent: Friday, March 11, 2016 11:13 AM
To: John Lilley 
Cc: Ovidiu-Cristian MARCU ; lihu 
; Andrew A ; 
u...@spark.incubator.apache.org; Geoff Thompson 
Subject: Re: Graphx

we use it in prod

70 boxes, 61GB RAM each

GraphX Connected Components works fine on 250M Vertices and 1B Edges (takes 
about 5-10 min)

Spark likes memory, so use r3.2xlarge boxes (61GB)
For example 10 x r3.2xlarge (61GB) work much faster than 20 x r3.xlarge (30.5 
GB) (especially if you have skewed data)

Also, use checkpoints before and after Connected Components to reduce DAG delays

You can also try to enable Kryo and register classes used in RDD


On Fri, Mar 11, 2016 at 8:07 AM, John Lilley 
mailto:john.lil...@redpoint.net>> wrote:
I suppose for a 2.6bn case we’d need Long:

public class GenCCInput {
  public static void main(String[] args) {
if (args.length != 2) {
  System.err.println("Usage: \njava GenCCInput  ");
  System.exit(-1);
}
long edges = Long.parseLong(args[0]);
long groupSize = Long.parseLong(args[1]);
long currentEdge = 1;
long currentGroupSize = 0;
for (long i = 0; i < edges; i++) {
  System.out.println(currentEdge + " " + (currentEdge + 1));
  if (currentGroupSize == 0) {
currentGroupSize = 2;
  } else {
currentGroupSize++;
  }
  if (currentGroupSize >= groupSize) {
currentGroupSize = 0;
currentEdge += 2;
  } else {
currentEdge++;
  }
}
  }
}

John Lilley
Chief Architect, RedPoint Global Inc.
T: +1 303 541 1516  | M: +1 720 938 
5761 | F: +1 781-705-2077
Skype: jlilley.redpoint | 
john.lil...@redpoint.net | 
www.redpoint.net

From: John Lilley 
[mailto:john.lil...@redpoint.net]
Sent: Friday, March 11, 2016 8:46 AM
To: Ovidiu-Cristian MARCU 
mailto:ovidiu-cristian.ma...@inria.fr>>
Cc: lihu mailto:lihu...@gmail.com>>; Andrew A 
mailto:andrew.a...@gmail.com>>; 
u...@spark.incubator.apache.org; Geoff 
Thompson mailto:geoff.thomp...@redpoint.net>>
Subject: RE: Graphx

Ovidiu,

IMHO, this is one of the biggest issues facing GraphX and Spark.  There are a 
lot of knobs and levers to pull to affect performance, with very little 
guidance about which settings work in general.  We cannot ship software that 
requires end-user tuning; it just has to work.  Unfortunately GraphX seems very 
sensitive to working set size relative to available RAM and fails 
catastrophically as opposed to gracefully when working set is too large.  It is 
also very sensitive to the nature of the data.  For example, if we build a test 
file with input-edge representation like:
1 2
2 3
3 4
5 6
6 7
7 8
…
this represents a graph with connected components in groups of four.  We found 
experimentally that when this data in input in clustered order, the required 
memory is lower and runtime is much faster than when data is input in random 
order.  This makes intuitive sense because of the additional communication 
required for the random order.

Our 1bn-edge test case was of this same form, input in clustered order, with 
groups of 10 vertices per component.  It failed at 8 x 60GB.  This is the kind 
of data that our application processes, so it is a realistic test for us.  I’ve 
found that social media test data sets tend to follow power-law distributions, 
and that GraphX has much less problem with them.

A comparable test scaled to your cluster (16 x 80GB) would be 2.6bn edges in 
10-vertex components using the synthetic test input I describe above.  I would 
be curious to know if this works and what settings you use to succeed, and if 
it continues to succeed for random input order.

As for the C++ algorithm, it scales multi-core.  It exhibits O(N^2) behavior 
for large data sets, but it processes the 1bn-edge case on a single 60GB node 
in about 20 minutes.  It degrades gracefully along the O(N^2) curve and 
additional memory reduces time.

John Lilley

From: Ovidiu-Cristian MARCU [mailto:ovidiu-cristian.ma...@inria.fr]
Sent: Friday, March 11, 2016 8:14 AM
To: John Lilley mailto:john.lil...@redpoint.net>>
Cc: lihu mailto:lihu...@gmail.com>>; Andrew A 
mailto:andrew.a...@gmail.com>>; 
u...@spark.incubator.apache.org
Subject: Re: Graphx

Hi,

I wonder what version of Spark and different parameter configuration you used.
I was able to run CC for 1.8bn edges in about 8 minutes (23 iterations) using 
16 nodes with around 80GB RAM each (Spark 1.5, default parameters)
John: I suppose 

Re: Graphx

2016-03-11 Thread Alexis Roos
Also we keep the Node info minimal as needed for connected components and
rejoin later.

Alexis

On Fri, Mar 11, 2016 at 10:12 AM, Alexander Pivovarov 
wrote:

> we use it in prod
>
> 70 boxes, 61GB RAM each
>
> GraphX Connected Components works fine on 250M Vertices and 1B Edges
> (takes about 5-10 min)
>
> Spark likes memory, so use r3.2xlarge boxes (61GB)
> For example 10 x r3.2xlarge (61GB) work much faster than 20 x r3.xlarge
> (30.5 GB) (especially if you have skewed data)
>
> Also, use checkpoints before and after Connected Components to reduce DAG
> delays
>
> You can also try to enable Kryo and register classes used in RDD
>
>
> On Fri, Mar 11, 2016 at 8:07 AM, John Lilley 
> wrote:
>
>> I suppose for a 2.6bn case we’d need Long:
>>
>>
>>
>> public class GenCCInput {
>>
>>   public static void main(String[] args) {
>>
>> if (args.length != 2) {
>>
>>   System.err.println("Usage: \njava GenCCInput  ");
>>
>>   System.exit(-1);
>>
>> }
>>
>> long edges = Long.parseLong(args[0]);
>>
>> long groupSize = Long.parseLong(args[1]);
>>
>> long currentEdge = 1;
>>
>> long currentGroupSize = 0;
>>
>> for (long i = 0; i < edges; i++) {
>>
>>   System.out.println(currentEdge + " " + (currentEdge + 1));
>>
>>   if (currentGroupSize == 0) {
>>
>> currentGroupSize = 2;
>>
>>   } else {
>>
>> currentGroupSize++;
>>
>>   }
>>
>>   if (currentGroupSize >= groupSize) {
>>
>> currentGroupSize = 0;
>>
>> currentEdge += 2;
>>
>>   } else {
>>
>> currentEdge++;
>>
>>   }
>>
>> }
>>
>>   }
>>
>> }
>>
>>
>>
>> *John Lilley*
>>
>> Chief Architect, RedPoint Global Inc.
>>
>> T: +1 303 541 1516  *| *M: +1 720 938 5761 *|* F: +1 781-705-2077
>>
>> Skype: jlilley.redpoint *|* john.lil...@redpoint.net *|* www.redpoint.net
>>
>>
>>
>> *From:* John Lilley [mailto:john.lil...@redpoint.net]
>> *Sent:* Friday, March 11, 2016 8:46 AM
>> *To:* Ovidiu-Cristian MARCU 
>> *Cc:* lihu ; Andrew A ;
>> u...@spark.incubator.apache.org; Geoff Thompson <
>> geoff.thomp...@redpoint.net>
>> *Subject:* RE: Graphx
>>
>>
>>
>> Ovidiu,
>>
>>
>>
>> IMHO, this is one of the biggest issues facing GraphX and Spark.  There
>> are a lot of knobs and levers to pull to affect performance, with very
>> little guidance about which settings work in general.  We cannot ship
>> software that requires end-user tuning; it just has to work.  Unfortunately
>> GraphX seems very sensitive to working set size relative to available RAM
>> and fails catastrophically as opposed to gracefully when working set is too
>> large.  It is also very sensitive to the nature of the data.  For example,
>> if we build a test file with input-edge representation like:
>>
>> 1 2
>>
>> 2 3
>>
>> 3 4
>>
>> 5 6
>>
>> 6 7
>>
>> 7 8
>>
>> …
>>
>> this represents a graph with connected components in groups of four.  We
>> found experimentally that when this data in input in clustered order, the
>> required memory is lower and runtime is much faster than when data is input
>> in random order.  This makes intuitive sense because of the additional
>> communication required for the random order.
>>
>>
>>
>> Our 1bn-edge test case was of this same form, input in clustered order,
>> with groups of 10 vertices per component.  It failed at 8 x 60GB.  This is
>> the kind of data that our application processes, so it is a realistic test
>> for us.  I’ve found that social media test data sets tend to follow
>> power-law distributions, and that GraphX has much less problem with them.
>>
>>
>>
>> A comparable test scaled to your cluster (16 x 80GB) would be 2.6bn edges
>> in 10-vertex components using the synthetic test input I describe above.  I
>> would be curious to know if this works and what settings you use to
>> succeed, and if it continues to succeed for random input order.
>>
>>
>>
>> As for the C++ algorithm, it scales multi-core.  It exhibits O(N^2)
>> behavior for large data sets, but it processes the 1bn-edge case on a
>> single 60GB node in about 20 minutes.  It degrades gracefully along the
>> O(N^2) curve and additional memory reduces time.
>>
>>
>>
>> *John Lilley*
>>
>>
>>
>> *From:* Ovidiu-Cristian MARCU [mailto:ovidiu-cristian.ma...@inria.fr
>> ]
>> *Sent:* Friday, March 11, 2016 8:14 AM
>> *To:* John Lilley 
>> *Cc:* lihu ; Andrew A ;
>> u...@spark.incubator.apache.org
>> *Subject:* Re: Graphx
>>
>>
>>
>> Hi,
>>
>>
>>
>> I wonder what version of Spark and different parameter configuration you
>> used.
>>
>> I was able to run CC for 1.8bn edges in about 8 minutes (23 iterations)
>> using 16 nodes with around 80GB RAM each (Spark 1.5, default parameters)
>>
>> John: I suppose your C++ app (algorithm) does not scale if you used only
>> one node.
>>
>> I don’t understand how RDD’s serialization is taking excessive time,
>> compared to the total time or other expected time?
>>
>>
>>
>> For the different RDD times you have events and UI console and a bunch of
>> papers describing how me

Re: spark 1.6 foreachPartition only appears to be running on one executor

2016-03-11 Thread Jacek Laskowski
Hi,

How do you check which executor is used? Can you include a screenshot of
the master's webUI with workers?

Jacek
11.03.2016 6:57 PM "Darin McBeath"  napisał(a):

> I've run into a situation where it would appear that foreachPartition is
> only running on one of my executors.
>
> I have a small cluster (2 executors with 8 cores each).
>
> When I run a job with a small file (with 16 partitions) I can see that the
> 16 partitions are initialized but they all appear to be initialized on only
> one executor.  All of the work then runs on this  one executor (even though
> the number of partitions is 16). This seems odd, but at least it works.
> Not sure why the other executor was not used.
>
> However, when I run a larger file (once again with 16 partitions) I can
> see that the 16 partitions are initialized once again (but all on the same
> executor).  But, this time subsequent work is now spread across the 2
> executors.  This of course results in problems because the other executor
> was not initialized as all of the partitions were only initialized on the
> other executor.
>
> Does anyone have any suggestions for where I might want to investigate?
> Has anyone else seen something like this before?  Any thoughts/insights
> would be appreciated.  I'm using the Stand Alone Cluster manager, cluster
> started with the spark ec2 scripts  and submitting my job using
> spark-submit.
>
> Thanks.
>
> Darin.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


RE: Graphx

2016-03-11 Thread John Lilley
We have almost zero node info – just an identifying integer.
John Lilley

From: Alexis Roos [mailto:alexis.r...@gmail.com]
Sent: Friday, March 11, 2016 11:24 AM
To: Alexander Pivovarov 
Cc: John Lilley ; Ovidiu-Cristian MARCU 
; lihu ; Andrew A 
; u...@spark.incubator.apache.org; Geoff Thompson 

Subject: Re: Graphx

Also we keep the Node info minimal as needed for connected components and 
rejoin later.

Alexis

On Fri, Mar 11, 2016 at 10:12 AM, Alexander Pivovarov 
mailto:apivova...@gmail.com>> wrote:
we use it in prod

70 boxes, 61GB RAM each

GraphX Connected Components works fine on 250M Vertices and 1B Edges (takes 
about 5-10 min)

Spark likes memory, so use r3.2xlarge boxes (61GB)
For example 10 x r3.2xlarge (61GB) work much faster than 20 x r3.xlarge (30.5 
GB) (especially if you have skewed data)

Also, use checkpoints before and after Connected Components to reduce DAG delays

You can also try to enable Kryo and register classes used in RDD


On Fri, Mar 11, 2016 at 8:07 AM, John Lilley 
mailto:john.lil...@redpoint.net>> wrote:
I suppose for a 2.6bn case we’d need Long:

public class GenCCInput {
  public static void main(String[] args) {
if (args.length != 2) {
  System.err.println("Usage: \njava GenCCInput  ");
  System.exit(-1);
}
long edges = Long.parseLong(args[0]);
long groupSize = Long.parseLong(args[1]);
long currentEdge = 1;
long currentGroupSize = 0;
for (long i = 0; i < edges; i++) {
  System.out.println(currentEdge + " " + (currentEdge + 1));
  if (currentGroupSize == 0) {
currentGroupSize = 2;
  } else {
currentGroupSize++;
  }
  if (currentGroupSize >= groupSize) {
currentGroupSize = 0;
currentEdge += 2;
  } else {
currentEdge++;
  }
}
  }
}

John Lilley
Chief Architect, RedPoint Global Inc.
T: +1 303 541 1516  | M: +1 720 938 
5761 | F: +1 781-705-2077
Skype: jlilley.redpoint | 
john.lil...@redpoint.net | 
www.redpoint.net

From: John Lilley 
[mailto:john.lil...@redpoint.net]
Sent: Friday, March 11, 2016 8:46 AM
To: Ovidiu-Cristian MARCU 
mailto:ovidiu-cristian.ma...@inria.fr>>
Cc: lihu mailto:lihu...@gmail.com>>; Andrew A 
mailto:andrew.a...@gmail.com>>; 
u...@spark.incubator.apache.org; Geoff 
Thompson mailto:geoff.thomp...@redpoint.net>>
Subject: RE: Graphx

Ovidiu,

IMHO, this is one of the biggest issues facing GraphX and Spark.  There are a 
lot of knobs and levers to pull to affect performance, with very little 
guidance about which settings work in general.  We cannot ship software that 
requires end-user tuning; it just has to work.  Unfortunately GraphX seems very 
sensitive to working set size relative to available RAM and fails 
catastrophically as opposed to gracefully when working set is too large.  It is 
also very sensitive to the nature of the data.  For example, if we build a test 
file with input-edge representation like:
1 2
2 3
3 4
5 6
6 7
7 8
…
this represents a graph with connected components in groups of four.  We found 
experimentally that when this data in input in clustered order, the required 
memory is lower and runtime is much faster than when data is input in random 
order.  This makes intuitive sense because of the additional communication 
required for the random order.

Our 1bn-edge test case was of this same form, input in clustered order, with 
groups of 10 vertices per component.  It failed at 8 x 60GB.  This is the kind 
of data that our application processes, so it is a realistic test for us.  I’ve 
found that social media test data sets tend to follow power-law distributions, 
and that GraphX has much less problem with them.

A comparable test scaled to your cluster (16 x 80GB) would be 2.6bn edges in 
10-vertex components using the synthetic test input I describe above.  I would 
be curious to know if this works and what settings you use to succeed, and if 
it continues to succeed for random input order.

As for the C++ algorithm, it scales multi-core.  It exhibits O(N^2) behavior 
for large data sets, but it processes the 1bn-edge case on a single 60GB node 
in about 20 minutes.  It degrades gracefully along the O(N^2) curve and 
additional memory reduces time.

John Lilley

From: Ovidiu-Cristian MARCU [mailto:ovidiu-cristian.ma...@inria.fr]
Sent: Friday, March 11, 2016 8:14 AM
To: John Lilley mailto:john.lil...@redpoint.net>>
Cc: lihu mailto:lihu...@gmail.com>>; Andrew A 
mailto:andrew.a...@gmail.com>>; 
u...@spark.incubator.apache.org
Subject: Re: Graphx

Hi,

I wonder what version of Spark and different parameter configuration you used.
I was able to run CC for 1.8bn edges in about 8 minutes (23 iterations) using 
16 nodes with around 80GB RAM each (Spark 1.5, default parameters)
John: I suppose your C++ app (algorithm) does not scale if you used only one 
node.

Re: Python unit tests - Unable to ru it with Python 2.6 or 2.7

2016-03-11 Thread Holden Karau
So the run tests command allows you to specify the python version to test
again - maybe specify python2.7

On Friday, March 11, 2016, Gayathri Murali 
wrote:

> I do have 2.7 installed and unittest2 package available. I still see this
> error :
>
> Please install unittest2 to test with Python 2.6 or earlier
> Had test failures in pyspark.sql.tests with python2.6; see logs.
>
> Thanks
> Gayathri
>
>
>
> On Fri, Mar 11, 2016 at 10:07 AM, Davies Liu  > wrote:
>
>> Spark 2.0 is dropping the support for Python 2.6, it only work with
>> Python 2.7, and 3.4+
>>
>> On Thu, Mar 10, 2016 at 11:17 PM, Gayathri Murali
>> > > wrote:
>> > Hi all,
>> >
>> > I am trying to run python unit tests.
>> >
>> > I currently have Python 2.6 and 2.7 installed. I installed unittest2
>> against both of them.
>> >
>> > When I try to run /python/run-tests with Python 2.7 I get the following
>> error :
>> >
>> > Please install unittest2 to test with Python 2.6 or earlier
>> > Had test failures in pyspark.sql.tests with python2.6; see logs.
>> >
>> > When I try to run /python/run-tests with Python 2.6 I get the following
>> error:
>> >
>> > Traceback (most recent call last):
>> >   File "./python/run-tests.py", line 42, in 
>> > from sparktestsupport.modules import all_modules  # noqa
>> >   File
>> "/Users/gayathri/spark/python/../dev/sparktestsupport/modules.py", line 18,
>> in 
>> > from functools import total_ordering
>> > ImportError: cannot import name total_ordering
>> >
>> > total_ordering is a package that is available in 2.7.
>> >
>> > Can someone help?
>> >
>> > Thanks
>> > Gayathri
>> > -
>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> 
>> > For additional commands, e-mail: user-h...@spark.apache.org
>> 
>> >
>>
>
>

-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


Re: spark 1.6 foreachPartition only appears to be running on one executor

2016-03-11 Thread Darin McBeath


I can verify this by looking at the log file for the workers.

Since I output logging statements in the object called by the foreachPartition, 
I can see the statements being logged. Oddly, these output statements only 
occur in one executor (and not the other).  It occurs 16 times in this executor 
 since there are 16 partitions.  This seems odd as there are only 8 cores on 
the executor and the other executor doesn't appear to be called at all in the 
foreachPartition call.  But, when I go to do a map function on this same RDD 
then things start blowing up on the other executor as it starts doing work for 
some partitions (although, it would appear that all partitions were only 
initialized on the other executor). The executor that was used in the 
foreachPartition call works fine and doesn't experience issue.  But, because 
the other executor is failing on every request the job dies.

Darin.



From: Jacek Laskowski 
To: Darin McBeath  
Cc: user 
Sent: Friday, March 11, 2016 1:24 PM
Subject: Re: spark 1.6 foreachPartition only appears to be running on one 
executor



Hi, 
How do you check which executor is used? Can you include a screenshot of the 
master's webUI with workers? 
Jacek 
11.03.2016 6:57 PM "Darin McBeath"  napisał(a):

I've run into a situation where it would appear that foreachPartition is only 
running on one of my executors.
>
>I have a small cluster (2 executors with 8 cores each).
>
>When I run a job with a small file (with 16 partitions) I can see that the 16 
>partitions are initialized but they all appear to be initialized on only one 
>executor.  All of the work then runs on this  one executor (even though the 
>number of partitions is 16). This seems odd, but at least it works.  Not sure 
>why the other executor was not used.
>
>However, when I run a larger file (once again with 16 partitions) I can see 
>that the 16 partitions are initialized once again (but all on the same 
>executor).  But, this time subsequent work is now spread across the 2 
>executors.  This of course results in problems because the other executor was 
>not initialized as all of the partitions were only initialized on the other 
>executor.
>
>Does anyone have any suggestions for where I might want to investigate?  Has 
>anyone else seen something like this before?  Any thoughts/insights would be 
>appreciated.  I'm using the Stand Alone Cluster manager, cluster started with 
>the spark ec2 scripts  and submitting my job using spark-submit.
>
>Thanks.
>
>Darin.
>
>-
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>For additional commands, e-mail: user-h...@spark.apache.org
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Newbie question - Help with runtime error on augmentString

2016-03-11 Thread vasu20
Hi

Any help appreciated on this.  I am trying to write a Spark program using
IntelliJ.  I get a run time error as soon as new SparkConf() is called from
main.  Top few lines of the exception are pasted below.

These are the following versions:

Spark jar:  spark-assembly-1.6.0-hadoop2.6.0.jar
pom:  spark-core_2.11
 1.6.0

I have installed the Scala plugin in IntelliJ and added a dependency.

I have also added a library dependency in the project structure.

Thanks for any help!

Vasu


Exception in thread "main" java.lang.NoSuchMethodError:
scala.Predef$.augmentString(Ljava/lang/String;)Ljava/lang/String;
at org.apache.spark.util.Utils$.(Utils.scala:1682)
at org.apache.spark.util.Utils$.(Utils.scala)
at org.apache.spark.SparkConf.(SparkConf.scala:59)






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Newbie-question-Help-with-runtime-error-on-augmentString-tp26462.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Python unit tests - Unable to ru it with Python 2.6 or 2.7

2016-03-11 Thread Josh Rosen
AFAIK we haven't actually broken 2.6 compatibility yet for PySpark itself,
since Jenkins is still testing that configuration.

I think the problem that you're seeing is that dev/run-tests /
dev/run-tests-jenkins only work against Python 2.7+ right now. However,
./python/run-tests should be able to launch and run PySpark tests with
Python 2.6. Try ./python/run-tests --help for more details.

On Fri, Mar 11, 2016 at 10:31 AM Holden Karau  wrote:

> So the run tests command allows you to specify the python version to test
> again - maybe specify python2.7
>
> On Friday, March 11, 2016, Gayathri Murali 
> wrote:
>
>> I do have 2.7 installed and unittest2 package available. I still see this
>> error :
>>
>> Please install unittest2 to test with Python 2.6 or earlier
>> Had test failures in pyspark.sql.tests with python2.6; see logs.
>>
>> Thanks
>> Gayathri
>>
>>
>>
>> On Fri, Mar 11, 2016 at 10:07 AM, Davies Liu 
>> wrote:
>>
>>> Spark 2.0 is dropping the support for Python 2.6, it only work with
>>> Python 2.7, and 3.4+
>>>
>>> On Thu, Mar 10, 2016 at 11:17 PM, Gayathri Murali
>>>  wrote:
>>> > Hi all,
>>> >
>>> > I am trying to run python unit tests.
>>> >
>>> > I currently have Python 2.6 and 2.7 installed. I installed unittest2
>>> against both of them.
>>> >
>>> > When I try to run /python/run-tests with Python 2.7 I get the
>>> following error :
>>> >
>>> > Please install unittest2 to test with Python 2.6 or earlier
>>> > Had test failures in pyspark.sql.tests with python2.6; see logs.
>>> >
>>> > When I try to run /python/run-tests with Python 2.6 I get the
>>> following error:
>>> >
>>> > Traceback (most recent call last):
>>> >   File "./python/run-tests.py", line 42, in 
>>> > from sparktestsupport.modules import all_modules  # noqa
>>> >   File
>>> "/Users/gayathri/spark/python/../dev/sparktestsupport/modules.py", line 18,
>>> in 
>>> > from functools import total_ordering
>>> > ImportError: cannot import name total_ordering
>>> >
>>> > total_ordering is a package that is available in 2.7.
>>> >
>>> > Can someone help?
>>> >
>>> > Thanks
>>> > Gayathri
>>> > -
>>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> > For additional commands, e-mail: user-h...@spark.apache.org
>>> >
>>>
>>
>>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
>
>


hive.metastore.metadb.dir not working programmatically

2016-03-11 Thread harirajaram
Experts need your help,
I'm using spark 1.4.1 and when set this hive.metastore.metadb.dir
programmatically for a hivecontext i.e for local metastore i.e the default
metastore_db for derby, the metastore_db is still getting creating in the
same path as user.dir.
Can you guys provide some insights regarding the same. Help much
appreciated.

p.s : I have also set hive.metastore.warehouse.dir.
Hari



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/hive-metastore-metadb-dir-not-working-programmatically-tp26463.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Newbie question - Help with runtime error on augmentString

2016-03-11 Thread Ted Yu
Looks like Scala version mismatch.

Are you using 2.11 everywhere ?

On Fri, Mar 11, 2016 at 10:33 AM, vasu20  wrote:

> Hi
>
> Any help appreciated on this.  I am trying to write a Spark program using
> IntelliJ.  I get a run time error as soon as new SparkConf() is called from
> main.  Top few lines of the exception are pasted below.
>
> These are the following versions:
>
> Spark jar:  spark-assembly-1.6.0-hadoop2.6.0.jar
> pom:  spark-core_2.11
>  1.6.0
>
> I have installed the Scala plugin in IntelliJ and added a dependency.
>
> I have also added a library dependency in the project structure.
>
> Thanks for any help!
>
> Vasu
>
>
> Exception in thread "main" java.lang.NoSuchMethodError:
> scala.Predef$.augmentString(Ljava/lang/String;)Ljava/lang/String;
> at org.apache.spark.util.Utils$.(Utils.scala:1682)
> at org.apache.spark.util.Utils$.(Utils.scala)
> at org.apache.spark.SparkConf.(SparkConf.scala:59)
>
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Newbie-question-Help-with-runtime-error-on-augmentString-tp26462.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: spark 1.6 foreachPartition only appears to be running on one executor

2016-03-11 Thread Jacek Laskowski
Hi,

Could you share the code with foreachPartition?

Jacek
11.03.2016 7:33 PM "Darin McBeath"  napisał(a):

>
>
> I can verify this by looking at the log file for the workers.
>
> Since I output logging statements in the object called by the
> foreachPartition, I can see the statements being logged. Oddly, these
> output statements only occur in one executor (and not the other).  It
> occurs 16 times in this executor  since there are 16 partitions.  This
> seems odd as there are only 8 cores on the executor and the other executor
> doesn't appear to be called at all in the foreachPartition call.  But, when
> I go to do a map function on this same RDD then things start blowing up on
> the other executor as it starts doing work for some partitions (although,
> it would appear that all partitions were only initialized on the other
> executor). The executor that was used in the foreachPartition call works
> fine and doesn't experience issue.  But, because the other executor is
> failing on every request the job dies.
>
> Darin.
>
>
> 
> From: Jacek Laskowski 
> To: Darin McBeath 
> Cc: user 
> Sent: Friday, March 11, 2016 1:24 PM
> Subject: Re: spark 1.6 foreachPartition only appears to be running on one
> executor
>
>
>
> Hi,
> How do you check which executor is used? Can you include a screenshot of
> the master's webUI with workers?
> Jacek
> 11.03.2016 6:57 PM "Darin McBeath" 
> napisał(a):
>
> I've run into a situation where it would appear that foreachPartition is
> only running on one of my executors.
> >
> >I have a small cluster (2 executors with 8 cores each).
> >
> >When I run a job with a small file (with 16 partitions) I can see that
> the 16 partitions are initialized but they all appear to be initialized on
> only one executor.  All of the work then runs on this  one executor (even
> though the number of partitions is 16). This seems odd, but at least it
> works.  Not sure why the other executor was not used.
> >
> >However, when I run a larger file (once again with 16 partitions) I can
> see that the 16 partitions are initialized once again (but all on the same
> executor).  But, this time subsequent work is now spread across the 2
> executors.  This of course results in problems because the other executor
> was not initialized as all of the partitions were only initialized on the
> other executor.
> >
> >Does anyone have any suggestions for where I might want to investigate?
> Has anyone else seen something like this before?  Any thoughts/insights
> would be appreciated.  I'm using the Stand Alone Cluster manager, cluster
> started with the spark ec2 scripts  and submitting my job using
> spark-submit.
> >
> >Thanks.
> >
> >Darin.
> >
> >-
> >To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >For additional commands, e-mail: user-h...@spark.apache.org
> >
> >
>


adding rows to a DataFrame

2016-03-11 Thread Stefan Panayotov
Hi,
 
I have a problem that requires me to go through the rows in a DataFrame (or 
possibly through rows in a JSON file) and conditionally add rows depending on a 
value in one of the columns in each existing row. So, for example if I have:
 
+---+---+---+

| _1| _2| _3|


+---+---+---+


|ID1|100|1.1|


|ID2|200|2.2|


|ID3|300|3.3|


|ID4|400|4.4|


+---+---+---+



I need to be able to get:
 
+---+---+---++---+

| _1| _2|
_3| 
_4| _5|


+---+---+---++---+


|ID1|100|1.1|ID1 add text or d...| 25|


|id11 ..|21 |


|id12 ..|22 |


|ID2|200|2.2|ID2 add text or d...| 50|


|id21 ..|33 |


|id22 ..|34 |


|id23 ..|35 |


|ID3|300|3.3|ID3 add text or d...| 75|


|id31 ..|11 |


|ID4|400|4.4|ID4 add text or d...|100|


|id41 ..|51 |


|id42 ..|52 |


|id43 ..|53 |


|id44 ..|54 |


+---+---+---++---+



How can I achieve this in Spark without doing DF.collect(), which will get 
everything to the driver and for a big data set I'll get OOM?
BTW, I know how to use withColumn() to add new columns to the DataFrame. I need 
to also add new rows.
Any help will be appreciated.

Thanks,


Stefan Panayotov, PhD 
Home: 610-355-0919 
Cell: 610-517-5586 
email: spanayo...@msn.com 
spanayo...@outlook.com 
spanayo...@comcast.net
  

Re: Newbie question - Help with runtime error on augmentString

2016-03-11 Thread Jacek Laskowski
Hi,

Why do you use maven not sbt for Scala?

Can you show the entire pom.xml and the command to execute the app?

Jacek
11.03.2016 7:33 PM "vasu20"  napisał(a):

> Hi
>
> Any help appreciated on this.  I am trying to write a Spark program using
> IntelliJ.  I get a run time error as soon as new SparkConf() is called from
> main.  Top few lines of the exception are pasted below.
>
> These are the following versions:
>
> Spark jar:  spark-assembly-1.6.0-hadoop2.6.0.jar
> pom:  spark-core_2.11
>  1.6.0
>
> I have installed the Scala plugin in IntelliJ and added a dependency.
>
> I have also added a library dependency in the project structure.
>
> Thanks for any help!
>
> Vasu
>
>
> Exception in thread "main" java.lang.NoSuchMethodError:
> scala.Predef$.augmentString(Ljava/lang/String;)Ljava/lang/String;
> at org.apache.spark.util.Utils$.(Utils.scala:1682)
> at org.apache.spark.util.Utils$.(Utils.scala)
> at org.apache.spark.SparkConf.(SparkConf.scala:59)
>
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Newbie-question-Help-with-runtime-error-on-augmentString-tp26462.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Graphx

2016-03-11 Thread Khaled Ammar
This is an interesting discussion,

I have had some success running GraphX on large graphs with more than a
Billion edges using clusters of different size up to 64 machines. However,
the performance goes down when I double the cluster size to reach 128
machines of r3.xlarge. Does any one have experience with very large GraphX
clusters?

@Ovidiu-Cristian, @Alexis and @Alexander, could you please share the
configurations for Spark / GraphX that works best for you?

Thanks,
-Khaled

On Fri, Mar 11, 2016 at 1:25 PM, John Lilley 
wrote:

> We have almost zero node info – just an identifying integer.
>
> *John Lilley*
>
>
>
> *From:* Alexis Roos [mailto:alexis.r...@gmail.com]
> *Sent:* Friday, March 11, 2016 11:24 AM
> *To:* Alexander Pivovarov 
> *Cc:* John Lilley ; Ovidiu-Cristian MARCU <
> ovidiu-cristian.ma...@inria.fr>; lihu ; Andrew A <
> andrew.a...@gmail.com>; u...@spark.incubator.apache.org; Geoff Thompson <
> geoff.thomp...@redpoint.net>
> *Subject:* Re: Graphx
>
>
>
> Also we keep the Node info minimal as needed for connected components and
> rejoin later.
>
>
>
> Alexis
>
>
>
> On Fri, Mar 11, 2016 at 10:12 AM, Alexander Pivovarov <
> apivova...@gmail.com> wrote:
>
> we use it in prod
>
>
>
> 70 boxes, 61GB RAM each
>
>
>
> GraphX Connected Components works fine on 250M Vertices and 1B Edges
> (takes about 5-10 min)
>
>
>
> Spark likes memory, so use r3.2xlarge boxes (61GB)
>
> For example 10 x r3.2xlarge (61GB) work much faster than 20 x r3.xlarge
> (30.5 GB) (especially if you have skewed data)
>
>
>
> Also, use checkpoints before and after Connected Components to reduce DAG
> delays
>
>
>
> You can also try to enable Kryo and register classes used in RDD
>
>
>
>
>
> On Fri, Mar 11, 2016 at 8:07 AM, John Lilley 
> wrote:
>
> I suppose for a 2.6bn case we’d need Long:
>
>
>
> public class GenCCInput {
>
>   public static void main(String[] args) {
>
> if (args.length != 2) {
>
>   System.err.println("Usage: \njava GenCCInput  ");
>
>   System.exit(-1);
>
> }
>
> long edges = Long.parseLong(args[0]);
>
> long groupSize = Long.parseLong(args[1]);
>
> long currentEdge = 1;
>
> long currentGroupSize = 0;
>
> for (long i = 0; i < edges; i++) {
>
>   System.out.println(currentEdge + " " + (currentEdge + 1));
>
>   if (currentGroupSize == 0) {
>
> currentGroupSize = 2;
>
>   } else {
>
> currentGroupSize++;
>
>   }
>
>   if (currentGroupSize >= groupSize) {
>
> currentGroupSize = 0;
>
> currentEdge += 2;
>
>   } else {
>
> currentEdge++;
>
>   }
>
> }
>
>   }
>
> }
>
>
>
> *John Lilley*
>
> Chief Architect, RedPoint Global Inc.
>
> T: +1 303 541 1516  *| *M: +1 720 938 5761 *|* F: +1 781-705-2077
>
> Skype: jlilley.redpoint *|* john.lil...@redpoint.net *|* www.redpoint.net
>
>
>
> *From:* John Lilley [mailto:john.lil...@redpoint.net]
> *Sent:* Friday, March 11, 2016 8:46 AM
> *To:* Ovidiu-Cristian MARCU 
> *Cc:* lihu ; Andrew A ;
> u...@spark.incubator.apache.org; Geoff Thompson <
> geoff.thomp...@redpoint.net>
> *Subject:* RE: Graphx
>
>
>
> Ovidiu,
>
>
>
> IMHO, this is one of the biggest issues facing GraphX and Spark.  There
> are a lot of knobs and levers to pull to affect performance, with very
> little guidance about which settings work in general.  We cannot ship
> software that requires end-user tuning; it just has to work.  Unfortunately
> GraphX seems very sensitive to working set size relative to available RAM
> and fails catastrophically as opposed to gracefully when working set is too
> large.  It is also very sensitive to the nature of the data.  For example,
> if we build a test file with input-edge representation like:
>
> 1 2
>
> 2 3
>
> 3 4
>
> 5 6
>
> 6 7
>
> 7 8
>
> …
>
> this represents a graph with connected components in groups of four.  We
> found experimentally that when this data in input in clustered order, the
> required memory is lower and runtime is much faster than when data is input
> in random order.  This makes intuitive sense because of the additional
> communication required for the random order.
>
>
>
> Our 1bn-edge test case was of this same form, input in clustered order,
> with groups of 10 vertices per component.  It failed at 8 x 60GB.  This is
> the kind of data that our application processes, so it is a realistic test
> for us.  I’ve found that social media test data sets tend to follow
> power-law distributions, and that GraphX has much less problem with them.
>
>
>
> A comparable test scaled to your cluster (16 x 80GB) would be 2.6bn edges
> in 10-vertex components using the synthetic test input I describe above.  I
> would be curious to know if this works and what settings you use to
> succeed, and if it continues to succeed for random input order.
>
>
>
> As for the C++ algorithm, it scales multi-core.  It exhibits O(N^2)
> behavior for large data sets, but it processes the 1bn-edge case on a
> single 60GB node in about 20 minutes.  It degrades gracefu

Re: udf StructField to JSON String

2016-03-11 Thread Jacek Laskowski
Hi Tristan,

Mind sharing the relevant code? I'd like to learn the way you use
Transformer to do so. Thanks!

Jacek
11.03.2016 7:07 PM "Tristan Nixon"  napisał(a):

> I have a similar situation in an app of mine. I implemented a custom ML
> Transformer that wraps the Jackson ObjectMapper - this gives you full
> control over how your custom entities / structs are serialized.
>
> On Mar 11, 2016, at 11:53 AM, Caires Vinicius  wrote:
>
> Hmm. I think my problem is a little more complex. I'm using
> https://github.com/databricks/spark-redshift and when I read from JSON
> file I got this schema.
>
> root
>
> |-- app: string (nullable = true)
>
>  |-- ct: long (nullable = true)
>
>  |-- event: struct (nullable = true)
>
> ||-- attributes: struct (nullable = true)
>
>  |||-- account: string (nullable = true)
>
>  |||-- accountEmail: string (nullable = true)
>
>  |||-- accountId: string (nullable = true)
>
>
> I want to transform the Column *event* into String (formatted as JSON).
>
> I was trying to use udf but without success.
>
> On Fri, Mar 11, 2016 at 1:53 PM Tristan Nixon 
> wrote:
>
>> Have you looked at DataFrame.write.json( path )?
>>
>> https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter
>>
>> > On Mar 11, 2016, at 7:15 AM, Caires Vinicius 
>> wrote:
>> >
>> > I have one DataFrame with nested StructField and I want to convert to
>> JSON String. There is anyway to accomplish this?
>>
>>
>


Re: adding rows to a DataFrame

2016-03-11 Thread Jacek Laskowski
Just a guess...flatMap?

Jacek
11.03.2016 7:46 PM "Stefan Panayotov"  napisał(a):

> Hi,
>
> I have a problem that requires me to go through the rows in a DataFrame
> (or possibly through rows in a JSON file) and conditionally add rows
> depending on a value in one of the columns in each existing row. So, for
> example if I have:
>
>
> +---+---+---+
> | _1| _2| _3|
> +---+---+---+
> |ID1|100|1.1|
> |ID2|200|2.2|
> |ID3|300|3.3|
> |ID4|400|4.4|
> +---+---+---+
>
> I need to be able to get:
>
>
> +---+---+---++---+
> | _1| _2| _3|  _4| _5|
> +---+---+---++---+
> |ID1|100|1.1|ID1 add text or d...| 25|
> |id11 ..|21 |
> |id12 ..|22 |
> |ID2|200|2.2|ID2 add text or d...| 50|
> |id21 ..|33 |
> |id22 ..|34 |
> |id23 ..|35 |
> |ID3|300|3.3|ID3 add text or d...| 75|
> |id31 ..|11 |
> |ID4|400|4.4|ID4 add text or d...|100|
> |id41 ..|51 |
> |id42 ..|52 |
> |id43 ..|53 |
> |id44 ..|54 |
> +---+---+---++---+
>
> How can I achieve this in Spark without doing DF.collect(), which will get
> everything to the driver and for a big data set I'll get OOM?
> BTW, I know how to use withColumn() to add new columns to the DataFrame. I
> need to also add new rows.
> Any help will be appreciated.
>
> Thanks,
>
>
> *Stefan Panayotov, PhD **Home*: 610-355-0919
> *Cell*: 610-517-5586
> *email*: spanayo...@msn.com
> spanayo...@outlook.com
> spanayo...@comcast.net
>
>


Re: Newbie question - Help with runtime error on augmentString

2016-03-11 Thread Vasu Parameswaran
Thanks Ted.

I haven't explicitly specified Scala (I tried different versions in pom.xml
as well).

For what it is worth, this is what I get when I do a maven dependency
tree.  I wonder if the 2.11.2 coming from scala-reflect matters:


[INFO] |  | \- org.scala-lang:scalap:jar:2.11.0:compile
[INFO] |  |\- org.scala-lang:scala-compiler:jar:2.11.0:compile
[INFO] |  |   +-
org.scala-lang.modules:scala-xml_2.11:jar:1.0.1:compile
[INFO] |  |   \-
org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.1:compile
[INFO] |  +-
com.fasterxml.jackson.module:jackson-module-scala_2.11:jar:2.4.4:compile
[INFO] |  |  +- org.scala-lang:scala-reflect:jar:2.11.2:compile
[INFO] \- org.scala-lang:scala-library:jar:2.11.0:compile



On Fri, Mar 11, 2016 at 10:38 AM, Ted Yu  wrote:

> Looks like Scala version mismatch.
>
> Are you using 2.11 everywhere ?
>
> On Fri, Mar 11, 2016 at 10:33 AM, vasu20  wrote:
>
>> Hi
>>
>> Any help appreciated on this.  I am trying to write a Spark program using
>> IntelliJ.  I get a run time error as soon as new SparkConf() is called
>> from
>> main.  Top few lines of the exception are pasted below.
>>
>> These are the following versions:
>>
>> Spark jar:  spark-assembly-1.6.0-hadoop2.6.0.jar
>> pom:  spark-core_2.11
>>  1.6.0
>>
>> I have installed the Scala plugin in IntelliJ and added a dependency.
>>
>> I have also added a library dependency in the project structure.
>>
>> Thanks for any help!
>>
>> Vasu
>>
>>
>> Exception in thread "main" java.lang.NoSuchMethodError:
>> scala.Predef$.augmentString(Ljava/lang/String;)Ljava/lang/String;
>> at org.apache.spark.util.Utils$.(Utils.scala:1682)
>> at org.apache.spark.util.Utils$.(Utils.scala)
>> at org.apache.spark.SparkConf.(SparkConf.scala:59)
>>
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Newbie-question-Help-with-runtime-error-on-augmentString-tp26462.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: spark 1.6 foreachPartition only appears to be running on one executor

2016-03-11 Thread Darin McBeath
My driver code has the following:

// Init S3 (workers) so we can read the assets
partKeyFileRDD.foreachPartition(new SimpleStorageServiceInit(arg1, arg2, arg3));
// Get the assets.  Create a key pair where the key is asset id and the value 
is the rec.
JavaPairRDD seqFileRDD = partKeyFileRDD.mapToPair(new 
SimpleStorageServiceAsset());

The worker then has the following.  The issue I believe is that the following 
log.info statements only appear in the log file for one of my executors (and 
not both).  In other words, when executing the forEachPartition above, Spark 
appears to think all of the partitions are on one executor (at least that is 
the impression I'm left with).  But, when I get to the mapToToPair, Spark 
suddenly begins to use both executors.  I have verified that there are 16 
partitions for partKeyFileRDD.



public class SimpleStorageServiceInit implements VoidFunction> 
 {

privateString arg1;
private String arg2;
private String arg3;

public SimpleStorageServiceInit(arg1, String arg2, String arg3) {
this.arg1 = arg1;
this.arg2= arg2;
this.arg3 = arg3;
log.info("SimpleStorageServiceInit constructor");
log.info("SimpleStorageServiceInit constructor arg1: "+ arg1);
log.info("SimpleStorageServiceInit constructor arg2:"+ arg2);
log.info("SimpleStorageServiceInit constructor arg3: "+ arg3);
}

@Override
public void call(Iterator arg) throws Exception {
log.info("SimpleStorageServiceInit call");
log.info("SimpleStorageServiceInit call arg1: "+ arg1);
log.info("SimpleStorageServiceInit call arg2:"+ arg2);
log.info("SimpleStorageServiceInit call arg3: "+ arg3);
SimpleStorageService.init(this.arg1, this.arg2, this.arg3);
}
}


From: Jacek Laskowski 
To: Darin McBeath  
Cc: user 
Sent: Friday, March 11, 2016 1:40 PM
Subject: Re: spark 1.6 foreachPartition only appears to be running on one 
executor



Hi, 
Could you share the code with foreachPartition? 
Jacek 
11.03.2016 7:33 PM "Darin McBeath"  napisał(a):


>
>I can verify this by looking at the log file for the workers.
>
>Since I output logging statements in the object called by the 
>foreachPartition, I can see the statements being logged. Oddly, these output 
>statements only occur in one executor (and not the other).  It occurs 16 times 
>in this executor  since there are 16 partitions.  This seems odd as there are 
>only 8 cores on the executor and the other executor doesn't appear to be 
>called at all in the foreachPartition call.  But, when I go to do a map 
>function on this same RDD then things start blowing up on the other executor 
>as it starts doing work for some partitions (although, it would appear that 
>all partitions were only initialized on the other executor). The executor that 
>was used in the foreachPartition call works fine and doesn't experience issue. 
> But, because the other executor is failing on every request the job dies.
>
>Darin.
>
>
>
>From: Jacek Laskowski 
>To: Darin McBeath 
>Cc: user 
>Sent: Friday, March 11, 2016 1:24 PM
>Subject: Re: spark 1.6 foreachPartition only appears to be running on one 
>executor
>
>
>
>Hi,
>How do you check which executor is used? Can you include a screenshot of the 
>master's webUI with workers?
>Jacek
>11.03.2016 6:57 PM "Darin McBeath"  napisał(a):
>
>I've run into a situation where it would appear that foreachPartition is only 
>running on one of my executors.
>>
>>I have a small cluster (2 executors with 8 cores each).
>>
>>When I run a job with a small file (with 16 partitions) I can see that the 16 
>>partitions are initialized but they all appear to be initialized on only one 
>>executor.  All of the work then runs on this  one executor (even though the 
>>number of partitions is 16). This seems odd, but at least it works.  Not sure 
>>why the other executor was not used.
>>
>>However, when I run a larger file (once again with 16 partitions) I can see 
>>that the 16 partitions are initialized once again (but all on the same 
>>executor).  But, this time subsequent work is now spread across the 2 
>>executors.  This of course results in problems because the other executor was 
>>not initialized as all of the partitions were only initialized on the other 
>>executor.
>>
>>Does anyone have any suggestions for where I might want to investigate?  Has 
>>anyone else seen something like this before?  Any thoughts/insights would be 
>>appreciated.  I'm using the Stand Alone Cluster manager, cluster started with 
>>the spark ec2 scripts  and submitting my job using spark-submit.
>>
>>Thanks.
>>
>>Darin.
>>
>>-
>>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Newbie question - Help with runtime error on augmentString

2016-03-11 Thread Vasu Parameswaran
Thanks Jacek.  Pom is below (Currenlty set to 1.6.1 spark but I started out
with 1.6.0 with the same problem).



http://maven.apache.org/POM/4.0.0";
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd";>

spark
com.test
1.0-SNAPSHOT

4.0.0

sparktest


UTF-8




junit
junit



commons-cli
commons-cli


com.google.code.gson
gson
2.3.1
compile


org.apache.spark
spark-core_2.11
1.6.1






org.apache.maven.plugins
maven-shade-plugin
2.4.2


package

shade





${project.artifactId}-${project.version}-with-dependencies









On Fri, Mar 11, 2016 at 10:46 AM, Jacek Laskowski  wrote:

> Hi,
>
> Why do you use maven not sbt for Scala?
>
> Can you show the entire pom.xml and the command to execute the app?
>
> Jacek
> 11.03.2016 7:33 PM "vasu20"  napisał(a):
>
>> Hi
>>
>> Any help appreciated on this.  I am trying to write a Spark program using
>> IntelliJ.  I get a run time error as soon as new SparkConf() is called
>> from
>> main.  Top few lines of the exception are pasted below.
>>
>> These are the following versions:
>>
>> Spark jar:  spark-assembly-1.6.0-hadoop2.6.0.jar
>> pom:  spark-core_2.11
>>  1.6.0
>>
>> I have installed the Scala plugin in IntelliJ and added a dependency.
>>
>> I have also added a library dependency in the project structure.
>>
>> Thanks for any help!
>>
>> Vasu
>>
>>
>> Exception in thread "main" java.lang.NoSuchMethodError:
>> scala.Predef$.augmentString(Ljava/lang/String;)Ljava/lang/String;
>> at org.apache.spark.util.Utils$.(Utils.scala:1682)
>> at org.apache.spark.util.Utils$.(Utils.scala)
>> at org.apache.spark.SparkConf.(SparkConf.scala:59)
>>
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Newbie-question-Help-with-runtime-error-on-augmentString-tp26462.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>


Spark property parameters priority

2016-03-11 Thread Cesar Flores
Right now I know of three different things to pass property parameters to
the Spark Context. They are:

   - A) Inside a SparkConf object just before creating the Spark Context
   - B) During job submission (i.e. --conf spark.driver.memory.memory = 2g)
   - C) By using a specific property file during job submission (i.e.
   --properties-file somefile.conf)

My question is: If you specify the same config parameter in more than one
place *what is the one that will be actually used?*

*Does the priority order is the same for any property or is property
dependent?*

*I am mostly interested in the config parameter
spark.sql.shuffle.partitions, which I need to modify on the fly to do group
by clauses depending on the size of my input.*


Thanks
-- 
Cesar Flores


How to distribute dependent files (.so , jar ) across spark worker nodes

2016-03-11 Thread prateek arora
Hi

I have multiple node cluster and my spark jobs depend on a native
library (.so files) and some jar files.

Can some one please explain what are the best ways to distribute dependent
files across nodes? 

right now i copied  dependent files in all nodes using chef tool .

Regards
Prateek



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-distribute-dependent-files-so-jar-across-spark-worker-nodes-tp26464.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark ML - Scaling logistic regression for many features

2016-03-11 Thread Daniel Siegmann
Thanks for the pointer to those indexers, those are some good examples. A
good way to go for the trainer and any scoring done in Spark. I will
definitely have to deal with scoring in non-Spark systems though.

I think I will need to scale up beyond what single-node liblinear can
practically provide. The system will need to handle much larger sub-samples
of this data (and other projects might be larger still). Additionally, the
system needs to train many models in parallel (hyper-parameter optimization
with n-fold cross-validation, multiple algorithms, different sets of
features).

Still, I suppose we'll have to consider whether Spark is the best system
for this. For now though, my job is to see what can be achieved with Spark.



On Fri, Mar 11, 2016 at 12:45 PM, Nick Pentreath 
wrote:

> Ok, I think I understand things better now.
>
> For Spark's current implementation, you would need to map those features
> as you mention. You could also use say StringIndexer -> OneHotEncoder or
> VectorIndexer. You could create a Pipeline to deal with the mapping and
> training (e.g.
> http://spark.apache.org/docs/latest/ml-guide.html#example-pipeline).
> Pipeline supports persistence.
>
> But it depends on your scoring use case too - a Spark pipeline can be
> saved and then reloaded, but you need all of Spark dependencies in your
> serving app which is often not ideal. If you're doing bulk scoring offline,
> then it may suit.
>
> Honestly though, for that data size I'd certainly go with something like
> Liblinear :) Spark will ultimately scale better with # training examples
> for very large scale problems. However there are definitely limitations on
> model dimension and sparse weight vectors currently. There are potential
> solutions to these but they haven't been implemented as yet.
>
> On Fri, 11 Mar 2016 at 18:35 Daniel Siegmann 
> wrote:
>
>> On Fri, Mar 11, 2016 at 5:29 AM, Nick Pentreath > > wrote:
>>
>>> Would you mind letting us know the # training examples in the datasets?
>>> Also, what do your features look like? Are they text, categorical etc? You
>>> mention that most rows only have a few features, and all rows together have
>>> a few 10,000s features, yet your max feature value is 20 million. How are
>>> your constructing your feature vectors to get a 20 million size? The only
>>> realistic way I can see this situation occurring in practice is with
>>> feature hashing (HashingTF).
>>>
>>
>> The sub-sample I'm currently training on is about 50K rows, so ... small.
>>
>> The features causing this issue are numeric (int) IDs for ... lets call
>> it "Thing". For each Thing in the record, we set the feature Thing.id to
>> a value of 1.0 in our vector (which is of course a SparseVector). I'm
>> not sure how IDs are generated for Things, but they can be large numbers.
>>
>> The largest Thing ID is around 20 million, so that ends up being the size
>> of the vector. But in fact there are fewer than 10,000 unique Thing IDs in
>> this data. The mean number of features per record in what I'm currently
>> training against is 41, while the maximum for any given record was 1754.
>>
>> It is possible to map the features into a small set (just need to
>> zipWithIndex), but this is undesirable because of the added complexity (not
>> just for the training, but also anything wanting to score against the
>> model). It might be a little easier if this could be encapsulated within
>> the model object itself (perhaps via composition), though I'm not sure how
>> feasible that is.
>>
>> But I'd rather not bother with dimensionality reduction at all - since we
>> can train using liblinear in just a few minutes, it doesn't seem necessary.
>>
>>
>>>
>>> MultivariateOnlineSummarizer uses dense arrays, but it should be
>>> possible to enable sparse data. Though in theory, the result will tend to
>>> be dense anyway, unless you have very many entries in the input feature
>>> vector that never occur and are actually zero throughout the data set
>>> (which it seems is the case with your data?). So I doubt whether using
>>> sparse vectors for the summarizer would improve performance in general.
>>>
>>
>> Yes, that is exactly my case - the vast majority of entries in the input
>> feature vector will *never* occur. Presumably that means most of the
>> values in the aggregators' arrays will be zero.
>>
>>
>>>
>>> LR doesn't accept a sparse weight vector, as it uses dense vectors for
>>> coefficients and gradients currently. When using L1 regularization, it
>>> could support sparse weight vectors, but the current implementation doesn't
>>> do that yet.
>>>
>>
>> Good to know it is theoretically possible to implement. I'll have to give
>> it some thought. In the meantime I guess I'll experiment with coalescing
>> the data to minimize the communication overhead.
>>
>> Thanks again.
>>
>


Re: spark 1.6 foreachPartition only appears to be running on one executor

2016-03-11 Thread Darin McBeath
ok, some more information (and presumably a workaround).

when I initial read in my file, I use the following code.

JavaRDD keyFileRDD = sc.textFile(keyFile)

Looking at the UI, this file has 2 partitions (both on the same executor).

I then subsequently repartition this RDD (to 16)

partKeyFileRDD = keyFileRDD.repartition(16)

Looking again at the UI, this file has 16 partitions now (all on the same 
executor). When the forEachPartition runs, this then uses these 16 partitions 
(all on the same executor).  I think this is really the problem.  I'm not sure 
why the repartition didn't spread the partitions across both executors.

When the mapToPair subsequently runs below both executors are used and things 
start falling apart because none of the initialization logic was performed on 
the one executor.

However, if I modify the code above 

JavaRDD keyFileRDD = sc.textFile(keyFile,16)

Then initial keyFileRDD will be in 16 partitions spread across both executors.  
When I execute my forEachPartition directly on keyFileRDD (since there is no 
need to repartition), both executors will now be used (and initialized).

Anyway, don't know if this is my lack of understanding for how repartition 
should work or if this is a bug.  Thanks Jacek for starting to dig into this.

Darin.



- Original Message -
From: Darin McBeath 
To: Jacek Laskowski 
Cc: user 
Sent: Friday, March 11, 2016 1:57 PM
Subject: Re: spark 1.6 foreachPartition only appears to be running on one 
executor

My driver code has the following:

// Init S3 (workers) so we can read the assets
partKeyFileRDD.foreachPartition(new SimpleStorageServiceInit(arg1, arg2, arg3));
// Get the assets.  Create a key pair where the key is asset id and the value 
is the rec.
JavaPairRDD seqFileRDD = partKeyFileRDD.mapToPair(new 
SimpleStorageServiceAsset());

The worker then has the following.  The issue I believe is that the following 
log.info statements only appear in the log file for one of my executors (and 
not both).  In other words, when executing the forEachPartition above, Spark 
appears to think all of the partitions are on one executor (at least that is 
the impression I'm left with).  But, when I get to the mapToToPair, Spark 
suddenly begins to use both executors.  I have verified that there are 16 
partitions for partKeyFileRDD.



public class SimpleStorageServiceInit implements VoidFunction> 
 {

privateString arg1;
private String arg2;
private String arg3;

public SimpleStorageServiceInit(arg1, String arg2, String arg3) {
this.arg1 = arg1;
this.arg2= arg2;
this.arg3 = arg3;
log.info("SimpleStorageServiceInit constructor");
log.info("SimpleStorageServiceInit constructor arg1: "+ arg1);
log.info("SimpleStorageServiceInit constructor arg2:"+ arg2);
log.info("SimpleStorageServiceInit constructor arg3: "+ arg3);
}

@Override
public void call(Iterator arg) throws Exception {
log.info("SimpleStorageServiceInit call");
log.info("SimpleStorageServiceInit call arg1: "+ arg1);
log.info("SimpleStorageServiceInit call arg2:"+ arg2);
log.info("SimpleStorageServiceInit call arg3: "+ arg3);
SimpleStorageService.init(this.arg1, this.arg2, this.arg3);
}
}


From: Jacek Laskowski 
To: Darin McBeath  
Cc: user 
Sent: Friday, March 11, 2016 1:40 PM
Subject: Re: spark 1.6 foreachPartition only appears to be running on one 
executor



Hi, 
Could you share the code with foreachPartition? 
Jacek 
11.03.2016 7:33 PM "Darin McBeath"  napisał(a):


>
>I can verify this by looking at the log file for the workers.
>
>Since I output logging statements in the object called by the 
>foreachPartition, I can see the statements being logged. Oddly, these output 
>statements only occur in one executor (and not the other).  It occurs 16 times 
>in this executor  since there are 16 partitions.  This seems odd as there are 
>only 8 cores on the executor and the other executor doesn't appear to be 
>called at all in the foreachPartition call.  But, when I go to do a map 
>function on this same RDD then things start blowing up on the other executor 
>as it starts doing work for some partitions (although, it would appear that 
>all partitions were only initialized on the other executor). The executor that 
>was used in the foreachPartition call works fine and doesn't experience issue. 
> But, because the other executor is failing on every request the job dies.
>
>Darin.
>
>
>
>From: Jacek Laskowski 
>To: Darin McBeath 
>Cc: user 
>Sent: Friday, March 11, 2016 1:24 PM
>Subject: Re: spark 1.6 foreachPartition only appears to be running on one 
>executor
>
>
>
>Hi,
>How do you check which executor is used? Can you include a screenshot of the 
>master's webUI with workers?
>Jacek
>11.03.2016 6:57 PM "Darin McBeath"  napisał(a):
>
>I've run into a situation where it would appear that foreachPartition is only 
>running on one of my executors.
>>
>>I have a small cluster (2 executors with 8 cores each).
>>
>>When I run a job

Re: Python unit tests - Unable to ru it with Python 2.6 or 2.7

2016-03-11 Thread Gayathri Murali
Thanks Josh. I am able to run with Python 2.7 explicitly by specifying
--python-executables=python2.7. By default it checks only for Python2.6.

Thanks
Gayathri





On Fri, Mar 11, 2016 at 10:35 AM, Josh Rosen 
wrote:

> AFAIK we haven't actually broken 2.6 compatibility yet for PySpark itself,
> since Jenkins is still testing that configuration.
>
> I think the problem that you're seeing is that dev/run-tests /
> dev/run-tests-jenkins only work against Python 2.7+ right now. However,
> ./python/run-tests should be able to launch and run PySpark tests with
> Python 2.6. Try ./python/run-tests --help for more details.
>
> On Fri, Mar 11, 2016 at 10:31 AM Holden Karau 
> wrote:
>
>> So the run tests command allows you to specify the python version to test
>> again - maybe specify python2.7
>>
>> On Friday, March 11, 2016, Gayathri Murali 
>> wrote:
>>
>>> I do have 2.7 installed and unittest2 package available. I still see
>>> this error :
>>>
>>> Please install unittest2 to test with Python 2.6 or earlier
>>> Had test failures in pyspark.sql.tests with python2.6; see logs.
>>>
>>> Thanks
>>> Gayathri
>>>
>>>
>>>
>>> On Fri, Mar 11, 2016 at 10:07 AM, Davies Liu 
>>> wrote:
>>>
 Spark 2.0 is dropping the support for Python 2.6, it only work with
 Python 2.7, and 3.4+

 On Thu, Mar 10, 2016 at 11:17 PM, Gayathri Murali
  wrote:
 > Hi all,
 >
 > I am trying to run python unit tests.
 >
 > I currently have Python 2.6 and 2.7 installed. I installed unittest2
 against both of them.
 >
 > When I try to run /python/run-tests with Python 2.7 I get the
 following error :
 >
 > Please install unittest2 to test with Python 2.6 or earlier
 > Had test failures in pyspark.sql.tests with python2.6; see logs.
 >
 > When I try to run /python/run-tests with Python 2.6 I get the
 following error:
 >
 > Traceback (most recent call last):
 >   File "./python/run-tests.py", line 42, in 
 > from sparktestsupport.modules import all_modules  # noqa
 >   File
 "/Users/gayathri/spark/python/../dev/sparktestsupport/modules.py", line 18,
 in 
 > from functools import total_ordering
 > ImportError: cannot import name total_ordering
 >
 > total_ordering is a package that is available in 2.7.
 >
 > Can someone help?
 >
 > Thanks
 > Gayathri
 > -
 > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 > For additional commands, e-mail: user-h...@spark.apache.org
 >

>>>
>>>
>>
>> --
>> Cell : 425-233-8271
>> Twitter: https://twitter.com/holdenkarau
>>
>>


Re: udf StructField to JSON String

2016-03-11 Thread Michael Armbrust
df.select("event").toJSON

On Fri, Mar 11, 2016 at 9:53 AM, Caires Vinicius  wrote:

> Hmm. I think my problem is a little more complex. I'm using
> https://github.com/databricks/spark-redshift and when I read from JSON
> file I got this schema.
>
> root
>
> |-- app: string (nullable = true)
>
>  |-- ct: long (nullable = true)
>
>  |-- event: struct (nullable = true)
>
> ||-- attributes: struct (nullable = true)
>
>  |||-- account: string (nullable = true)
>
>  |||-- accountEmail: string (nullable = true)
>
>  |||-- accountId: string (nullable = true)
>
>
> I want to transform the Column *event* into String (formatted as JSON).
>
> I was trying to use udf but without success.
>
> On Fri, Mar 11, 2016 at 1:53 PM Tristan Nixon 
> wrote:
>
>> Have you looked at DataFrame.write.json( path )?
>>
>> https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter
>>
>> > On Mar 11, 2016, at 7:15 AM, Caires Vinicius 
>> wrote:
>> >
>> > I have one DataFrame with nested StructField and I want to convert to
>> JSON String. There is anyway to accomplish this?
>>
>>


Re: adding rows to a DataFrame

2016-03-11 Thread Michael Armbrust
Or look at explode on DataFrame

On Fri, Mar 11, 2016 at 10:45 AM, Stefan Panayotov 
wrote:

> Hi,
>
> I have a problem that requires me to go through the rows in a DataFrame
> (or possibly through rows in a JSON file) and conditionally add rows
> depending on a value in one of the columns in each existing row. So, for
> example if I have:
>
>
> +---+---+---+
> | _1| _2| _3|
> +---+---+---+
> |ID1|100|1.1|
> |ID2|200|2.2|
> |ID3|300|3.3|
> |ID4|400|4.4|
> +---+---+---+
>
> I need to be able to get:
>
>
> +---+---+---++---+
> | _1| _2| _3|  _4| _5|
> +---+---+---++---+
> |ID1|100|1.1|ID1 add text or d...| 25|
> |id11 ..|21 |
> |id12 ..|22 |
> |ID2|200|2.2|ID2 add text or d...| 50|
> |id21 ..|33 |
> |id22 ..|34 |
> |id23 ..|35 |
> |ID3|300|3.3|ID3 add text or d...| 75|
> |id31 ..|11 |
> |ID4|400|4.4|ID4 add text or d...|100|
> |id41 ..|51 |
> |id42 ..|52 |
> |id43 ..|53 |
> |id44 ..|54 |
> +---+---+---++---+
>
> How can I achieve this in Spark without doing DF.collect(), which will get
> everything to the driver and for a big data set I'll get OOM?
> BTW, I know how to use withColumn() to add new columns to the DataFrame. I
> need to also add new rows.
> Any help will be appreciated.
>
> Thanks,
>
>
> *Stefan Panayotov, PhD **Home*: 610-355-0919
> *Cell*: 610-517-5586
> *email*: spanayo...@msn.com
> spanayo...@outlook.com
> spanayo...@comcast.net
>
>


Re: udf StructField to JSON String

2016-03-11 Thread Caires Vinicius
I would like to see the code as well Tristan!

On Fri, Mar 11, 2016 at 1:53 PM Tristan Nixon  wrote:

> Have you looked at DataFrame.write.json( path )?
>
> https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter
>
> > On Mar 11, 2016, at 7:15 AM, Caires Vinicius  wrote:
> >
> > I have one DataFrame with nested StructField and I want to convert to
> JSON String. There is anyway to accomplish this?
>
>


Re: Spark property parameters priority

2016-03-11 Thread Jacek Laskowski
Hi

It could also be conf/spark-defaults.conf.

Jacek
11.03.2016 8:07 PM "Cesar Flores"  napisał(a):

>
> Right now I know of three different things to pass property parameters to
> the Spark Context. They are:
>
>- A) Inside a SparkConf object just before creating the Spark Context
>- B) During job submission (i.e. --conf spark.driver.memory.memory =
>2g)
>- C) By using a specific property file during job submission (i.e.
>--properties-file somefile.conf)
>
> My question is: If you specify the same config parameter in more than one
> place *what is the one that will be actually used?*
>
> *Does the priority order is the same for any property or is property
> dependent?*
>
> *I am mostly interested in the config parameter
> spark.sql.shuffle.partitions, which I need to modify on the fly to do group
> by clauses depending on the size of my input.*
>
>
> Thanks
> --
> Cesar Flores
>


Re: Newbie question - Help with runtime error on augmentString

2016-03-11 Thread Jacek Laskowski
Hi,

Doh! My eyes are bleeding to go through XMLs... 😁

Where did you specify Scala version? Dunno how it's in maven.

p.s. I *strongly* recommend sbt.

Jacek
11.03.2016 8:04 PM "Vasu Parameswaran"  napisał(a):

> Thanks Jacek.  Pom is below (Currenlty set to 1.6.1 spark but I started
> out with 1.6.0 with the same problem).
>
>
> 
> http://maven.apache.org/POM/4.0.0";
>  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
>  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
> http://maven.apache.org/xsd/maven-4.0.0.xsd";>
> 
> spark
> com.test
> 1.0-SNAPSHOT
> 
> 4.0.0
>
> sparktest
>
> 
> UTF-8
> 
>
> 
> 
> junit
> junit
> 
>
> 
> commons-cli
> commons-cli
> 
> 
> com.google.code.gson
> gson
> 2.3.1
> compile
> 
> 
> org.apache.spark
> spark-core_2.11
> 1.6.1
> 
> 
>
> 
> 
> 
> org.apache.maven.plugins
> maven-shade-plugin
> 2.4.2
> 
> 
> package
> 
> shade
> 
> 
> 
> 
>
> ${project.artifactId}-${project.version}-with-dependencies
> 
> 
> 
> 
>
> 
>
>
>
> On Fri, Mar 11, 2016 at 10:46 AM, Jacek Laskowski  wrote:
>
>> Hi,
>>
>> Why do you use maven not sbt for Scala?
>>
>> Can you show the entire pom.xml and the command to execute the app?
>>
>> Jacek
>> 11.03.2016 7:33 PM "vasu20"  napisał(a):
>>
>>> Hi
>>>
>>> Any help appreciated on this.  I am trying to write a Spark program using
>>> IntelliJ.  I get a run time error as soon as new SparkConf() is called
>>> from
>>> main.  Top few lines of the exception are pasted below.
>>>
>>> These are the following versions:
>>>
>>> Spark jar:  spark-assembly-1.6.0-hadoop2.6.0.jar
>>> pom:  spark-core_2.11
>>>  1.6.0
>>>
>>> I have installed the Scala plugin in IntelliJ and added a dependency.
>>>
>>> I have also added a library dependency in the project structure.
>>>
>>> Thanks for any help!
>>>
>>> Vasu
>>>
>>>
>>> Exception in thread "main" java.lang.NoSuchMethodError:
>>> scala.Predef$.augmentString(Ljava/lang/String;)Ljava/lang/String;
>>> at org.apache.spark.util.Utils$.(Utils.scala:1682)
>>> at org.apache.spark.util.Utils$.(Utils.scala)
>>> at org.apache.spark.SparkConf.(SparkConf.scala:59)
>>>
>>>
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Newbie-question-Help-with-runtime-error-on-augmentString-tp26462.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>


Re: How to distribute dependent files (.so , jar ) across spark worker nodes

2016-03-11 Thread Jacek Laskowski
Hi,

For jars use spark-submit --jars. Dunno about so's. Could that work through
jars?

Jacek
11.03.2016 8:07 PM "prateek arora"  napisał(a):

> Hi
>
> I have multiple node cluster and my spark jobs depend on a native
> library (.so files) and some jar files.
>
> Can some one please explain what are the best ways to distribute dependent
> files across nodes?
>
> right now i copied  dependent files in all nodes using chef tool .
>
> Regards
> Prateek
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-distribute-dependent-files-so-jar-across-spark-worker-nodes-tp26464.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


sliding Top N window

2016-03-11 Thread Yakubovich, Alexey
Good day,

I have a following task: a stream of “page vies” coming to kafka topic. Each 
view contains list of product Ids from a visited page. The task: to have in 
“real time” Top N product.

I am interested in some solution that would require minimum intermediate writes 
… So  need to build a sliding window for top N product, where the product 
counters dynamically changes and window should present the TOP product for the 
specified period of time.

I believe there is no way to avoid maintaining all product counters counters in 
memory/storage.  But at least I would like to do all logic, all calculation on 
a fly, in memory, not spilling multiple RDD from memory to disk.

So I believe I see one way of doing it:
   Take, msg from kafka take and line up, all elementary action (increase by 1 
the counter for the product PID )
  Each action will be implemented as a call to HTable.increment()  // or 
easier, with incrementColumnValue()…
  After each increment I can apply my own operation “offer” would provide that 
only top N products with counters are kept in another Hbase table (also with 
atomic operations).
 But there is another stream of events: decreasing product counters when view 
expires the legth of sliding window….

So my question: does anybody know/have and can share the piece code/ know how: 
how to implement “sliding Top N window” better.
If nothing will be offered, I will share what I will do myself.

Thank you
Alexey

This message, including any attachments, is the property of Sears Holdings 
Corporation and/or one of its subsidiaries. It is confidential and may contain 
proprietary or legally privileged information. If you are not the intended 
recipient, please delete it without reading the contents. Thank you.


Re: adding rows to a DataFrame

2016-03-11 Thread Jan Štěrba
It very much depends on the logic that generates the new rows. Is it
per row (i.e. without context?) then you can just convert to RDD and
perform a map operation on each row.

JavaPairRDD> grouped =
dataFrame.javaRDD().groupBy( group by what you need, probably ID );

return grouped.mapValues(rowsIt -> {
List rows = Lists.newArrayList(rowsIt);
return new list of rows based on you logic.
});

convert back to DataFrame using flatMap and createDataFrame
--
Jan Sterba
https://twitter.com/honzasterba | http://flickr.com/honzasterba |
http://500px.com/honzasterba


On Fri, Mar 11, 2016 at 8:49 PM, Michael Armbrust
 wrote:
> Or look at explode on DataFrame
>
> On Fri, Mar 11, 2016 at 10:45 AM, Stefan Panayotov 
> wrote:
>>
>> Hi,
>>
>> I have a problem that requires me to go through the rows in a DataFrame
>> (or possibly through rows in a JSON file) and conditionally add rows
>> depending on a value in one of the columns in each existing row. So, for
>> example if I have:
>>
>>
>> +---+---+---+
>>
>> | _1| _2| _3|
>> +---+---+---+
>> |ID1|100|1.1|
>> |ID2|200|2.2|
>> |ID3|300|3.3|
>> |ID4|400|4.4|
>> +---+---+---+
>>
>> I need to be able to get:
>>
>>
>> +---+---+---++---+
>>
>> | _1| _2| _3|  _4| _5|
>> +---+---+---++---+
>> |ID1|100|1.1|ID1 add text or d...| 25|
>> |id11 ..|21 |
>> |id12 ..|22 |
>> |ID2|200|2.2|ID2 add text or d...| 50|
>> |id21 ..|33 |
>> |id22 ..|34 |
>> |id23 ..|35 |
>> |ID3|300|3.3|ID3 add text or d...| 75|
>> |id31 ..|11 |
>> |ID4|400|4.4|ID4 add text or d...|100|
>> |id41 ..|51 |
>> |id42 ..|52 |
>> |id43 ..|53 |
>> |id44 ..|54 |
>> +---+---+---++---+
>>
>> How can I achieve this in Spark without doing DF.collect(), which will get
>> everything to the driver and for a big data set I'll get OOM?
>> BTW, I know how to use withColumn() to add new columns to the DataFrame. I
>> need to also add new rows.
>> Any help will be appreciated.
>>
>> Thanks,
>>
>> Stefan Panayotov, PhD
>> Home: 610-355-0919
>> Cell: 610-517-5586
>> email: spanayo...@msn.com
>> spanayo...@outlook.com
>> spanayo...@comcast.net
>>
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: adding rows to a DataFrame

2016-03-11 Thread Bijay Pathak
Here is another way you can achieve that(in Python):
base_df.withColumn("column_name","column_expression_for_new_column")
# To add new row create the data frame containing the new row and do the
unionAll()
base_df.unionAll(new_df)

# Another approach convert to rdd add required fields and convert back to
Dataframe
def update_row(row):
"""Add extra column according to your logic"""
# Example
update_row = row + ("Text","number",)
return row

updated_row_rdd = base_df.map(lambda row: update_row(row))
# Convert back to rdd with giving the schema
updated_df = sql_context.createDataFrame(updated_row_rdd, schema)

# To add extra row create the new data frame with the new row and do the
unionAll
result_df = updated_df.unionAll(new_row_df)


Thanks,
Bijay

On Fri, Mar 11, 2016 at 11:49 AM, Michael Armbrust 
wrote:

> Or look at explode on DataFrame
>
> On Fri, Mar 11, 2016 at 10:45 AM, Stefan Panayotov 
> wrote:
>
>> Hi,
>>
>> I have a problem that requires me to go through the rows in a DataFrame
>> (or possibly through rows in a JSON file) and conditionally add rows
>> depending on a value in one of the columns in each existing row. So, for
>> example if I have:
>>
>>
>> +---+---+---+
>> | _1| _2| _3|
>> +---+---+---+
>> |ID1|100|1.1|
>> |ID2|200|2.2|
>> |ID3|300|3.3|
>> |ID4|400|4.4|
>> +---+---+---+
>>
>> I need to be able to get:
>>
>>
>> +---+---+---++---+
>> | _1| _2| _3|  _4| _5|
>> +---+---+---++---+
>> |ID1|100|1.1|ID1 add text or d...| 25|
>> |id11 ..|21 |
>> |id12 ..|22 |
>> |ID2|200|2.2|ID2 add text or d...| 50|
>> |id21 ..|33 |
>> |id22 ..|34 |
>> |id23 ..|35 |
>> |ID3|300|3.3|ID3 add text or d...| 75|
>> |id31 ..|11 |
>> |ID4|400|4.4|ID4 add text or d...|100|
>> |id41 ..|51 |
>> |id42 ..|52 |
>> |id43 ..|53 |
>> |id44 ..|54 |
>> +---+---+---++---+
>>
>> How can I achieve this in Spark without doing DF.collect(), which will
>> get everything to the driver and for a big data set I'll get OOM?
>> BTW, I know how to use withColumn() to add new columns to the DataFrame.
>> I need to also add new rows.
>> Any help will be appreciated.
>>
>> Thanks,
>>
>>
>> *Stefan Panayotov, PhD **Home*: 610-355-0919
>> *Cell*: 610-517-5586
>> *email*: spanayo...@msn.com
>> spanayo...@outlook.com
>> spanayo...@comcast.net
>>
>>
>
>


Re: udf StructField to JSON String

2016-03-11 Thread Tristan Nixon
It’s pretty simple, really:

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.types.{DataType, StringType}

/**
 * A SparkML Transformer that will transform an
 * entity of type T into a JSON-formatted string.
 * Created by Tristan Nixon  on 3/11/16.
 */
class JsonSerializationTransformer[T](override val uid: String)
  extends UnaryTransformer[T,String,JsonSerializationTransformer[T]]
{
 def this() = this(Identifiable.randomUID("JsonSerializationTransformer"))
 val mapper = new ObjectMapper
 // add additional mapper configuration code here, like this:
 // mapper.setAnnotationIntrospector(new JaxbAnnotationIntrospector)
 // or this:
  // mapper.getSerializationConfig.withFeatures( 
SerializationFeature.WRITE_DATES_AS_TIMESTAMPS )

 override protected def createTransformFunc: ( T ) => String =
  mapper.writeValueAsString

 override protected def outputDataType: DataType = new StringType
}
and you would use it like any other transformer:

val jsontrans = new 
JsonSerializationTransformer[Document].setInputCol("myEntityColumn")
 .setOutputCol("myOutputColumn")

val dfWithJson = jsontrans.transform( entityDF )

Note that this implementation is for Jackson 2.x. If you want to use Jackson 
1.x, it’s a bit trickier because the ObjectMapper class is not Serializable, 
and so you need to initialize it per-partition rather than having it just be a 
standard property.

> On Mar 11, 2016, at 12:49 PM, Jacek Laskowski  wrote:
> 
> Hi Tristan,
> 
> Mind sharing the relevant code? I'd like to learn the way you use Transformer 
> to do so. Thanks!
> 
> Jacek
> 
> 11.03.2016 7:07 PM "Tristan Nixon"  > napisał(a):
> I have a similar situation in an app of mine. I implemented a custom ML 
> Transformer that wraps the Jackson ObjectMapper - this gives you full control 
> over how your custom entities / structs are serialized.
> 
>> On Mar 11, 2016, at 11:53 AM, Caires Vinicius > > wrote:
>> 
>> Hmm. I think my problem is a little more complex. I'm using 
>> https://github.com/databricks/spark-redshift 
>>  and when I read from JSON 
>> file I got this schema.
>> 
>> root
>> |-- app: string (nullable = true)
>> 
>>  |-- ct: long (nullable = true)
>> 
>>  |-- event: struct (nullable = true)
>> 
>> ||-- attributes: struct (nullable = true)
>> 
>>  |||-- account: string (nullable = true)
>> 
>>  |||-- accountEmail: string (nullable = true)
>> 
>> 
>>  |||-- accountId: string (nullable = true)
>> 
>> 
>> 
>> I want to transform the Column event into String (formatted as JSON). 
>> 
>> I was trying to use udf but without success.
>> 
>> 
>> On Fri, Mar 11, 2016 at 1:53 PM Tristan Nixon > > wrote:
>> Have you looked at DataFrame.write.json( path )?
>> https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter
>>  
>> 
>> 
>> > On Mar 11, 2016, at 7:15 AM, Caires Vinicius > > > wrote:
>> >
>> > I have one DataFrame with nested StructField and I want to convert to JSON 
>> > String. There is anyway to accomplish this?
>> 
> 



YARN process with Spark

2016-03-11 Thread Mich Talebzadeh
Hi,

Can these be clarified please


   1. Can a YARN container use more than one core and if this is
   configurable?
   2. A YARN container is constraint to 8MB by
   " yarn.scheduler.maximum-allocation-mb". If a YARN container is a Spark
   process will that limit also include the memory Spark going to be using?

Thanks,

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


Re: YARN process with Spark

2016-03-11 Thread Koert Kuipers
you get a spark executor per yarn container. the spark executor can have
multiple cores, yes. this is configurable. so the number of partitions that
can be processed in parallel is num-executors * executor-cores. and for
processing a partition the available memory is executor-memory /
executor-cores (roughly, cores can of course borrow memory from each other
within executor).

the relevant setting for spark-submit are:
 --executor-memory
 --executor-cores
 --num-executors

On Fri, Mar 11, 2016 at 4:58 PM, Mich Talebzadeh 
wrote:

> Hi,
>
> Can these be clarified please
>
>
>1. Can a YARN container use more than one core and if this is
>configurable?
>2. A YARN container is constraint to 8MB by
>" yarn.scheduler.maximum-allocation-mb". If a YARN container is a Spark
>process will that limit also include the memory Spark going to be using?
>
> Thanks,
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>


Re: Newbie question - Help with runtime error on augmentString

2016-03-11 Thread Tristan Nixon
You must be relying on IntelliJ to compile your scala, because you haven’t set 
up any scala plugin to compile it from maven.
You should have something like this in your plugins:


 
  net.alchim31.maven
  scala-maven-plugin
  
   
scala-compile-first
process-resources

 compile

   
   
scala-test-compile
process-test-resources

 testCompile

   
  
 


PS - I use maven to compile all my scala and haven’t had a problem with it. I 
know that sbt has some wonderful things, but I’m just set in my ways ;)

> On Mar 11, 2016, at 2:02 PM, Jacek Laskowski  wrote:
> 
> Hi,
> 
> Doh! My eyes are bleeding to go through XMLs... 😁
> 
> Where did you specify Scala version? Dunno how it's in maven.
> 
> p.s. I *strongly* recommend sbt.
> 
> Jacek
> 
> 11.03.2016 8:04 PM "Vasu Parameswaran"  > napisał(a):
> Thanks Jacek.  Pom is below (Currenlty set to 1.6.1 spark but I started out 
> with 1.6.0 with the same problem).
> 
> 
> 
> http://maven.apache.org/POM/4.0.0 
> "
>  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance 
> "
>  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
>  
> http://maven.apache.org/xsd/maven-4.0.0.xsd 
> ">
> 
> spark
> com.test
> 1.0-SNAPSHOT
> 
> 4.0.0
> 
> sparktest
> 
> 
> UTF-8
> 
> 
> 
> 
> junit
> junit
> 
> 
> 
> commons-cli
> commons-cli
> 
> 
> com.google.code.gson
> gson
> 2.3.1
> compile
> 
> 
> org.apache.spark
> spark-core_2.11
> 1.6.1
> 
> 
> 
> 
> 
> 
> org.apache.maven.plugins
> maven-shade-plugin
> 2.4.2
> 
> 
> package
> 
> shade
> 
> 
> 
> 
> 
> ${project.artifactId}-${project.version}-with-dependencies
> 
> 
> 
> 
> 
> 
> 
> 
> 
> On Fri, Mar 11, 2016 at 10:46 AM, Jacek Laskowski  > wrote:
> Hi,
> 
> Why do you use maven not sbt for Scala?
> 
> Can you show the entire pom.xml and the command to execute the app?
> 
> Jacek
> 
> 11.03.2016 7:33 PM "vasu20" mailto:vas...@gmail.com>> 
> napisał(a):
> Hi
> 
> Any help appreciated on this.  I am trying to write a Spark program using
> IntelliJ.  I get a run time error as soon as new SparkConf() is called from
> main.  Top few lines of the exception are pasted below.
> 
> These are the following versions:
> 
> Spark jar:  spark-assembly-1.6.0-hadoop2.6.0.jar
> pom:  spark-core_2.11
>  1.6.0
> 
> I have installed the Scala plugin in IntelliJ and added a dependency.
> 
> I have also added a library dependency in the project structure.
> 
> Thanks for any help!
> 
> Vasu
> 
> 
> Exception in thread "main" java.lang.NoSuchMethodError:
> scala.Predef$.augmentString(Ljava/lang/String;)Ljava/lang/String;
> at org.apache.spark.util.Utils$.(Utils.scala:1682)
> at org.apache.spark.util.Utils$.(Utils.scala)
> at org.apache.spark.SparkConf.(SparkConf.scala:59)
> 
> 
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Newbie-question-Help-with-runtime-error-on-augmentString-tp26462.html
>  
> 
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> 
> For additional commands, e-mail: user-h...@spark.apache.org 
> 
> 
> 



Re: How to distribute dependent files (.so , jar ) across spark worker nodes

2016-03-11 Thread Tristan Nixon
I recommend you package all your dependencies (jars, .so’s, etc.) into a single 
uber-jar and then submit that. It’s much more convenient than trying to manage 
including everything in the --jars arg of spark-submit. If you build with maven 
than the shade plugin will do this for you nicely:
https://maven.apache.org/plugins/maven-shade-plugin/

> On Mar 11, 2016, at 2:05 PM, Jacek Laskowski  wrote:
> 
> Hi,
> 
> For jars use spark-submit --jars. Dunno about so's. Could that work through 
> jars?
> 
> Jacek
> 
> 11.03.2016 8:07 PM "prateek arora"  > napisał(a):
> Hi
> 
> I have multiple node cluster and my spark jobs depend on a native
> library (.so files) and some jar files.
> 
> Can some one please explain what are the best ways to distribute dependent
> files across nodes?
> 
> right now i copied  dependent files in all nodes using chef tool .
> 
> Regards
> Prateek
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-distribute-dependent-files-so-jar-across-spark-worker-nodes-tp26464.html
>  
> 
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org 
> 
> For additional commands, e-mail: user-h...@spark.apache.org 
> 
> 



Re: YARN process with Spark

2016-03-11 Thread Alexander Pivovarov
YARN cores are virtual cores which are used just to calculate available
resources. But usually memory is used to manage yarn resources (not cores)

spark executor memory should be ~90% of  yarn.scheduler.maximum-allocation-mb
(which should be the same as yarn.nodemanager.resource.memory-mb)
~10% should go to executor memory overhead

e.g. for r3.2xlarge slave (61GB / 8 cores) the setting are the following
(on EMR-4.2.0)

yarn settings:
yarn.nodemanager.resource.memory-mb = 54272
yarn.scheduler.maximum-allocation-mb = 54272

spark settings:
spark.executor.memory = 47924M
spark.yarn.executor.memoryOverhead = 5324
spark.executor.cores = 8// cores available on each slave

1024M of YARN memory is reserved on each box to run Spark AM container(s) -
Spark AM container uses 896 MB of yarn memory (AM used in both client and
cluster mode)



On Fri, Mar 11, 2016 at 2:08 PM, Koert Kuipers  wrote:

> you get a spark executor per yarn container. the spark executor can have
> multiple cores, yes. this is configurable. so the number of partitions that
> can be processed in parallel is num-executors * executor-cores. and for
> processing a partition the available memory is executor-memory /
> executor-cores (roughly, cores can of course borrow memory from each other
> within executor).
>
> the relevant setting for spark-submit are:
>  --executor-memory
>  --executor-cores
>  --num-executors
>
> On Fri, Mar 11, 2016 at 4:58 PM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Hi,
>>
>> Can these be clarified please
>>
>>
>>1. Can a YARN container use more than one core and if this is
>>configurable?
>>2. A YARN container is constraint to 8MB by
>>" yarn.scheduler.maximum-allocation-mb". If a YARN container is a Spark
>>process will that limit also include the memory Spark going to be using?
>>
>> Thanks,
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>
>


Re: YARN process with Spark

2016-03-11 Thread Alexander Pivovarov
Forgot to mention. To avoid unnecessary container termination add the
following setting to yarn

yarn.nodemanager.vmem-check-enabled = false


Re: YARN process with Spark

2016-03-11 Thread Mich Talebzadeh
Thanks Koert and Alexander

I think the yarn configuration parameters in yarn-site,xml are important.
For those I have



  yarn.nodemanager.resource.memory-mb
  Amount of max physical memory, in MB, that can be allocated
for YARN containers.
  8192


   yarn.nodemanager.vmem-pmem-ratio
Ratio between virtual memory to physical memory when
setting memory limits for containers
2.1
  

yarn.scheduler.maximum-allocation-mb
Maximum memory for each container
8192
  

yarn.scheduler.minimum-allocation-mb
Minimum memory for each container
2048
  

However, I noticed that you Alexander have the following settings

yarn.nodemanager.resource.memory-mb = 54272
yarn.scheduler.maximum-allocation-mb = 54272

With 8 Spark executor cores that gives you 6GB of memory per core. As a
matter of interest how much memory and how many cores do you have for each
node?

Thanks


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 11 March 2016 at 23:01, Alexander Pivovarov  wrote:

> Forgot to mention. To avoid unnecessary container termination add the
> following setting to yarn
>
> yarn.nodemanager.vmem-check-enabled = false
>
>


Repeating Records w/ Spark + Avro?

2016-03-11 Thread Chris Miller
I have a bit of a strange situation:

*
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper, AvroKey}
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.{NullWritable, WritableUtils}

val path = "/path/to/data.avro"

val rdd = sc.newAPIHadoopFile(path,
classOf[AvroKeyInputFormat[GenericRecord]],
classOf[AvroKey[GenericRecord]], classOf[NullWritable])
rdd.take(10).foreach( x => println( x._1.datum() ))
*

In this situation, I get the right number of records returned, and if I
look at the contents of rdd I see the individual records as tuple2's...
however, if I println on each one as shown above, I get the same result
every time.

Apparently this has to do with something in Spark or Avro keeping a
reference to the item its iterating over, so I need to clone the object
before I use it. However, if I try to clone it (from the spark-shell
console), I get:

*
rdd.take(10).foreach( x => {
  val clonedDatum = x._1.datum().clone()
  println(clonedDatum.datum())
})

:37: error: method clone in class Object cannot be accessed in
org.apache.avro.generic.GenericRecord
 Access to protected method clone not permitted because
 prefix type org.apache.avro.generic.GenericRecord does not conform to
 class $iwC where the access take place
val clonedDatum = x._1.datum().clone()
*

So, how can I clone the datum?

Seems I'm not the only one who ran into this problem:
https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/102. I can't
figure out how to fix it in my case without hacking away like the person in
the linked PR did.

Suggestions?

--
Chris Miller


Re: YARN process with Spark

2016-03-11 Thread Alexander Pivovarov
you need to set

yarn.scheduler.minimum-allocation-mb=32

otherwise Spark AM container will be running on dedicated box instead of
running together with the executor container on one of the boxes

for slaves I use Amazon EC2 r3.2xlarge box (61GB / 8 cores) - cost ~$0.10 /
hour (spot instance)



On Fri, Mar 11, 2016 at 3:17 PM, Mich Talebzadeh 
wrote:

> Thanks Koert and Alexander
>
> I think the yarn configuration parameters in yarn-site,xml are important.
> For those I have
>
>
> 
>   yarn.nodemanager.resource.memory-mb
>   Amount of max physical memory, in MB, that can be allocated
> for YARN containers.
>   8192
> 
> 
>yarn.nodemanager.vmem-pmem-ratio
> Ratio between virtual memory to physical memory when
> setting memory limits for containers
> 2.1
>   
> 
> yarn.scheduler.maximum-allocation-mb
> Maximum memory for each container
> 8192
>   
> 
> yarn.scheduler.minimum-allocation-mb
> Minimum memory for each container
> 2048
>   
>
> However, I noticed that you Alexander have the following settings
>
> yarn.nodemanager.resource.memory-mb = 54272
> yarn.scheduler.maximum-allocation-mb = 54272
>
> With 8 Spark executor cores that gives you 6GB of memory per core. As a
> matter of interest how much memory and how many cores do you have for each
> node?
>
> Thanks
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 11 March 2016 at 23:01, Alexander Pivovarov 
> wrote:
>
>> Forgot to mention. To avoid unnecessary container termination add the
>> following setting to yarn
>>
>> yarn.nodemanager.vmem-check-enabled = false
>>
>>
>


Re: udf StructField to JSON String

2016-03-11 Thread Tristan Nixon
So I think in your case you’d do something more like:

val jsontrans = new 
JsonSerializationTransformer[StructType].setInputCol(“event").setOutputCol(“eventJSON")


> On Mar 11, 2016, at 3:51 PM, Tristan Nixon  wrote:
> 
> val jsontrans = new 
> JsonSerializationTransformer[Document].setInputCol("myEntityColumn")
>  .setOutputCol("myOutputColumn")



Re: Newbie question - Help with runtime error on augmentString

2016-03-11 Thread Vasu Parameswaran
Added these to the pom and still the same error :-(. I will look into sbt
as well.



On Fri, Mar 11, 2016 at 2:31 PM, Tristan Nixon 
wrote:

> You must be relying on IntelliJ to compile your scala, because you haven’t
> set up any scala plugin to compile it from maven.
> You should have something like this in your plugins:
>
> 
>  
>   net.alchim31.maven
>   scala-maven-plugin
>   
>
> scala-compile-first
> process-resources
> 
>  compile
> 
>
>
> scala-test-compile
> process-test-resources
> 
>  testCompile
> 
>
>   
>  
> 
>
>
> PS - I use maven to compile all my scala and haven’t had a problem with
> it. I know that sbt has some wonderful things, but I’m just set in my ways
> ;)
>
> On Mar 11, 2016, at 2:02 PM, Jacek Laskowski  wrote:
>
> Hi,
>
> Doh! My eyes are bleeding to go through XMLs... 😁
>
> Where did you specify Scala version? Dunno how it's in maven.
>
> p.s. I *strongly* recommend sbt.
>
> Jacek
> 11.03.2016 8:04 PM "Vasu Parameswaran"  napisał(a):
>
>> Thanks Jacek.  Pom is below (Currenlty set to 1.6.1 spark but I started
>> out with 1.6.0 with the same problem).
>>
>>
>> 
>> http://maven.apache.org/POM/4.0.0";
>>  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
>>  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
>> http://maven.apache.org/xsd/maven-4.0.0.xsd";>
>> 
>> spark
>> com.test
>> 1.0-SNAPSHOT
>> 
>> 4.0.0
>>
>> sparktest
>>
>> 
>> UTF-8
>> 
>>
>> 
>> 
>> junit
>> junit
>> 
>>
>> 
>> commons-cli
>> commons-cli
>> 
>> 
>> com.google.code.gson
>> gson
>> 2.3.1
>> compile
>> 
>> 
>> org.apache.spark
>> spark-core_2.11
>> 1.6.1
>> 
>> 
>>
>> 
>> 
>> 
>> org.apache.maven.plugins
>> maven-shade-plugin
>> 2.4.2
>> 
>> 
>> package
>> 
>> shade
>> 
>> 
>> 
>> 
>>
>> ${project.artifactId}-${project.version}-with-dependencies
>> 
>> 
>> 
>> 
>>
>> 
>>
>>
>>
>> On Fri, Mar 11, 2016 at 10:46 AM, Jacek Laskowski 
>> wrote:
>>
>>> Hi,
>>>
>>> Why do you use maven not sbt for Scala?
>>>
>>> Can you show the entire pom.xml and the command to execute the app?
>>>
>>> Jacek
>>> 11.03.2016 7:33 PM "vasu20"  napisał(a):
>>>
 Hi

 Any help appreciated on this.  I am trying to write a Spark program
 using
 IntelliJ.  I get a run time error as soon as new SparkConf() is called
 from
 main.  Top few lines of the exception are pasted below.

 These are the following versions:

 Spark jar:  spark-assembly-1.6.0-hadoop2.6.0.jar
 pom:  spark-core_2.11
  1.6.0

 I have installed the Scala plugin in IntelliJ and added a dependency.

 I have also added a library dependency in the project structure.

 Thanks for any help!

 Vasu


 Exception in thread "main" java.lang.NoSuchMethodError:
 scala.Predef$.augmentString(Ljava/lang/String;)Ljava/lang/String;
 at org.apache.spark.util.Utils$.(Utils.scala:1682)
 at org.apache.spark.util.Utils$.(Utils.scala)
 at org.apache.spark.SparkConf.(SparkConf.scala:59)






 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Newbie-question-Help-with-runtime-error-on-augmentString-tp26462.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com
 .

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


>>
>


  1   2   >