Re: Spark streaming and rate limit

2014-06-18 Thread Flavio Pompermaier
Yes, I need to call the external service for every event, and the order does
not matter.
There's no time limit within which each event must be processed. I can't
tell the producer to slow down, nor drop events.
Of course I could put a message broker in between, like an AMQP or JMS
broker, but I was wondering whether this issue has already been addressed in
some way (of course there would need to be some buffer to absorb high-rate
streaming)...or not?
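
For illustration, a minimal sketch (not a built-in Spark Streaming feature; the limit of 4, the DStream[String] element type, and callService are placeholders) of one way to cap concurrent calls to the external service from inside the job itself:

import java.util.concurrent.Semaphore
import org.apache.spark.streaming.dstream.DStream

// Per-JVM throttle: at most maxConcurrent in-flight calls per executor.
object ExternalServiceThrottle {
  val maxConcurrent = 4                          // hypothetical limit
  val permits = new Semaphore(maxConcurrent)
}

def callService(event: String): String = event   // stand-in for the real client call

def throttledCalls(events: DStream[String]): DStream[String] =
  events.mapPartitions { iter =>
    iter.map { event =>
      ExternalServiceThrottle.permits.acquire()  // blocks when the limit is reached
      try callService(event)
      finally ExternalServiceThrottle.permits.release()
    }
  }

Note that the semaphore bounds concurrency per executor JVM, not across the whole cluster, and blocking only slows the tasks down; unprocessed events still accumulate upstream, which is the buffering trade-off discussed below.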


On Thu, Jun 19, 2014 at 4:48 AM, Soumya Simanta 
wrote:

> Flavio - I'm new to Spark as well, but I've done stream processing using
> other frameworks. My comments below are not Spark Streaming specific. Maybe
> someone who knows more can provide better insights.
>
> I read your post on my phone and I believe my answer doesn't completely
> address the issue you have raised.
>
> Do you need to call the external service for every event, i.e., do you need
> to process all events? Also, does the order of processing events matter? Is
> there a time bound within which each event should be processed?
>
> Calling an external service means network I/O, so you have to buffer events
> if your service is rate limited or slower than the rate at which you are
> processing events.
>
> Here are some ways of dealing with this situation:
>
> 1. Drop events based on a policy (such as buffer/queue size).
> 2. Tell the event producer to slow down, if that's in your control.
> 3. Use a proxy or a set of proxies to distribute the calls to the remote
> service, if the rate limit is per user or per network node only.
>
> I'm not sure how many of these are implemented directly in Spark Streaming,
> but you can have an external component that controls the event rate and only
> sends events to Spark Streaming when it's ready to process more messages.
>
> Hope this helps.
>
> -Soumya
>
>
>
>
> On Wed, Jun 18, 2014 at 6:50 PM, Flavio Pompermaier 
> wrote:
>
>> Thanks for the quick reply, Soumya. Unfortunately I'm a newbie with
>> Spark... what do you mean? Is there any reference on how to do that?
>>
>>
>> On Thu, Jun 19, 2014 at 12:24 AM, Soumya Simanta <
>> soumya.sima...@gmail.com> wrote:
>>
>>>
>>> You can add a back-pressure-enabled component in front that feeds data
>>> into Spark. This component can control the input rate to Spark.
>>>
>>> > On Jun 18, 2014, at 6:13 PM, Flavio Pompermaier 
>>> wrote:
>>> >
>>> > Hi to all,
>>> > in my use case I'd like to receive events and call an external service
>>> as they pass through. Is it possible to limit the number of concurrent
>>> calls to that service (to avoid a DoS) using Spark Streaming? If so, limiting
>>> the rate implies possible buffer growth... how can I control the buffer of
>>> incoming events waiting to be processed?
>>> >
>>> > Best,
>>> > Flavio
>>>
>>


Re: Contribution to Spark MLLib

2014-06-18 Thread Xiangrui Meng
Denis, I think it is fine to have PLSA in MLlib. But I'm not familiar
with the modification you mentioned since the paper is new. We may
need to spend more time to learn the trade-offs. Feel free to create a
JIRA for PLSA and we can move our discussion there. It would be great
if you could share your current implementation, so that it's easier for
other developers to join the discussion.

Jayati, it is certainly NOT mandatory. But if you want to contribute
something new, please create a JIRA first.

Best,
Xiangrui


Re: Contribution to Spark MLLib

2014-06-18 Thread Jayati
Hello Xiangrui,

I am looking at the Spark issues, but I just wanted to know whether it is
mandatory for me to work on existing JIRAs before I can contribute to MLlib.

Regards,
Jayati





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Contribution-to-Spark-MLLib-tp7716p7895.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


python worker crash in spark 1.0

2014-06-18 Thread Schein, Sagi
Hi,

I am trying to upgrade from Spark v0.9.1 to v1.0.0 and am running into some weird
behavior.
When, in pyspark, I invoke
sc.textFile("hdfs://hadoop-ha01:/user/x/events_2.1").take(1), the
call crashes with the stack trace below.
The file resides in Hadoop 2.2; it is a large event dataset containing Unicode
strings (unfortunately I cannot share the data due to privacy constraints).
The same code works just fine with the Scala version of 1.0.0.
The annoying thing is that the same code on both Scala and Python 0.9.1 works
without any problem.

To me the problem seems to be related to serialization support in Python, but
I am just guessing.

Any help is appreciated,
Sagi


14/06/19 08:35:52 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
14/06/19 08:35:52 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
14/06/19 08:35:52 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
14/06/19 08:35:52 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
14/06/19 08:35:52 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
14/06/19 08:35:53 INFO PythonRDD: Times: total = 542, boot = 219, init = 322, finish = 1
14/06/19 08:35:53 ERROR PythonRDD: Python worker exited unexpectedly (crashed)
java.net.SocketException: Connection reset by peer: socket write error
        at java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
        at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
        at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
        at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:332)
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:304)
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:303)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:303)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
        at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174)
14/06/19 08:35:53 ERROR PythonRDD: This may have been caused by a prior exception:
java.net.SocketException: Connection reset by peer: socket write error
        at java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
        at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
        at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
        at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:332)
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:304)
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:303)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:303)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:175)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
        at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174)
14/06/19 08:35:53 INFO DAGScheduler: Failed to run take at :1
Traceback (most recent call last):
  File "", line 1, in 
  File "D:\src\spark-1.0.0-bin-hadoop2\python\pyspark\rdd.py", line 868, in take
    iterator = mapped._jrdd.collectPartitions(partitionsToTake)[0].iterator()
  File "D:\src\spark-1.

Re: Best practices for removing lineage of a RDD or Graph object?

2014-06-18 Thread dash
Hi Roy, 

Thanks for your help. I wrote a small code snippet that reproduces the
problem.
Could you read through it and see if I did anything wrong?

Thanks!

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("TEST")
      .setMaster("local[4]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "edu.nd.dsg.hdtm.util.HDTMKryoRegistrator")
    val sc = new SparkContext(conf)

    val v = sc.parallelize(Seq[(VertexId, Long)]((0L, 0L), (1L, 1L), (2L, 2L)))
    val e = sc.parallelize(Seq[Edge[Long]](Edge(0L, 1L, 0L), Edge(1L, 2L, 1L), Edge(2L, 0L, 2L)))
    val newGraph = Graph(v, e)
    var currentGraph = newGraph
    val vertexIds = currentGraph.vertices.map(_._1).collect()

    for (i <- 1 to 1000) {
      var g = currentGraph
      vertexIds.toStream.foreach(id => {
        g = Graph(currentGraph.vertices, currentGraph.edges)
        g.cache()
        g.edges.cache()
        g.vertices.cache()
        g.vertices.count()
        g.edges.count()
      })

      currentGraph.unpersistVertices(blocking = false)
      currentGraph.edges.unpersist(blocking = false)
      currentGraph = g
      println(" iter " + i + " finished")
    }

  }


Baoxu Shi(Dash)
Computer Science and Engineering Department
University of Notre Dame
b...@nd.edu



> On Jun 19, 2014, at 1:47 AM, roy20021 [via Apache Spark User List] 
>  wrote:
> 
> Not sure if it can help, btw:
> Checkpoint cuts the lineage. The checkpoint method is a flag. In order to
> actually perform the checkpoint, you must NOT materialise the RDD before it
> has been flagged; otherwise the flag is just ignored.
> 
> rdd2 = rdd1.map(..)
> rdd2.checkpoint()
> rdd2.count
> rdd2.isCheckpointed // true
> 
> On Wednesday, 18 June 2014, dash <[hidden email]> wrote:
> > If a RDD object have non-empty .dependencies, does that means it have
> > lineage? How could I remove it?
> >
> > I'm doing iterative computing and each iteration depends on the result
> > computed in previous iteration. After several iteration, it will throw
> > StackOverflowError.
> >
> > At first I'm trying to use cache, I read the code in pregel.scala, which is
> > part of GraphX, they use a count method to materialize the object after
> > cache, but I attached a debugger and seems such approach does not empty
> > .dependencies, and that also does not work in my code.
> >
> > Another alternative approach is using checkpoint, I tried checkpoint
> > vertices and edges for my Graph object and then materialize it by count
> > vertices and edges. Then I use .isCheckpointed to check if it is correctly
> > checkpointed, but it always return false.
> >
> >
> >
> > --
> > View this message in context: 
> > http://apache-spark-user-list.1001560.n3.nabble.com/Best-practices-for-removing-lineage-of-a-RDD-or-Graph-object-tp7779.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> > 
> 





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Best-practices-for-removing-lineage-of-a-RDD-or-Graph-object-tp7779p7893.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Best practices for removing lineage of a RDD or Graph object?

2014-06-18 Thread Andrea Esposito
Not sure if it can help, btw:
Checkpoint cuts the lineage. The checkpoint method is a flag. In order to
actually perform the checkpoint, you must NOT materialise the RDD before
it has been flagged; otherwise the flag is just ignored.

rdd2 = rdd1.map(..)
rdd2.checkpoint()
rdd2.count
rdd2.isCheckpointed // true
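
Applied to an iterative GraphX job like the one in this thread, a minimal sketch of that ordering (flag first, then materialise); the checkpoint directory, interval, and the trivial per-iteration update are made up, and whether this fully truncates the Graph lineage is exactly what is being debugged here:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

object CheckpointSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("checkpoint-sketch").setMaster("local[2]"))
    sc.setCheckpointDir("/tmp/spark-checkpoints")   // illustrative path

    val v = sc.parallelize(Seq((0L, 0L), (1L, 1L), (2L, 2L)))
    val e = sc.parallelize(Seq(Edge(0L, 1L, 0L), Edge(1L, 2L, 1L), Edge(2L, 0L, 2L)))
    var graph = Graph(v, e)

    for (i <- 1 to 100) {
      graph = graph.mapVertices((_, attr) => attr + 1) // stand-in for a real iteration
      if (i % 20 == 0) {                               // periodically cut the lineage
        graph.vertices.checkpoint()                    // flag BEFORE materialising...
        graph.edges.checkpoint()
        graph.vertices.count()                         // ...then materialise, so the flag is honoured
        graph.edges.count()
      }
    }
    sc.stop()
  }
}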

On Wednesday, 18 June 2014, dash wrote:
> If a RDD object have non-empty .dependencies, does that means it have
> lineage? How could I remove it?
>
> I'm doing iterative computing and each iteration depends on the result
> computed in previous iteration. After several iteration, it will throw
> StackOverflowError.
>
> At first I'm trying to use cache, I read the code in pregel.scala, which
is
> part of GraphX, they use a count method to materialize the object after
> cache, but I attached a debugger and seems such approach does not empty
> .dependencies, and that also does not work in my code.
>
> Another alternative approach is using checkpoint, I tried checkpoint
> vertices and edges for my Graph object and then materialize it by count
> vertices and edges. Then I use .isCheckpointed to check if it is correctly
> checkpointed, but it always return false.
>
>
>
> --
> View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Best-practices-for-removing-lineage-of-a-RDD-or-Graph-object-tp7779.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Un-serializable 3rd-party classes (Spark, Java)

2014-06-18 Thread Daedalus
Kryo did the job.
Thanks!
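
For cases where Kryo can't handle the class, a minimal sketch of the wrapper approach Matei describes below, modelled on SerializableWritable; ThirdPartyMatrix is a made-up stand-in for the third-party type, not the real FlexCompRowMatrix API:

import java.io.{ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a third-party class that does not implement Serializable.
class ThirdPartyMatrix(var rows: Int, var cols: Int, var values: Array[Double])

// Wrapper that owns the (de)serialisation, so the original library stays untouched.
class SerializableMatrix(@transient var matrix: ThirdPartyMatrix) extends Serializable {

  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    out.writeInt(matrix.rows)
    out.writeInt(matrix.cols)
    out.writeObject(matrix.values)
  }

  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    val rows = in.readInt()
    val cols = in.readInt()
    val values = in.readObject().asInstanceOf[Array[Double]]
    matrix = new ThirdPartyMatrix(rows, cols, values)
  }
}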


On Wed, Jun 18, 2014 at 10:44 AM, Matei Zaharia [via Apache Spark User
List]  wrote:

> There are a few options:
>
> - Kryo might be able to serialize these objects out of the box, depending
> what’s inside them. Try turning it on as described at
> http://spark.apache.org/docs/latest/tuning.html.
>
> - If that doesn’t work, you can create your own “wrapper” objects that
> implement Serializable, or even a subclass of FlexCompRowMatrix. No need to
> change the original library.
>
> - If the library has its own serialization functions, you could also use
> those inside a wrapper object. Take a look at
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SerializableWritable.scala
>  for
> an example where we make Hadoop’s Writables serializable.
>
> Matei
>
> On Jun 17, 2014, at 10:11 PM, Daedalus <[hidden email]
> > wrote:
>
> > I'm trying to use matrix-toolkit-java for an application of
> > mine, particularly the FlexCompRowMatrix class (used to store sparse
> > matrices).
> >
> > I have a class Dataframe -- which contains and int array, two double
> values,
> > and one FlexCompRowMatrix.
> >
> > When I try and run a simple Spark .foreach() on a JavaRDD created using
> a
> > list of the above mentioned Dataframes, I get the following errors:
> >
> > Exception in thread "main" org.apache.spark.SparkException: Job aborted due
> > to stage failure: *Task not serializable: java.io.NotSerializableException:
> > no.uib.cipr.matrix.sparse.FlexCompRowMatrix*
> >    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
> >
> > The FlexCompRowMatrix doesn't seem to implement Serializable. This class
> > suits my purpose very well, and I would prefer not to switch over from
> it.
> >
> > Other than writing code to make the class serializable, and then
> recompiling
> > the matrix-toolkit-java source, what options do I have?
> >
> > Is there any workaround for this issue?
> >
> >
> >
> > --
> > View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Un-serializable-3rd-party-classes-Spark-Java-tp7815.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>
>




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Un-serializable-3rd-party-classes-Spark-Java-tp7815p7891.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

options set in spark-env.sh is not reflecting on actual execution

2014-06-18 Thread MEETHU MATHEW
Hi all,

I have a question regarding the options in spark-env.sh. I set the following
values in the file on the master and the 2 workers:

SPARK_WORKER_MEMORY=7g
SPARK_EXECUTOR_MEMORY=6g
SPARK_DAEMON_JAVA_OPTS+="-Dspark.akka.timeout=30
-Dspark.akka.frameSize=1 -Dspark.blockManagerHeartBeatMs=80
-Dspark.shuffle.spill=false"

But SPARK_EXECUTOR_MEMORY is showing up as 4g in the web UI. Do I need to change it
anywhere else for the value I set to take effect and be reflected in the web UI?

A warning appears that blockManagerHeartBeatMs is exceeding 45 while
executing a process, even though I set it to 80.

So I'm not sure whether these should be set as SPARK_MASTER_OPTS or SPARK_WORKER_OPTS.
 
Thanks & Regards, 
Meethu M
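
For comparison, per-application settings such as spark.executor.memory can also be set as Spark properties on the SparkConf rather than in spark-env.sh; a minimal sketch, with a placeholder master URL and values (only one possible setup, not necessarily the fix for this cluster):

import org.apache.spark.{SparkConf, SparkContext}

// Per-application properties set programmatically instead of via spark-env.sh.
val conf = new SparkConf()
  .setAppName("conf-sketch")
  .setMaster("spark://master-host:7077")   // placeholder master URL
  .set("spark.executor.memory", "6g")
  .set("spark.akka.frameSize", "100")
val sc = new SparkContext(conf)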

Fwd: BSP realization on Spark

2014-06-18 Thread Ghousia
-- Forwarded message --
From: Ghousia 
Date: Wed, Jun 18, 2014 at 5:41 PM
Subject: BSP realization on Spark
To: user@spark.apache.org


Hi,

We are trying to implement a BSP model in Spark with the help of GraphX.
One thing I encountered is the Pregel operator in the Graph class. But what I
fail to understand is how the master and workers need to be assigned (as in
BSP), and how barrier synchronization would happen. The Pregel operator
provides a way to define a vertex program, but nothing is mentioned about
barrier synchronization.

Any help in this regard is truly appreciated.

Many Thanks,
Ghousia.
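
For reference, a minimal sketch of the Pregel operator in action (the classic single-source shortest paths example, with made-up edge data). Each round of vprog/sendMsg/mergeMsg corresponds to one superstep; the synchronization between supersteps comes from the iteration GraphX drives (Spark's own job boundaries), so there is no explicit master/worker assignment to make in user code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

object PregelSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("pregel-sketch").setMaster("local[2]"))
    val edges = sc.parallelize(Seq(Edge(1L, 2L, 1.0), Edge(2L, 3L, 2.0), Edge(1L, 3L, 5.0)))
    val graph = Graph.fromEdges(edges, defaultValue = Double.PositiveInfinity)
    val sourceId: VertexId = 1L

    // Single-source shortest paths via Pregel.
    val sssp = graph
      .mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
      .pregel(Double.PositiveInfinity)(
        (id, dist, newDist) => math.min(dist, newDist),              // vertex program
        triplet =>                                                    // send messages
          if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
            Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
          else Iterator.empty,
        (a, b) => math.min(a, b))                                     // merge messages

    sssp.vertices.collect().foreach(println)
    sc.stop()
  }
}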


Re: Execution stalls in LogisticRegressionWithSGD

2014-06-18 Thread Bharath Ravi Kumar
Thanks. I'll await the fix to re-run my test.
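
For readers following the feature encoding described in the quoted message below, a minimal sketch of building one such sparse LabeledPoint; the index arithmetic, sizes, and the encode helper are illustrative only, not Bharath's actual code:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// One record with categorical values xk, yk and their combination (xk, yk) sets
// exactly three indices in a sparse vector of length numX + numY + numX * numY.
def encode(label: Double, xk: Int, yk: Int, numX: Int, numY: Int): LabeledPoint = {
  val size = numX + numY + numX * numY
  val indices = Array(xk, numX + yk, numX + numY + xk * numY + yk)
  val values = Array(1.0, 1.0, 1.0)
  LabeledPoint(label, Vectors.sparse(size, indices, values))
}

// e.g. encode(1.0, xk = 3, yk = 7, numX = 100, numY = 200)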


On Thu, Jun 19, 2014 at 8:28 AM, Xiangrui Meng  wrote:

> Hi Bharath,
>
> This is related to SPARK-1112, which we already found the root cause.
> I will let you know when this is fixed.
>
> Best,
> Xiangrui
>
> On Tue, Jun 17, 2014 at 7:37 PM, Bharath Ravi Kumar 
> wrote:
> > Couple more points:
> > 1)The inexplicable stalling of execution with large feature sets appears
> > similar to that reported with the news-20 dataset:
> >
> http://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/%3c53a03542.1010...@gmail.com%3E
> >
> > 2) The NPE trying to call mapToPair convert an RDD > Integer> into a JavaPairRDD, Tuple2>
> is
> > unrelated to mllib.
> >
> > Thanks,
> > Bharath
> >
> >
> >
> > On Wed, Jun 18, 2014 at 7:14 AM, Bharath Ravi Kumar  >
> > wrote:
> >>
> >> Hi  Xiangrui ,
> >>
> >> I'm using 1.0.0.
> >>
> >> Thanks,
> >> Bharath
> >>
> >> On 18-Jun-2014 1:43 am, "Xiangrui Meng"  wrote:
> >>>
> >>> Hi Bharath,
> >>>
> >>> Thanks for posting the details! Which Spark version are you using?
> >>>
> >>> Best,
> >>> Xiangrui
> >>>
> >>> On Tue, Jun 17, 2014 at 6:48 AM, Bharath Ravi Kumar <
> reachb...@gmail.com>
> >>> wrote:
> >>> > Hi,
> >>> >
> >>> > (Apologies for the long mail, but it's necessary to provide
> sufficient
> >>> > details considering the number of issues faced.)
> >>> >
> >>> > I'm running into issues testing LogisticRegressionWithSGD a two node
> >>> > cluster
> >>> > (each node with 24 cores and 16G available to slaves out of 24G on
> the
> >>> > system). Here's a description of the application:
> >>> >
> >>> > The model is being trained based on categorical features x, y, and
> >>> > (x,y).
> >>> > The categorical features are mapped to binary features by converting
> >>> > each
> >>> > distinct value in the category enum into a binary feature by itself
> >>> > (i.e
> >>> > presence of that value in a record implies corresponding feature = 1,
> >>> > else
> >>> > feature = 0. So, there'd be as many distinct features as enum
> values) .
> >>> > The
> >>> > training vector is laid out as
> >>> > [x1,x2...xn,y1,y2yn,(x1,y1),(x2,y2)...(xn,yn)]. Each record in
> the
> >>> > training data has only one combination (Xk,Yk) and a label appearing
> in
> >>> > the
> >>> > record. Thus, the corresponding labeledpoint sparse vector would only
> >>> > have 3
> >>> > values Xk, Yk, (Xk,Yk) set for a record. The total length of the
> vector
> >>> > (though parse) would be nearly 614000.  The number of records is
> about
> >>> > 1.33
> >>> > million. The records have been coalesced into 20 partitions across
> two
> >>> > nodes. The input data has not been cached.
> >>> > (NOTE: I do realize the records & features may seem large for a two
> >>> > node
> >>> > setup, but given the memory & cpu, and the fact that I'm willing to
> >>> > give up
> >>> > some turnaround time, I don't see why tasks should inexplicably fail)
> >>> >
> >>> > Additional parameters include:
> >>> >
> >>> > spark.executor.memory = 14G
> >>> > spark.default.parallelism = 1
> >>> > spark.cores.max=20
> >>> > spark.storage.memoryFraction=0.8 //No cache space required
> >>> > (Trying to set spark.akka.frameSize to a larger number, say, 20
> didn't
> >>> > help
> >>> > either)
> >>> >
> >>> > The model training was initialized as : new
> >>> > LogisticRegressionWithSGD(1,
> >>> > maxIterations, 0.0, 0.05)
> >>> >
> >>> > However, after 4 iterations of gradient descent, the entire execution
> >>> > appeared to stall inexplicably. The corresponding executor details
> and
> >>> > details of the stalled stage (number 14) are as follows:
> >>> >
> >>> > Metric                            Min      25th     Median   75th     Max
> >>> > Result serialization time         12 ms    13 ms    14 ms    16 ms    18 ms
> >>> > Duration                          4 s      4 s      5 s      5 s      5 s
> >>> > Time spent fetching task results  0 ms     0 ms     0 ms     0 ms     0 ms
> >>> > Scheduler delay                   6 s      6 s      6 s      6 s      12 s
> >>> >
> >>> >
> >>> > Stage Id
> >>> > 14 aggregate at GradientDescent.scala:178
> >>> >
> >>> > Task Index   Task ID   Status   Locality Level   Executor   Launch Time   Duration   GC Time   Result Ser Time   Errors
> >>> >
> >>> > 0 600 RUNNING PROCESS_LOCAL
> serious.dataone.foo.bar.com
> >>> > 2014/06/17 10:32:27 1.1 h
> >>> > 1 601 RUNNING PROCESS_LOCAL
> casual.dataone.foo.bar.com
> >>> > 2014/06/17 10:32:27 1.1 h
> >>> > 2 602 RUNNING PROCESS_LOCAL
> serious.dataone.foo.bar.com
> >>> > 2014/06/17 10:32:27 1.1 h
> >>> > 3 603 RUNNING PROCESS_LOCAL
> casual.dataone.foo.bar.com
> >>> > 2014/06/17 10:32:27 1.1 h
> >>> > 4 604 RUNNING PROCESS_LOCAL
> serious.dataone.foo.bar.com
> >>> > 2014/06/17 10:32:27 1.1 h
> >>> > 5 605 SUCCESS PROCESS_LOCAL
> casual.dataone.foo.bar.com
>

Re: Execution stalls in LogisticRegressionWithSGD

2014-06-18 Thread Xiangrui Meng
Hi Bharath,

This is related to SPARK-1112, for which we have already found the root cause.
I will let you know when it is fixed.

Best,
Xiangrui

On Tue, Jun 17, 2014 at 7:37 PM, Bharath Ravi Kumar  wrote:
> Couple more points:
> 1)The inexplicable stalling of execution with large feature sets appears
> similar to that reported with the news-20 dataset:
> http://mail-archives.apache.org/mod_mbox/spark-user/201406.mbox/%3c53a03542.1010...@gmail.com%3E
>
> 2) The NPE trying to call mapToPair convert an RDD Integer> into a JavaPairRDD, Tuple2> is
> unrelated to mllib.
>
> Thanks,
> Bharath
>
>
>
> On Wed, Jun 18, 2014 at 7:14 AM, Bharath Ravi Kumar 
> wrote:
>>
>> Hi  Xiangrui ,
>>
>> I'm using 1.0.0.
>>
>> Thanks,
>> Bharath
>>
>> On 18-Jun-2014 1:43 am, "Xiangrui Meng"  wrote:
>>>
>>> Hi Bharath,
>>>
>>> Thanks for posting the details! Which Spark version are you using?
>>>
>>> Best,
>>> Xiangrui
>>>
>>> On Tue, Jun 17, 2014 at 6:48 AM, Bharath Ravi Kumar 
>>> wrote:
>>> > Hi,
>>> >
>>> > (Apologies for the long mail, but it's necessary to provide sufficient
>>> > details considering the number of issues faced.)
>>> >
>>> > I'm running into issues testing LogisticRegressionWithSGD a two node
>>> > cluster
>>> > (each node with 24 cores and 16G available to slaves out of 24G on the
>>> > system). Here's a description of the application:
>>> >
>>> > The model is being trained based on categorical features x, y, and
>>> > (x,y).
>>> > The categorical features are mapped to binary features by converting
>>> > each
>>> > distinct value in the category enum into a binary feature by itself
>>> > (i.e
>>> > presence of that value in a record implies corresponding feature = 1,
>>> > else
>>> > feature = 0. So, there'd be as many distinct features as enum values) .
>>> > The
>>> > training vector is laid out as
>>> > [x1,x2...xn,y1,y2yn,(x1,y1),(x2,y2)...(xn,yn)]. Each record in the
>>> > training data has only one combination (Xk,Yk) and a label appearing in
>>> > the
>>> > record. Thus, the corresponding labeledpoint sparse vector would only
>>> > have 3
>>> > values Xk, Yk, (Xk,Yk) set for a record. The total length of the vector
>>> > (though parse) would be nearly 614000.  The number of records is about
>>> > 1.33
>>> > million. The records have been coalesced into 20 partitions across two
>>> > nodes. The input data has not been cached.
>>> > (NOTE: I do realize the records & features may seem large for a two
>>> > node
>>> > setup, but given the memory & cpu, and the fact that I'm willing to
>>> > give up
>>> > some turnaround time, I don't see why tasks should inexplicably fail)
>>> >
>>> > Additional parameters include:
>>> >
>>> > spark.executor.memory = 14G
>>> > spark.default.parallelism = 1
>>> > spark.cores.max=20
>>> > spark.storage.memoryFraction=0.8 //No cache space required
>>> > (Trying to set spark.akka.frameSize to a larger number, say, 20 didn't
>>> > help
>>> > either)
>>> >
>>> > The model training was initialized as : new
>>> > LogisticRegressionWithSGD(1,
>>> > maxIterations, 0.0, 0.05)
>>> >
>>> > However, after 4 iterations of gradient descent, the entire execution
>>> > appeared to stall inexplicably. The corresponding executor details and
>>> > details of the stalled stage (number 14) are as follows:
>>> >
> >>> > Metric                            Min      25th     Median   75th     Max
> >>> > Result serialization time         12 ms    13 ms    14 ms    16 ms    18 ms
> >>> > Duration                          4 s      4 s      5 s      5 s      5 s
> >>> > Time spent fetching task results  0 ms     0 ms     0 ms     0 ms     0 ms
> >>> > Scheduler delay                   6 s      6 s      6 s      6 s      12 s
>>> >
>>> >
>>> > Stage Id
>>> > 14 aggregate at GradientDescent.scala:178
>>> >
> >>> > Task Index   Task ID   Status   Locality Level   Executor   Launch Time   Duration   GC Time   Result Ser Time   Errors
>>> >
>>> > 0 600 RUNNING PROCESS_LOCAL serious.dataone.foo.bar.com
>>> > 2014/06/17 10:32:27 1.1 h
>>> > 1 601 RUNNING PROCESS_LOCAL casual.dataone.foo.bar.com
>>> > 2014/06/17 10:32:27 1.1 h
>>> > 2 602 RUNNING PROCESS_LOCAL serious.dataone.foo.bar.com
>>> > 2014/06/17 10:32:27 1.1 h
>>> > 3 603 RUNNING PROCESS_LOCAL casual.dataone.foo.bar.com
>>> > 2014/06/17 10:32:27 1.1 h
>>> > 4 604 RUNNING PROCESS_LOCAL serious.dataone.foo.bar.com
>>> > 2014/06/17 10:32:27 1.1 h
>>> > 5 605 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
>>> > 2014/06/17 10:32:27 4 s 2 s 12 ms
>>> > 6 606 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
>>> > 2014/06/17 10:32:27 4 s 1 s 14 ms
>>> > 7 607 SUCCESS PROCESS_LOCAL casual.dataone.foo.bar.com
>>> > 2014/06/17 10:32:27 4 s 2 s 12 ms
>>> > 8 608 SUCCESS PROCESS_LOCAL serious.dataone.foo.bar.com
>>> > 2014/06/17 10:3

Re: Spark streaming and rate limit

2014-06-18 Thread Soumya Simanta
Flavio - I'm new to Spark as well, but I've done stream processing using
other frameworks. My comments below are not Spark Streaming specific. Maybe
someone who knows more can provide better insights.

I read your post on my phone and I believe my answer doesn't completely
address the issue you have raised.

Do you need to call the external service for every event, i.e., do you need
to process all events? Also, does the order of processing events matter? Is
there a time bound within which each event should be processed?

Calling an external service means network I/O, so you have to buffer events
if your service is rate limited or slower than the rate at which you are
processing events.

Here are some ways of dealing with this situation:

1. Drop events based on a policy (such as buffer/queue size).
2. Tell the event producer to slow down, if that's in your control.
3. Use a proxy or a set of proxies to distribute the calls to the remote
service, if the rate limit is per user or per network node only.

I'm not sure how many of these are implemented directly in Spark Streaming,
but you can have an external component that controls the event rate and only
sends events to Spark Streaming when it's ready to process more messages.

Hope this helps.

-Soumya
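
One possible shape for such a component, sketched with Spark Streaming's custom Receiver API; the bounded queue provides the back-pressure (an in-process producer blocks on put when Spark falls behind), and the capacity, String payloads, and storage level are placeholders:

import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Sketch of a buffering receiver: the producer blocks on queue.put(event) when the
// bounded queue is full, which is where the back-pressure comes from. This is an
// illustrative sketch, not a built-in rate-limiting feature.
class BoundedQueueReceiver(capacity: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK) {

  val queue = new ArrayBlockingQueue[String](capacity)

  def onStart(): Unit = {
    new Thread("bounded-queue-drainer") {
      override def run(): Unit = {
        while (!isStopped()) {
          val event = queue.poll(100, TimeUnit.MILLISECONDS)
          if (event != null) store(event)   // hand the event to Spark Streaming
        }
      }
    }.start()
  }

  def onStop(): Unit = {}
}

// Usage sketch: val stream = ssc.receiverStream(new BoundedQueueReceiver(10000))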




On Wed, Jun 18, 2014 at 6:50 PM, Flavio Pompermaier 
wrote:

> Thanks for the quick reply soumya. Unfortunately I'm a newbie with
> Spark..what do you mean? is there any reference to how to do that?
>
>
> On Thu, Jun 19, 2014 at 12:24 AM, Soumya Simanta  > wrote:
>
>>
>> You can add a back pressured enabled component in front that feeds data
>> into Spark. This component can control in input rate to spark.
>>
>> > On Jun 18, 2014, at 6:13 PM, Flavio Pompermaier 
>> wrote:
>> >
>> > Hi to all,
>> > in my use case I'd like to receive events and call an external service
>> as they pass through. Is it possible to limit the number of contemporaneous
>> call to that service (to avoid DoS) using Spark streaming? if so, limiting
>> the rate implies a possible buffer growth...how can I control the buffer of
>> incoming events waiting to be processed?
>> >
>> > Best,
>> > Flavio
>>
>


Re: Patterns for making multiple aggregations in one pass

2014-06-18 Thread Nicholas Chammas
This is exciting! Here is the relevant "alpha" doc for this feature, for
others reading this. I'm going to try this out.

Will this be released with 1.1.0?


On Wed, Jun 18, 2014 at 8:31 PM, Zongheng Yang  wrote:

> If your input data is JSON, you can also try out the recently merged
> in initial JSON support:
>
> https://github.com/apache/spark/commit/d2f4f30b12f99358953e2781957468e2cfe3c916
>
> On Wed, Jun 18, 2014 at 5:27 PM, Nicholas Chammas
>  wrote:
> > That’s pretty neat! So I guess if you start with an RDD of objects, you’d
> > first do something like RDD.map(lambda x: Record(x['field_1'],
> x['field_2'],
> > ...)) in order to register it as a table, and from there run your
> > aggregates. Very nice.
> >
> >
> >
> > On Wed, Jun 18, 2014 at 7:56 PM, Evan R. Sparks 
> > wrote:
> >>
> >> This looks like a job for SparkSQL!
> >>
> >>
> >> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
> >> import sqlContext._
> >> case class MyRecord(country: String, name: String, age: Int, hits: Long)
> >> val data = sc.parallelize(Array(MyRecord("USA", "Franklin", 24, 234),
> >> MyRecord("USA", "Bob", 55, 108), MyRecord("France", "Remi", 33, 72)))
> >> data.registerAsTable("MyRecords")
> >> val results = sql("""SELECT t.country, AVG(t.age), SUM(t.hits) FROM
> >> MyRecords t GROUP BY t.country""").collect
> >>
> >> Now "results" contains:
> >>
> >> Array[org.apache.spark.sql.Row] = Array([France,33.0,72],
> [USA,39.5,342])
> >>
> >>
> >>
> >> On Wed, Jun 18, 2014 at 4:42 PM, Doris Xin 
> wrote:
> >>>
> >>> Hi Nick,
> >>>
> >>> Instead of using reduceByKey(), you might want to look into using
> >>> aggregateByKey(), which allows you to return a different value type U
> >>> instead of the input value type V for each input tuple (K, V). You can
> >>> define U to be a datatype that holds both the average and total and
> have
> >>> seqOp update both fields of U in a single pass.
> >>>
> >>> Hope this makes sense,
> >>> Doris
> >>>
> >>>
> >>> On Wed, Jun 18, 2014 at 4:28 PM, Nick Chammas
> >>>  wrote:
> 
>  The following is a simplified example of what I am trying to
> accomplish.
> 
>  Say I have an RDD of objects like this:
> 
>  {
>  "country": "USA",
>  "name": "Franklin",
>  "age": 24,
>  "hits": 224
>  }
>  {
> 
>  "country": "USA",
>  "name": "Bob",
>  "age": 55,
>  "hits": 108
>  }
>  {
> 
>  "country": "France",
>  "name": "Remi",
>  "age": 33,
>  "hits": 72
>  }
> 
>  I want to find the average age and total number of hits per country.
>  Ideally, I would like to scan the data once and perform both
> aggregations
>  simultaneously.
> 
>  What is a good approach to doing this?
> 
>  I’m thinking that we’d want to keyBy(country), and then somehow
>  reduceByKey(). The problem is, I don’t know how to approach writing a
>  function that can be passed to reduceByKey() and that will track a
> running
>  average and total simultaneously.
> 
>  Nick
> 
> 
>  
>  View this message in context: Patterns for making multiple
> aggregations
>  in one pass
>  Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
> >>>
> >>>
> >>
> >
>


Re: Trailing Tasks Saving to HDFS

2014-06-18 Thread Surendranauth Hiraman
Looks like eventually there was some type of reset or timeout and the tasks
have been reassigned. I'm guessing they'll keep failing until the max failure
count is reached.

The machine it disconnected from was a remote machine, though I've seen
such failures on connections to itself with other problems. The log lines
from the remote machine are also below.

Any thoughts or guesses would be appreciated!

*"HUNG" WORKER*

14/06/18 19:41:18 WARN network.ReceivingConnection: Error reading from
connection to ConnectionManagerId(172.16.25.103,57626)

java.io.IOException: Connection reset by peer

at sun.nio.ch.FileDispatcher.read0(Native Method)

at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)

at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:251)

at sun.nio.ch.IOUtil.read(IOUtil.java:224)

at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:254)

at org.apache.spark.network.ReceivingConnection.read(Connection.scala:496)

at
org.apache.spark.network.ConnectionManager$$anon$6.run(ConnectionManager.scala:175)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)

at java.lang.Thread.run(Thread.java:679)

14/06/18 19:41:18 INFO network.ConnectionManager: Handling connection error
on connection to ConnectionManagerId(172.16.25.103,57626)

14/06/18 19:41:18 INFO network.ConnectionManager: Removing
ReceivingConnection to ConnectionManagerId(172.16.25.103,57626)

14/06/18 19:41:18 INFO network.ConnectionManager: Removing
SendingConnection to ConnectionManagerId(172.16.25.103,57626)

14/06/18 19:41:18 INFO network.ConnectionManager: Removing
ReceivingConnection to ConnectionManagerId(172.16.25.103,57626)

14/06/18 19:41:18 ERROR network.ConnectionManager: Corresponding
SendingConnectionManagerId not found


*REMOTE WORKER*

14/06/18 19:41:18 INFO network.ConnectionManager: Removing
ReceivingConnection to ConnectionManagerId(172.16.25.124,55610)

14/06/18 19:41:18 ERROR network.ConnectionManager: Corresponding
SendingConnectionManagerId not found



On Wed, Jun 18, 2014 at 7:16 PM, Surendranauth Hiraman <
suren.hira...@velos.io> wrote:

> I have a flow that ends with saveAsTextFile() to HDFS.
>
> It seems all the expected files per partition have been written out, based
> on the number of part files and the file sizes.
>
> But the driver logs show 2 tasks still not completed and has no activity
> and the worker logs show no activity for those two tasks for a while now.
>
> Has anyone run into this situation? It's happened to me a couple of times
> now.
>
> Thanks.
>
> -- Suren
>
> SUREN HIRAMAN, VP TECHNOLOGY
> Velos
> Accelerating Machine Learning
>
> 440 NINTH AVENUE, 11TH FLOOR
> NEW YORK, NY 10001
> O: (917) 525-2466 ext. 105
> F: 646.349.4063
> E: suren.hiraman@v elos.io
> W: www.velos.io
>
>


-- 

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@v elos.io
W: www.velos.io


Re: Patterns for making multiple aggregations in one pass

2014-06-18 Thread Zongheng Yang
If your input data is JSON, you can also try out the recently merged
in initial JSON support:
https://github.com/apache/spark/commit/d2f4f30b12f99358953e2781957468e2cfe3c916
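
A minimal sketch of what that looks like, assuming the jsonRDD entry point added in that commit (the sample records are made up; adjust the method name if the API has since moved):

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._

// JSON strings parsed directly into a queryable table.
val json = sc.parallelize(Seq(
  """{"country": "USA", "name": "Franklin", "age": 24, "hits": 224}""",
  """{"country": "France", "name": "Remi", "age": 33, "hits": 72}"""))
val records = sqlContext.jsonRDD(json)
records.registerAsTable("records")
sql("SELECT country, AVG(age), SUM(hits) FROM records GROUP BY country").collect()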

On Wed, Jun 18, 2014 at 5:27 PM, Nicholas Chammas
 wrote:
> That’s pretty neat! So I guess if you start with an RDD of objects, you’d
> first do something like RDD.map(lambda x: Record(x['field_1'], x['field_2'],
> ...)) in order to register it as a table, and from there run your
> aggregates. Very nice.
>
>
>
> On Wed, Jun 18, 2014 at 7:56 PM, Evan R. Sparks 
> wrote:
>>
>> This looks like a job for SparkSQL!
>>
>>
>> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>> import sqlContext._
>> case class MyRecord(country: String, name: String, age: Int, hits: Long)
>> val data = sc.parallelize(Array(MyRecord("USA", "Franklin", 24, 234),
>> MyRecord("USA", "Bob", 55, 108), MyRecord("France", "Remi", 33, 72)))
>> data.registerAsTable("MyRecords")
>> val results = sql("""SELECT t.country, AVG(t.age), SUM(t.hits) FROM
>> MyRecords t GROUP BY t.country""").collect
>>
>> Now "results" contains:
>>
>> Array[org.apache.spark.sql.Row] = Array([France,33.0,72], [USA,39.5,342])
>>
>>
>>
>> On Wed, Jun 18, 2014 at 4:42 PM, Doris Xin  wrote:
>>>
>>> Hi Nick,
>>>
>>> Instead of using reduceByKey(), you might want to look into using
>>> aggregateByKey(), which allows you to return a different value type U
>>> instead of the input value type V for each input tuple (K, V). You can
>>> define U to be a datatype that holds both the average and total and have
>>> seqOp update both fields of U in a single pass.
>>>
>>> Hope this makes sense,
>>> Doris
>>>
>>>
>>> On Wed, Jun 18, 2014 at 4:28 PM, Nick Chammas
>>>  wrote:

 The following is a simplified example of what I am trying to accomplish.

 Say I have an RDD of objects like this:

 {
 "country": "USA",
 "name": "Franklin",
 "age": 24,
 "hits": 224
 }
 {

 "country": "USA",
 "name": "Bob",
 "age": 55,
 "hits": 108
 }
 {

 "country": "France",
 "name": "Remi",
 "age": 33,
 "hits": 72
 }

 I want to find the average age and total number of hits per country.
 Ideally, I would like to scan the data once and perform both aggregations
 simultaneously.

 What is a good approach to doing this?

 I’m thinking that we’d want to keyBy(country), and then somehow
 reduceByKey(). The problem is, I don’t know how to approach writing a
 function that can be passed to reduceByKey() and that will track a running
 average and total simultaneously.

 Nick


 
 View this message in context: Patterns for making multiple aggregations
 in one pass
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>>
>>
>


Re: Patterns for making multiple aggregations in one pass

2014-06-18 Thread Nicholas Chammas
That’s pretty neat! So I guess if you start with an RDD of objects, you’d
first do something like RDD.map(lambda x: Record(x['field_1'],
x['field_2'], ...)) in order to register it as a table, and from there run
your aggregates. Very nice.
​


On Wed, Jun 18, 2014 at 7:56 PM, Evan R. Sparks 
wrote:

> This looks like a job for SparkSQL!
>
>
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
> import sqlContext._
> case class MyRecord(country: String, name: String, age: Int, hits: Long)
> val data = sc.parallelize(Array(MyRecord("USA", "Franklin", 24, 234),
> MyRecord("USA", "Bob", 55, 108), MyRecord("France", "Remi", 33, 72)))
> data.registerAsTable("MyRecords")
> val results = sql("""SELECT t.country, AVG(t.age), SUM(t.hits) FROM
> MyRecords t GROUP BY t.country""").collect
>
> Now "results" contains:
>
> Array[org.apache.spark.sql.Row] = Array([France,33.0,72], [USA,39.5,342])
>
>
> On Wed, Jun 18, 2014 at 4:42 PM, Doris Xin  wrote:
>
>> Hi Nick,
>>
>> Instead of using reduceByKey(), you might want to look into using
>> aggregateByKey(), which allows you to return a different value type U
>> instead of the input value type V for each input tuple (K, V). You can
>> define U to be a datatype that holds both the average and total and have
>> seqOp update both fields of U in a single pass.
>>
>> Hope this makes sense,
>> Doris
>>
>>
>> On Wed, Jun 18, 2014 at 4:28 PM, Nick Chammas > > wrote:
>>
>>> The following is a simplified example of what I am trying to accomplish.
>>>
>>> Say I have an RDD of objects like this:
>>>
>>> {
>>> "country": "USA",
>>> "name": "Franklin",
>>> "age": 24,
>>> "hits": 224}
>>> {
>>>
>>> "country": "USA",
>>> "name": "Bob",
>>> "age": 55,
>>> "hits": 108}
>>> {
>>>
>>> "country": "France",
>>> "name": "Remi",
>>> "age": 33,
>>> "hits": 72}
>>>
>>> I want to find the average age and total number of hits per country.
>>> Ideally, I would like to scan the data once and perform both aggregations
>>> simultaneously.
>>>
>>> What is a good approach to doing this?
>>>
>>> I’m thinking that we’d want to keyBy(country), and then somehow
>>> reduceByKey(). The problem is, I don’t know how to approach writing a
>>> function that can be passed to reduceByKey() and that will track a
>>> running average and total simultaneously.
>>>
>>> Nick
>>> ​
>>>
>>> --
>>> View this message in context: Patterns for making multiple aggregations
>>> in one pass
>>> 
>>> Sent from the Apache Spark User List mailing list archive
>>>  at Nabble.com.
>>>
>>
>>
>


Re: Patterns for making multiple aggregations in one pass

2014-06-18 Thread Matei Zaharia
I was going to suggest the same thing :).

On Jun 18, 2014, at 4:56 PM, Evan R. Sparks  wrote:

> This looks like a job for SparkSQL!
> 
> 
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
> import sqlContext._
> case class MyRecord(country: String, name: String, age: Int, hits: Long)
> val data = sc.parallelize(Array(MyRecord("USA", "Franklin", 24, 234), 
> MyRecord("USA", "Bob", 55, 108), MyRecord("France", "Remi", 33, 72)))
> data.registerAsTable("MyRecords")
> val results = sql("""SELECT t.country, AVG(t.age), SUM(t.hits) FROM MyRecords 
> t GROUP BY t.country""").collect
> 
> Now "results" contains:
> Array[org.apache.spark.sql.Row] = Array([France,33.0,72], [USA,39.5,342])
> 
> 
> 
> On Wed, Jun 18, 2014 at 4:42 PM, Doris Xin  wrote:
> Hi Nick,
> 
> Instead of using reduceByKey(), you might want to look into using 
> aggregateByKey(), which allows you to return a different value type U instead 
> of the input value type V for each input tuple (K, V). You can define U to be 
> a datatype that holds both the average and total and have seqOp update both 
> fields of U in a single pass.
> 
> Hope this makes sense,
> Doris
> 
> 
> On Wed, Jun 18, 2014 at 4:28 PM, Nick Chammas  
> wrote:
> The following is a simplified example of what I am trying to accomplish.
> 
> Say I have an RDD of objects like this:
> 
> {
> "country": "USA",
> "name": "Franklin",
> "age": 24,
> "hits": 224
> }
> {
> 
> "country": "USA",
> "name": "Bob",
> "age": 55,
> "hits": 108
> }
> {
> 
> "country": "France",
> "name": "Remi",
> "age": 33,
> "hits": 72
> }
> I want to find the average age and total number of hits per country. Ideally, 
> I would like to scan the data once and perform both aggregations 
> simultaneously.
> 
> What is a good approach to doing this?
> 
> I’m thinking that we’d want to keyBy(country), and then somehow 
> reduceByKey(). The problem is, I don’t know how to approach writing a 
> function that can be passed to reduceByKey() and that will track a running 
> average and total simultaneously.
> 
> Nick
> 
> ​
> 
> View this message in context: Patterns for making multiple aggregations in 
> one pass
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> 



Re: Patterns for making multiple aggregations in one pass

2014-06-18 Thread Evan R. Sparks
This looks like a job for SparkSQL!


val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
case class MyRecord(country: String, name: String, age: Int, hits: Long)
val data = sc.parallelize(Array(MyRecord("USA", "Franklin", 24, 234),
MyRecord("USA", "Bob", 55, 108), MyRecord("France", "Remi", 33, 72)))
data.registerAsTable("MyRecords")
val results = sql("""SELECT t.country, AVG(t.age), SUM(t.hits) FROM
MyRecords t GROUP BY t.country""").collect

Now "results" contains:

Array[org.apache.spark.sql.Row] = Array([France,33.0,72], [USA,39.5,342])


On Wed, Jun 18, 2014 at 4:42 PM, Doris Xin  wrote:

> Hi Nick,
>
> Instead of using reduceByKey(), you might want to look into using
> aggregateByKey(), which allows you to return a different value type U
> instead of the input value type V for each input tuple (K, V). You can
> define U to be a datatype that holds both the average and total and have
> seqOp update both fields of U in a single pass.
>
> Hope this makes sense,
> Doris
>
>
> On Wed, Jun 18, 2014 at 4:28 PM, Nick Chammas 
> wrote:
>
>> The following is a simplified example of what I am trying to accomplish.
>>
>> Say I have an RDD of objects like this:
>>
>> {
>> "country": "USA",
>> "name": "Franklin",
>> "age": 24,
>> "hits": 224}
>> {
>>
>> "country": "USA",
>> "name": "Bob",
>> "age": 55,
>> "hits": 108}
>> {
>>
>> "country": "France",
>> "name": "Remi",
>> "age": 33,
>> "hits": 72}
>>
>> I want to find the average age and total number of hits per country.
>> Ideally, I would like to scan the data once and perform both aggregations
>> simultaneously.
>>
>> What is a good approach to doing this?
>>
>> I’m thinking that we’d want to keyBy(country), and then somehow
>> reduceByKey(). The problem is, I don’t know how to approach writing a
>> function that can be passed to reduceByKey() and that will track a
>> running average and total simultaneously.
>>
>> Nick
>> ​
>>
>> --
>> View this message in context: Patterns for making multiple aggregations
>> in one pass
>> 
>> Sent from the Apache Spark User List mailing list archive
>>  at Nabble.com.
>>
>
>


Re: Patterns for making multiple aggregations in one pass

2014-06-18 Thread Nicholas Chammas
Ah, this looks like exactly what I need! It looks like this was recently added
into PySpark (and Spark Core), but it's not in the 1.0.0 release.

Thank you.

Nick


On Wed, Jun 18, 2014 at 7:42 PM, Doris Xin  wrote:

> Hi Nick,
>
> Instead of using reduceByKey(), you might want to look into using
> aggregateByKey(), which allows you to return a different value type U
> instead of the input value type V for each input tuple (K, V). You can
> define U to be a datatype that holds both the average and total and have
> seqOp update both fields of U in a single pass.
>
> Hope this makes sense,
> Doris
>
>
> On Wed, Jun 18, 2014 at 4:28 PM, Nick Chammas 
> wrote:
>
>> The following is a simplified example of what I am trying to accomplish.
>>
>> Say I have an RDD of objects like this:
>>
>> {
>> "country": "USA",
>> "name": "Franklin",
>> "age": 24,
>> "hits": 224}
>> {
>>
>> "country": "USA",
>> "name": "Bob",
>> "age": 55,
>> "hits": 108}
>> {
>>
>> "country": "France",
>> "name": "Remi",
>> "age": 33,
>> "hits": 72}
>>
>> I want to find the average age and total number of hits per country.
>> Ideally, I would like to scan the data once and perform both aggregations
>> simultaneously.
>>
>> What is a good approach to doing this?
>>
>> I’m thinking that we’d want to keyBy(country), and then somehow
>> reduceByKey(). The problem is, I don’t know how to approach writing a
>> function that can be passed to reduceByKey() and that will track a
>> running average and total simultaneously.
>>
>> Nick
>> ​
>>
>> --
>> View this message in context: Patterns for making multiple aggregations
>> in one pass
>> 
>> Sent from the Apache Spark User List mailing list archive
>>  at Nabble.com.
>>
>
>


Re: Patterns for making multiple aggregations in one pass

2014-06-18 Thread Doris Xin
Hi Nick,

Instead of using reduceByKey(), you might want to look into using
aggregateByKey(), which allows you to return a different value type U
instead of the input value type V for each input tuple (K, V). You can
define U to be a datatype that holds both the average and total and have
seqOp update both fields of U in a single pass.

Hope this makes sense,
Doris
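
A minimal sketch of that approach, assuming a Spark version that includes aggregateByKey (per later messages in this thread it is not in the 1.0.0 release); the (ageSum, hitSum, count) layout and the sample data are illustrative:

import org.apache.spark.SparkContext._

// Keyed by country; value is (age, hits) per person.
val people = sc.parallelize(Seq(
  ("USA", (24, 224L)), ("USA", (55, 108L)), ("France", (33, 72L))))

val zero = (0L, 0L, 0L)  // (age sum, hit sum, record count)
val perCountry = people.aggregateByKey(zero)(
  (acc, v) => (acc._1 + v._1, acc._2 + v._2, acc._3 + 1),   // seqOp: fold one record in
  (a, b) => (a._1 + b._1, a._2 + b._2, a._3 + b._3))        // combOp: merge partial results

val result = perCountry.mapValues { case (ageSum, hitSum, n) =>
  (ageSum.toDouble / n, hitSum)   // (average age, total hits)
}
result.collect().foreach(println)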


On Wed, Jun 18, 2014 at 4:28 PM, Nick Chammas 
wrote:

> The following is a simplified example of what I am trying to accomplish.
>
> Say I have an RDD of objects like this:
>
> {
> "country": "USA",
> "name": "Franklin",
> "age": 24,
> "hits": 224}
> {
>
> "country": "USA",
> "name": "Bob",
> "age": 55,
> "hits": 108}
> {
>
> "country": "France",
> "name": "Remi",
> "age": 33,
> "hits": 72}
>
> I want to find the average age and total number of hits per country.
> Ideally, I would like to scan the data once and perform both aggregations
> simultaneously.
>
> What is a good approach to doing this?
>
> I’m thinking that we’d want to keyBy(country), and then somehow
> reduceByKey(). The problem is, I don’t know how to approach writing a
> function that can be passed to reduceByKey() and that will track a
> running average and total simultaneously.
>
> Nick
> ​
>
> --
> View this message in context: Patterns for making multiple aggregations
> in one pass
> 
> Sent from the Apache Spark User List mailing list archive
>  at Nabble.com.
>


Patterns for making multiple aggregations in one pass

2014-06-18 Thread Nick Chammas
The following is a simplified example of what I am trying to accomplish.

Say I have an RDD of objects like this:

{
"country": "USA",
"name": "Franklin",
"age": 24,
"hits": 224}
{

"country": "USA",
"name": "Bob",
"age": 55,
"hits": 108}
{

"country": "France",
"name": "Remi",
"age": 33,
"hits": 72}

I want to find the average age and total number of hits per country.
Ideally, I would like to scan the data once and perform both aggregations
simultaneously.

What is a good approach to doing this?

I’m thinking that we’d want to keyBy(country), and then somehow
reduceByKey(). The problem is, I don’t know how to approach writing a
function that can be passed to reduceByKey() and that will track a running
average and total simultaneously.

Nick
​




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Patterns-for-making-multiple-aggregations-in-one-pass-tp7874.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Trailing Tasks Saving to HDFS

2014-06-18 Thread Surendranauth Hiraman
I have a flow that ends with saveAsTextFile() to HDFS.

It seems all the expected files per partition have been written out, based
on the number of part files and the file sizes.

But the driver logs show 2 tasks still not completed and has no activity
and the worker logs show no activity for those two tasks for a while now.

Has anyone run into this situation? It's happened to me a couple of times
now.

Thanks.

-- Suren

SUREN HIRAMAN, VP TECHNOLOGY
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR
NEW YORK, NY 10001
O: (917) 525-2466 ext. 105
F: 646.349.4063
E: suren.hiraman@v elos.io
W: www.velos.io


create SparkContext dynamically

2014-06-18 Thread jamborta
Hi all,

I am setting up a system where spark contexts would be created by a web
server that would handle the computation and return the results. I have the
following code (in python)

os.environ['SPARK_HOME'] = "/home/spark/spark-1.0.0-bin-hadoop2/"
sc = SparkContext(master="spark://ip-xx-xx-xx-xx:7077", appName="Simple
App")
l =sc.parallelize([1,2,3,4])
c = l.count() 

but it throws a seemingly unrelated error, 'TypeError: an integer is required', on the
last line.

I assume I did not set up the environment properly. I have added spark_home
and the py4j source to the classpath; not sure what is missing.

thanks,





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/create-SparkContext-dynamically-tp7872.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark streaming and rate limit

2014-06-18 Thread Flavio Pompermaier
Thanks for the quick reply, Soumya. Unfortunately I'm a newbie with
Spark... what do you mean? Is there any reference on how to do that?

On Thu, Jun 19, 2014 at 12:24 AM, Soumya Simanta 
wrote:

>
> You can add a back pressured enabled component in front that feeds data
> into Spark. This component can control in input rate to spark.
>
> > On Jun 18, 2014, at 6:13 PM, Flavio Pompermaier 
> wrote:
> >
> > Hi to all,
> > in my use case I'd like to receive events and call an external service
> as they pass through. Is it possible to limit the number of contemporaneous
> call to that service (to avoid DoS) using Spark streaming? if so, limiting
> the rate implies a possible buffer growth...how can I control the buffer of
> incoming events waiting to be processed?
> >
> > Best,
> > Flavio
>


Re: Spark streaming and rate limit

2014-06-18 Thread Soumya Simanta

You can add a back-pressure-enabled component in front that feeds data into
Spark. This component can control the input rate to Spark.

> On Jun 18, 2014, at 6:13 PM, Flavio Pompermaier  wrote:
> 
> Hi to all,
> in my use case I'd like to receive events and call an external service as 
> they pass through. Is it possible to limit the number of contemporaneous call 
> to that service (to avoid DoS) using Spark streaming? if so, limiting the 
> rate implies a possible buffer growth...how can I control the buffer of 
> incoming events waiting to be processed?
> 
> Best,
> Flavio 


Re: Issue while trying to aggregate with a sliding window

2014-06-18 Thread Hatch M
Ok, that patch does fix the key lookup exception. However, I'm curious about the
time validity check...isValidTime (
https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala#L264
)

Why does (time - zeroTime) have to be a multiple of the slide duration?
Shouldn't reduceByKeyAndWindow aggregate every record in a given window
(zeroTime to zeroTime + windowDuration)?
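
For context, a minimal sketch of how the durations relate (all values made up): the windowed stream only emits output at times that are whole multiples of the slide duration past zeroTime, which is what isValidTime enforces; records arriving between those points are still included in the window that covers them, they just surface at the next slide boundary.

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

// 1-second batch interval; window (60 s) and slide (10 s) are both multiples of it.
val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)   // placeholder source
val pairs = lines.map(word => (word, 1))
val counts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(60), Seconds(10))
counts.print()
ssc.start()
ssc.awaitTermination()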


On Tue, Jun 17, 2014 at 10:55 PM, Hatch M  wrote:

> Thanks! Will try to get the fix and retest.
>
>
> On Tue, Jun 17, 2014 at 5:30 PM, onpoq l  wrote:
>
>> There is a bug:
>>
>> https://github.com/apache/spark/pull/961#issuecomment-45125185
>>
>>
>> On Tue, Jun 17, 2014 at 8:19 PM, Hatch M  wrote:
>> > Trying to aggregate over a sliding window, playing with the slide
>> duration.
>> > Playing around with the slide interval I can see the aggregation works
>> but
>> > mostly fails with the below error. The stream has records coming in at
>> > 100ms.
>> >
>> > JavaPairDStream aggregatedDStream =
>> > pairsDStream.reduceByKeyAndWindow(aggFunc, new Duration(6), new
>> > Duration(60));
>> >
>> > 14/06/18 00:14:46 INFO dstream.ShuffledDStream: Time 1403050486900 ms is
>> > invalid as zeroTime is 1403050485800 ms and slideDuration is 6 ms
>> and
>> > difference is 1100 ms
>> > 14/06/18 00:14:46 ERROR actor.OneForOneStrategy: key not found:
>> > 1403050486900 ms
>> > java.util.NoSuchElementException: key not found: 1403050486900 ms
>> > at scala.collection.MapLike$class.default(MapLike.scala:228)
>> >
>> > Any hints on whats going on here?
>> > Thanks!
>> > Hatch
>> >
>>
>
>


Spark streaming and rate limit

2014-06-18 Thread Flavio Pompermaier
Hi to all,
in my use case I'd like to receive events and call an external service as
they pass through. Is it possible to limit the number of concurrent calls to
that service (to avoid a DoS) using Spark Streaming? If so, limiting the rate
implies possible buffer growth... how can I control the buffer of incoming
events waiting to be processed?

Best,
Flavio


java.lang.OutOfMemoryError with saveAsTextFile

2014-06-18 Thread Muttineni, Vinay
Hi,
I have a 5 million record, 300 column data set.
I am running a spark job in yarn-cluster mode, with the following args
--driver-memory 11G --executor-memory 11G --executor-cores 16  --num-executors 
500
The spark job replaces all categorical variables with some integers.
I am getting the below error when I try to save the transformed data set.

java.lang.OutOfMemoryError (java.lang.OutOfMemoryError: GC overhead limit 
exceeded)
java.util.Arrays.copyOfRange(Arrays.java:3209)
java.lang.String.<init>(String.java:215)
java.lang.StringBuilder.toString(StringBuilder.java:430)
java.io.ObjectInputStream$BlockDataInputStream.readUTFBody(ObjectInputStream.java:3023)
java.io.ObjectInputStream$BlockDataInputStream.readUTF(ObjectInputStream.java:2819)
java.io.ObjectInputStream.readString(ObjectInputStream.java:1598)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1319)
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:176)
scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
scala.collection.AbstractIterator.to(Iterator.scala:1157)
scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:257)
scala.collection.AbstractIterator.toList(Iterator.scala:1157)
scala.collection.immutable.List.$plus$plus(List.scala:193)
DataProcessor.DataTypeConverter2$$anonfun$6.apply(DataTypeConverter2.scala:137)
DataProcessor.DataTypeConverter2$$anonfun$6.apply(DataTypeConverter2.scala:137)

The code is as follows:
  val transformedData = splitFileWithHeader.flatMap(rowArray => {
    try {
      if (rowArray.sameElements(header.value)) {
        None
      } else {
        val transformedArray: Array[String] = new Array[String](rowArray.length)
        for (i <- 0 until rowArray.length) {
          // Check 1 to see if the value should be replaced; Check 2 to see if it's a
          // null value, in which case we do not update the value
          if (broadcastReplacements.value(i) != null && rowArray(i).trim.toString != "") {
            transformedArray.update(i, broadcastReplacements.value(i)(rowArray(i).trim.toString).toString)
          } else {
            transformedArray.update(i, rowArray(i).trim.toString)
          }
        }
        Array(transformedArray.deep.mkString(","))
      }
    } catch {
      case _: Throwable => {
        println("Failure in transforming the file, 1 line, Around Line 131")
        None
      }
    }
  }).coalesce(1, true).mapPartitions(it => (Seq(headerLine.value) ++ it).iterator, true).coalesce(500)

  // Save the transformed data file
  transformedData.saveAsTextFile(outputFileLocation)


Any idea how I can resolve this error?
Previous stages have completed successfully.
Thank You!
Vinay



Prior Stages

    val dataFile = sc.textFile(args(1), 500)
    // Get the first line, which is the header and also contains the column type
    val columnDefinition = dataFile.first
    val headerLine = sc.broadcast(columnDefinition)
    val header = sc.broadcast(columnDefinition.split(",", -1))
    // Remove the header
    val modifiedDataFile = dataFile.filter(line => line != headerLine.value)
    val onlySplitFile = modifiedDataFile.flatMap(line => {
      try {
        // println(line.split(' ').length)
        // println(line.split(' '))
        if (line.split(',').length < 1 || line.split(',').sameElements(Array(""))) {
          None
        } else {
          Array(line.split(",", -1))
        }
      } catch {
        case _: Throwable => None
      }
    })
    modifiedDataFile.unpersist(true)

    val currentColumn = sc.broadcast(i)
    val distinctValues = onlySplitFile.flatMap(rowArray => {
      try {
 

Re: Spark is now available via Homebrew

2014-06-18 Thread Nicholas Chammas
From the user perspective, I don't think it's a big deal either way. It
looks like contributors to Homebrew are pretty on top of keeping things
updated.

For the users, Apache managing this would mostly mean a) a shorter period
of time from release to brew availability, and b) the brew installation
method is now "official".

Offering b) to users does indeed require a commitment from the dev team, so
that definitely should be weighed carefully.

Nick


On Wed, Jun 18, 2014 at 5:44 PM, Andrew Ash  wrote:

> What's the advantage of Apache maintaining the brew installer vs users?
>
> Apache handling it means more work on this dev team, but probably a better
> experience for brew users.  Just wanted to weigh pros/cons before
> committing to support this installation method.
>
> Andrew
>
>
> On Wed, Jun 18, 2014 at 5:29 PM, Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
>> Matei,
>>
>> You might want to comment on that issue Sheryl linked to, or perhaps this
>> one, to ask about how Apache can manage this going forward. I know that
>> mikemcquaid is very active on the Homebrew repo and is one of the maintainers.
>>
>> Nick
>>
>>
>> On Wed, Jun 18, 2014 at 5:17 PM, Sheryl John  wrote:
>>
>>> Cool.
>>> Looked at the Pull Requests, the upgrade to 1.0.0 was just merged
>>> yesterday. https://github.com/Homebrew/homebrew/pull/30231
>>>
>>>
>>> https://github.com/Homebrew/homebrew/blob/master/Library/Formula/apache-spark.rb
>>>
>>>
>>> On Wed, Jun 18, 2014 at 1:57 PM, Matei Zaharia 
>>> wrote:
>>>
 Interesting, does anyone know the people over there who set it up? It
 would be good if Apache itself could publish packages there, though I’m not
 sure what’s involved. Since Spark just depends on Java and Python it should
 be easy for us to update.

 Matei

 On Jun 18, 2014, at 1:37 PM, Nick Chammas 
 wrote:

 OS X / Homebrew users,

 It looks like you can now download Spark simply by doing:

 brew install apache-spark

 I’m new to Homebrew, so I’m not too sure how people are intended to use
 this. I’m guessing this would just be a convenient way to get the latest
 release onto your workstation, and from there use spark-ec2 to launch
 clusters.

 Anyway, just a cool thing to point out.

 Nick

 --
 View this message in context: Spark is now available via Homebrew
 
 Sent from the Apache Spark User List mailing list archive
  at Nabble.com.



>>>
>>>
>>> --
>>> -Sheryl
>>>
>>
>>
>


Re: Spark 0.9.1 java.lang.outOfMemoryError: Java Heap Space

2014-06-18 Thread Andrew Ash
Wait, so the file only has four lines and the job is running out of heap
space?  Can you share the code you're running that does the processing?
 I'd guess that you're doing some intense processing on every line but just
writing parsed case classes back to disk sounds very lightweight.



On Wed, Jun 18, 2014 at 5:17 PM, Shivani Rao  wrote:

> I am trying to process a file that contains 4 log lines (not very long)
> and then write my parsed out case classes to a destination folder, and I
> get the following error:
>
>
> java.lang.OutOfMemoryError: Java heap space
>
> at
> org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183)
>
> at org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2244)
>
> at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:280)
>
> at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:75)
>
> at
> org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>
> at java.lang.reflect.Method.invoke(Method.java:597)
>
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:974)
>
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)
>
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
>
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
>
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
>
> at
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
>
> at org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:165)
>
> at
> org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>
> at java.lang.reflect.Method.invoke(Method.java:597)
>
> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:974)
>
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)
>
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
>
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
>
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
>
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)
>
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
>
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)
>
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)
>
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)
>
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)
>
>
> Sadly, there are several folks that have faced this error while trying to
> execute Spark jobs and there are various solutions, none of which work for
> me
>
>
> a) I tried (
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-0-java-lang-outOfMemoryError-Java-Heap-Space-td7735.html#a7736)
> changing the number of partitions in my RDD by using coalesce(8) and the
> error persisted
>
> b)  I tried changing SPARK_WORKER_MEM=2g, SPARK_EXECUTOR_MEMORY=10g, and
> both did not work
>
> c) I strongly suspect there is a class path error (
> http://apache-spark-user-list.1001560.n3.nabble.com/how-to-set-spark-executor-memory-and-heap-size-td4719.html)
> Mainly because the call stack is repetitive. Maybe the OOM error is a
> disguise ?
>
> d) I checked that i am not out of disk space and that i do not have too
> many open files (ulimit -u << sudo ls /proc//fd |
> wc -l)
>
>
> I am also noticing multiple reflections happening to find the right
> "class", I guess, so it could be a "class not found" error disguising itself
> as a memory error.
>
>
> Here are other threads that are encountering same situation .. but have
> not been resolved in any way so far..
>
>
>
> http://apache-spark-user-list.1001560.n3.nabble.com/no-response-in-spark-web-UI-td4633.html
>
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-program-thows-OutOfMemoryError-td4268.html
>
>
> Any help is greatly appreciated. I am especially calling out to the creators
> of Spark and the Databricks folks. This seems like a "known bug" waiting to
> happen.
>
>
> Thanks,
>
> Shivani
>
> --
> Software Engineer
> Analytics Engineering Team@ Box
> Mountain View, CA
>


Re: Spark is now available via Homebrew

2014-06-18 Thread Andrew Ash
What's the advantage of Apache maintaining the brew installer vs users?

Apache handling it means more work on this dev team, but probably a better
experience for brew users.  Just wanted to weigh pros/cons before
committing to support this installation method.

Andrew


On Wed, Jun 18, 2014 at 5:29 PM, Nicholas Chammas <
nicholas.cham...@gmail.com> wrote:

> Matei,
>
> You might want to comment on that issue Sheryl linked to, or perhaps this
> one, to ask about how Apache can manage this going forward. I know that
> mikemcquaid is very active on the Homebrew repo and is one of the maintainers.
>
> Nick
>
>
> On Wed, Jun 18, 2014 at 5:17 PM, Sheryl John  wrote:
>
>> Cool.
>> Looked at the Pull Requests, the upgrade to 1.0.0 was just merged
>> yesterday. https://github.com/Homebrew/homebrew/pull/30231
>>
>>
>> https://github.com/Homebrew/homebrew/blob/master/Library/Formula/apache-spark.rb
>>
>>
>> On Wed, Jun 18, 2014 at 1:57 PM, Matei Zaharia 
>> wrote:
>>
>>> Interesting, does anyone know the people over there who set it up? It
>>> would be good if Apache itself could publish packages there, though I’m not
>>> sure what’s involved. Since Spark just depends on Java and Python it should
>>> be easy for us to update.
>>>
>>> Matei
>>>
>>> On Jun 18, 2014, at 1:37 PM, Nick Chammas 
>>> wrote:
>>>
>>> OS X / Homebrew users,
>>>
>>> It looks like you can now download Spark simply by doing:
>>>
>>> brew install apache-spark
>>>
>>> I’m new to Homebrew, so I’m not too sure how people are intended to use
>>> this. I’m guessing this would just be a convenient way to get the latest
>>> release onto your workstation, and from there use spark-ec2 to launch
>>> clusters.
>>>
>>> Anyway, just a cool thing to point out.
>>>
>>> Nick
>>>
>>> --
>>> View this message in context: Spark is now available via Homebrew
>>> 
>>> Sent from the Apache Spark User List mailing list archive
>>>  at Nabble.com.
>>>
>>>
>>>
>>
>>
>> --
>> -Sheryl
>>
>
>


Re: Spark is now available via Homebrew

2014-06-18 Thread Nicholas Chammas
Matei,

You might want to comment on that issue Sheryl linked to, or perhaps this one,
to ask about how Apache can manage this going forward. I know that mikemcquaid
is very active on the Homebrew repo and is one of the maintainers.

Nick


On Wed, Jun 18, 2014 at 5:17 PM, Sheryl John  wrote:

> Cool.
> Looked at the Pull Requests, the upgrade to 1.0.0 was just merged
> yesterday. https://github.com/Homebrew/homebrew/pull/30231
>
>
> https://github.com/Homebrew/homebrew/blob/master/Library/Formula/apache-spark.rb
>
>
> On Wed, Jun 18, 2014 at 1:57 PM, Matei Zaharia 
> wrote:
>
>> Interesting, does anyone know the people over there who set it up? It
>> would be good if Apache itself could publish packages there, though I’m not
>> sure what’s involved. Since Spark just depends on Java and Python it should
>> be easy for us to update.
>>
>> Matei
>>
>> On Jun 18, 2014, at 1:37 PM, Nick Chammas 
>> wrote:
>>
>> OS X / Homebrew users,
>>
>> It looks like you can now download Spark simply by doing:
>>
>> brew install apache-spark
>>
>> I’m new to Homebrew, so I’m not too sure how people are intended to use
>> this. I’m guessing this would just be a convenient way to get the latest
>> release onto your workstation, and from there use spark-ec2 to launch
>> clusters.
>>
>> Anyway, just a cool thing to point out.
>>
>> Nick
>>
>> --
>> View this message in context: Spark is now available via Homebrew
>> 
>> Sent from the Apache Spark User List mailing list archive
>>  at Nabble.com.
>>
>>
>>
>
>
> --
> -Sheryl
>


Re: Spark is now available via Homebrew

2014-06-18 Thread Nicholas Chammas
Agreed, it would be better if Apache controlled or managed this directly.

I think making such a change is just a matter of opening a new issue on the
Homebrew issue tracker. I believe that's how Spark made it in there in the
first place--it was just a user contribution.

Nick


On Wed, Jun 18, 2014 at 4:57 PM, Matei Zaharia 
wrote:

> Interesting, does anyone know the people over there who set it up? It
> would be good if Apache itself could publish packages there, though I’m not
> sure what’s involved. Since Spark just depends on Java and Python it should
> be easy for us to update.
>
> Matei
>
> On Jun 18, 2014, at 1:37 PM, Nick Chammas 
> wrote:
>
> OS X / Homebrew users,
>
> It looks like you can now download Spark simply by doing:
>
> brew install apache-spark
>
> I’m new to Homebrew, so I’m not too sure how people are intended to use
> this. I’m guessing this would just be a convenient way to get the latest
> release onto your workstation, and from there use spark-ec2 to launch
> clusters.
>
> Anyway, just a cool thing to point out.
>
> Nick
>
> --
> View this message in context: Spark is now available via Homebrew
> 
> Sent from the Apache Spark User List mailing list archive
>  at Nabble.com.
>
>
>


Spark 0.9.1 java.lang.outOfMemoryError: Java Heap Space

2014-06-18 Thread Shivani Rao
I am trying to process a file that contains 4 log lines (not very long) and
then write my parsed out case classes to a destination folder, and I get
the following error:


java.lang.OutOfMemoryError: Java heap space

at
org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183)

at org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2244)

at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:280)

at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:75)

at
org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)

at java.lang.reflect.Method.invoke(Method.java:597)

at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:974)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)

at java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)

at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)

at org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:165)

at
org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)

at java.lang.reflect.Method.invoke(Method.java:597)

at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:974)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1848)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1328)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1946)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1870)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1752)


Sadly, there are several folks that have faced this error while trying to
execute Spark jobs and there are various solutions, none of which work for
me


a) I tried (
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-0-java-lang-outOfMemoryError-Java-Heap-Space-td7735.html#a7736)
changing the number of partitions in my RDD by using coalesce(8) and the
error persisted

b)  I tried changing SPARK_WORKER_MEM=2g, SPARK_EXECUTOR_MEMORY=10g, and
both did not work

c) I strongly suspect there is a class path error (
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-set-spark-executor-memory-and-heap-size-td4719.html)
Mainly because the call stack is repetitive. Maybe the OOM error is a
disguise ?

d) I checked that i am not out of disk space and that i do not have too
many open files (ulimit -u << sudo ls /proc//fd |
wc -l)


I am also noticing multiple reflections happening to find the right "class",
I guess, so it could be a "class not found" error disguising itself as a
memory error.


Here are other threads that are encountering same situation .. but have not
been resolved in any way so far..


http://apache-spark-user-list.1001560.n3.nabble.com/no-response-in-spark-web-UI-td4633.html

http://apache-spark-user-list.1001560.n3.nabble.com/Spark-program-thows-OutOfMemoryError-td4268.html


Any help is greatly appreciated. I am especially calling out to the creators of
Spark and the Databricks folks. This seems like a "known bug" waiting to happen.


Thanks,

Shivani

-- 
Software Engineer
Analytics Engineering Team@ Box
Mountain View, CA


Re: Spark is now available via Homebrew

2014-06-18 Thread Sheryl John
Cool.
Looked at the Pull Requests, the upgrade to 1.0.0 was just merged
yesterday. https://github.com/Homebrew/homebrew/pull/30231

https://github.com/Homebrew/homebrew/blob/master/Library/Formula/apache-spark.rb


On Wed, Jun 18, 2014 at 1:57 PM, Matei Zaharia 
wrote:

> Interesting, does anyone know the people over there who set it up? It
> would be good if Apache itself could publish packages there, though I'm not
> sure what's involved. Since Spark just depends on Java and Python it should
> be easy for us to update.
>
> Matei
>
> On Jun 18, 2014, at 1:37 PM, Nick Chammas 
> wrote:
>
> OS X / Homebrew users,
>
> It looks like you can now download Spark simply by doing:
>
> brew install apache-spark
>
> I'm new to Homebrew, so I'm not too sure how people are intended to use
> this. I'm guessing this would just be a convenient way to get the latest
> release onto your workstation, and from there use spark-ec2 to launch
> clusters.
>
> Anyway, just a cool thing to point out.
>
> Nick
>
> --
> View this message in context: Spark is now available via Homebrew
> 
> Sent from the Apache Spark User List mailing list archive
>  at Nabble.com.
>
>
>


-- 
-Sheryl


Re: No Intercept for Python

2014-06-18 Thread Naftali Harris
Thanks Reza! :-D

Naftali


On Wed, Jun 18, 2014 at 1:47 PM, Reza Zadeh  wrote:

> Hi Naftali,
>
> Yes you're right. For now please add a column of ones. We are working on
> adding a weighted regularization term, and exposing the scala intercept
> option in the python binding.
>
> Best,
> Reza
>
>
> On Mon, Jun 16, 2014 at 12:19 PM, Naftali Harris 
> wrote:
>
>> Hi everyone,
>>
>> The Python LogisticRegressionWithSGD does not appear to estimate an
>> intercept.  When I run the following, the returned weights and intercept
>> are both 0.0:
>>
>> from pyspark import SparkContext
>> from pyspark.mllib.regression import LabeledPoint
>> from pyspark.mllib.classification import LogisticRegressionWithSGD
>>
>> def main():
>>     sc = SparkContext(appName="NoIntercept")
>>
>>     train = sc.parallelize([LabeledPoint(0, [0]), LabeledPoint(1, [0]),
>>                             LabeledPoint(1, [0])])
>>
>>     model = LogisticRegressionWithSGD.train(train, iterations=500, step=0.1)
>>     print "Final weights: " + str(model.weights)
>>     print "Final intercept: " + str(model.intercept)
>>
>> if __name__ == "__main__":
>>     main()
>>
>>
>> Of course, one can fit an intercept with the simple expedient of adding a
>> column of ones, but that's kind of annoying.  Moreover, it looks like the
>> scala version has an intercept option.
>>
>> Am I missing something? Should I just add the column of ones? If I
>> submitted a PR doing that, is that the sort of thing you guys would accept?
>>
>> Thanks! :-)
>>
>> Naftali
>>
>
>


Re: Spark is now available via Homebrew

2014-06-18 Thread Matei Zaharia
Interesting, does anyone know the people over there who set it up? It would be 
good if Apache itself could publish packages there, though I’m not sure what’s 
involved. Since Spark just depends on Java and Python it should be easy for us 
to update.

Matei

On Jun 18, 2014, at 1:37 PM, Nick Chammas  wrote:

> OS X / Homebrew users,
> 
> It looks like you can now download Spark simply by doing:
> 
> brew install apache-spark
> I’m new to Homebrew, so I’m not too sure how people are intended to use this. 
> I’m guessing this would just be a convenient way to get the latest release 
> onto your workstation, and from there use spark-ec2 to launch clusters.
> 
> Anyway, just a cool thing to point out.
> 
> Nick
> 
> 
> View this message in context: Spark is now available via Homebrew
> Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: No Intercept for Python

2014-06-18 Thread Reza Zadeh
Hi Naftali,

Yes you're right. For now please add a column of ones. We are working on
adding a weighted regularization term, and exposing the scala intercept
option in the python binding.

Best,
Reza


On Mon, Jun 16, 2014 at 12:19 PM, Naftali Harris  wrote:

> Hi everyone,
>
> The Python LogisticRegressionWithSGD does not appear to estimate an
> intercept.  When I run the following, the returned weights and intercept
> are both 0.0:
>
> from pyspark import SparkContext
> from pyspark.mllib.regression import LabeledPoint
> from pyspark.mllib.classification import LogisticRegressionWithSGD
>
> def main():
>     sc = SparkContext(appName="NoIntercept")
>
>     train = sc.parallelize([LabeledPoint(0, [0]), LabeledPoint(1, [0]),
>                             LabeledPoint(1, [0])])
>
>     model = LogisticRegressionWithSGD.train(train, iterations=500, step=0.1)
>     print "Final weights: " + str(model.weights)
>     print "Final intercept: " + str(model.intercept)
>
> if __name__ == "__main__":
>     main()
>
>
> Of course, one can fit an intercept with the simple expedient of adding a
> column of ones, but that's kind of annoying.  Moreover, it looks like the
> scala version has an intercept option.
>
> Am I missing something? Should I just add the column of ones? If I
> submitted a PR doing that, is that the sort of thing you guys would accept?
>
> Thanks! :-)
>
> Naftali
>
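
For reference, a minimal Scala sketch of the intercept option mentioned above; the data
set and parameters mirror the Python snippet and are illustrative only.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint

    object InterceptSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("InterceptSketch"))

        // Tiny illustrative data set: all-zero features, mostly positive labels.
        val train = sc.parallelize(Seq(
          LabeledPoint(0.0, Vectors.dense(0.0)),
          LabeledPoint(1.0, Vectors.dense(0.0)),
          LabeledPoint(1.0, Vectors.dense(0.0))))

        // The Scala API exposes the intercept flag on the algorithm itself.
        val lr = new LogisticRegressionWithSGD()
        lr.setIntercept(true)
        lr.optimizer.setNumIterations(500).setStepSize(0.1)

        val model = lr.run(train)
        println("weights = " + model.weights + ", intercept = " + model.intercept)

        sc.stop()
      }
    }

Until the flag is exposed in the Python binding, the equivalent workaround is prepending a
constant 1.0 feature to every vector, as suggested above.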


Spark is now available via Homebrew

2014-06-18 Thread Nick Chammas
OS X / Homebrew users,

It looks like you can now download Spark simply by doing:

brew install apache-spark

I’m new to Homebrew, so I’m not too sure how people are intended to use
this. I’m guessing this would just be a convenient way to get the latest
release onto your workstation, and from there use spark-ec2 to launch
clusters.

Anyway, just a cool thing to point out.

Nick




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-is-now-available-via-Homebrew-tp7856.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Unit test failure: Address already in use

2014-06-18 Thread Philip Ogren
In my unit tests I have a base class that all my tests extend that has a 
setup and teardown method that they inherit.  They look something like this:

  var spark: SparkContext = _

  @Before
  def setUp() {
    Thread.sleep(100L) // this seems to give spark more time to reset from the previous test's tearDown
    spark = new SparkContext("local", "test spark")
  }

  @After
  def tearDown() {
    spark.stop
    spark = null // not sure why this helps but it does!
    System.clearProperty("spark.master.port")
  }


It's been since last fall (i.e. version 0.8.x) since I've examined this 
code and so I can't vouch that it is still accurate/necessary - but it 
still works for me.



On 06/18/2014 12:59 PM, Lisonbee, Todd wrote:


Disabling parallelExecution has worked for me.

Other alternatives I’ve tried that also work include:

1. Using a lock – this will let tests execute in parallel except for 
those using a SparkContext.  If you have a large number of tests that 
could execute in parallel, this can shave off some time.


object TestingSparkContext {

val lock = new Lock()

}

// before you instantiate your local SparkContext

TestingSparkContext.lock.acquire()

// after you call sc.stop()

TestingSparkContext.lock.release()

2. Sharing a local SparkContext between tests.

- This is nice because your tests will run faster.  Start-up and 
shutdown is time consuming (can add a few seconds per test).


- The downside is that your tests are using the same SparkContext so 
they are less independent of each other.  I haven’t seen issues with 
this yet but there are likely some things that might crop up.


Best,

Todd

From: Anselme Vignon [mailto:anselme.vig...@flaminem.com]
Sent: Wednesday, June 18, 2014 12:33 AM
To: user@spark.apache.org
Subject: Re: Unit test failure: Address already in use

Hi,

Could your problem come from the fact that you run your tests in 
parallel ?


If you are running Spark in local mode, you cannot have concurrent Spark 
instances running. This means that your tests instantiating 
a SparkContext cannot be run in parallel. The easiest fix is to tell sbt 
not to run parallel tests.


This can be done by adding the following line in your build.sbt:

parallelExecution in Test := false

Cheers,

Anselme

2014-06-17 23:01 GMT+02:00 SK >:


Hi,

I have 3 unit tests (independent of each other) in the /src/test/scala
folder. When I run each of them individually using: sbt "test-only
",
all the 3 pass the test. But when I run them all using "sbt test",
then they
fail with the warning below. I am wondering if the binding
exception results
in failure to run the job, thereby causing the failure. If so,
what can I do
to address this binding exception? I am running these tests
locally on a
standalone machine (i.e. SparkContext("local", "test")).


14/06/17 13:42:48 WARN component.AbstractLifeCycle: FAILED
org.eclipse.jetty.server.Server@3487b78d
:
java.net.BindException: Address
already in use
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:174)
at
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:139)
at
sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:77)


thanks



--
View this message in context:

http://apache-spark-user-list.1001560.n3.nabble.com/Unit-test-failure-Address-already-in-use-tp7771.html
Sent from the Apache Spark User List mailing list archive at
Nabble.com.





RE: Unit test failure: Address already in use

2014-06-18 Thread Lisonbee, Todd

Disabling parallelExecution has worked for me.

Other alternatives I’ve tried that also work include:

1. Using a lock – this will let tests execute in parallel except for those 
using a SparkContext.  If you have a large number of tests that could execute 
in parallel, this can shave off some time.

object TestingSparkContext {
  val lock = new Lock()
}

// before you instantiate your local SparkContext
TestingSparkContext.lock.acquire()

// after you call sc.stop()
TestingSparkContext.lock.release()


2. Sharing a local SparkContext between tests.

- This is nice because your tests will run faster.  Start-up and shutdown is 
time consuming (can add a few seconds per test).

- The downside is that your tests are using the same SparkContext so they are 
less independent of each other.  I haven’t seen issues with this yet but there 
are likely some things that might crop up.
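
For illustration only (not part of the original message), a minimal ScalaTest-style sketch
of option 2, assuming a scalatest dependency is on the test classpath; the names are
hypothetical.

    import org.apache.spark.SparkContext
    import org.scalatest.{BeforeAndAfterAll, FunSuite, Suite}

    // Mixed into suites that need Spark: the context is created once per suite and
    // stopped afterwards, so individual tests skip the start-up/shutdown cost.
    trait SharedLocalSparkContext extends BeforeAndAfterAll { self: Suite =>
      @transient var sc: SparkContext = _

      override def beforeAll() {
        sc = new SparkContext("local", "test")
        super.beforeAll()
      }

      override def afterAll() {
        if (sc != null) sc.stop()
        sc = null
        super.afterAll()
      }
    }

    class ExampleSuite extends FunSuite with SharedLocalSparkContext {
      test("count") {
        assert(sc.parallelize(1 to 10).count() === 10)
      }
    }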

Best,

Todd


From: Anselme Vignon [mailto:anselme.vig...@flaminem.com]
Sent: Wednesday, June 18, 2014 12:33 AM
To: user@spark.apache.org
Subject: Re: Unit test failure: Address already in use

Hi,

Could your problem come from the fact that you run your tests in parallel ?

If you are running Spark in local mode, you cannot have concurrent Spark instances 
running. This means that your tests instantiating a SparkContext cannot be run in 
parallel. The easiest fix is to tell sbt not to run parallel tests.
This can be done by adding the following line in your build.sbt:

parallelExecution in Test := false

Cheers,

Anselme



2014-06-17 23:01 GMT+02:00 SK 
mailto:skrishna...@gmail.com>>:
Hi,

I have 3 unit tests (independent of each other) in the /src/test/scala
folder. When I run each of them individually using: sbt "test-only ",
all the 3 pass the test. But when I run them all using "sbt test", then they
fail with the warning below. I am wondering if the binding exception results
in failure to run the job, thereby causing the failure. If so, what can I do
to address this binding exception? I am running these tests locally on a
standalone machine (i.e. SparkContext("local", "test")).


14/06/17 13:42:48 WARN component.AbstractLifeCycle: FAILED
org.eclipse.jetty.server.Server@3487b78d:
 java.net.BindException: Address
already in use
java.net.BindException: Address already in use
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Net.java:174)
at
sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:139)
at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:77)


thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Unit-test-failure-Address-already-in-use-tp7771.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: question about setting SPARK_CLASSPATH IN spark_env.sh

2014-06-18 Thread santhoma
By the way, any idea how to sync the Spark config dir with the other nodes in
the cluster?

~santhosh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/question-about-setting-SPARK-CLASSPATH-IN-spark-env-sh-tp7809p7853.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: HDFS folder .sparkStaging not deleted and filled up HDFS in yarn mode

2014-06-18 Thread Andrew Lee
Forgot to mention that I am using spark-submit to submit jobs; a verbose-mode 
printout with the SparkPi example looks like this. The .sparkStaging folder 
won't be deleted. My thinking is that this is part of the staging area and 
should be cleaned up as well when the SparkContext terminates.









[test@ spark]$ SPARK_YARN_USER_ENV="spark.yarn.preserve.staging.files=false" 
SPARK_JAR=./assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop2.2.0.jar 
./bin/spark-submit --verbose --master yarn --deploy-mode cluster --class 
org.apache.spark.examples.SparkPi --driver-memory 512M --driver-library-path 
/opt/hadoop/share/hadoop/mapreduce/lib/hadoop-lzo.jar --executor-memory 512M 
--executor-cores 1 --queue research --num-executors 2 
examples/target/spark-examples_2.10-1.0.0.jar 
















Using properties file: null
Using properties file: null
Parsed arguments:
  master  yarn
  deployMode  cluster
  executorMemory  512M
  executorCores   1
  totalExecutorCores  null
  propertiesFile  null
  driverMemory512M
  driverCores null
  driverExtraClassPathnull
  driverExtraLibraryPath  /opt/hadoop/share/hadoop/mapreduce/lib/hadoop-lzo.jar
  driverExtraJavaOptions  null
  supervise   false
  queue   research
  numExecutors2
  files   null
  pyFiles null
  archivesnull
  mainClass   org.apache.spark.examples.SparkPi
  primaryResource 
file:/opt/spark/examples/target/spark-examples_2.10-1.0.0.jar
  nameorg.apache.spark.examples.SparkPi
  childArgs   []
  jarsnull
  verbose true


Default properties from null:
  



Using properties file: null
Main class:
org.apache.spark.deploy.yarn.Client
Arguments:
--jar
file:/opt/spark/examples/target/spark-examples_2.10-1.0.0.jar
--class
org.apache.spark.examples.SparkPi
--name
org.apache.spark.examples.SparkPi
--driver-memory
512M
--queue
research
--num-executors
2
--executor-memory
512M
--executor-cores
1
System properties:
spark.driver.extraLibraryPath -> 
/opt/hadoop/share/hadoop/mapreduce/lib/hadoop-lzo.jar
SPARK_SUBMIT -> true
spark.app.name -> org.apache.spark.examples.SparkPi
Classpath elements:








From: alee...@hotmail.com
To: user@spark.apache.org
Subject: HDFS folder .sparkStaging not deleted and filled up HDFS in yarn mode
Date: Wed, 18 Jun 2014 11:05:12 -0700




Hi All,
Has anyone run into the same problem? Looking at the source code in the 
official release (rc11), this property is set to false by default; however, 
I'm seeing that the .sparkStaging folder remains on HDFS and fills up the disk 
pretty fast, since SparkContext deploys the fat JAR file (~115MB) for every 
job and it is not cleaned up.








yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala:
  val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean
[test@spark ~]$ hdfs dfs -ls .sparkStaging
Found 46 items
drwx--   - test users  0 2014-05-01 01:42 .sparkStaging/application_1398370455828_0050
drwx--   - test users  0 2014-05-01 02:03 .sparkStaging/application_1398370455828_0051
drwx--   - test users  0 2014-05-01 02:04 .sparkStaging/application_1398370455828_0052
drwx--   - test users  0 2014-05-01 05:44 .sparkStaging/application_1398370455828_0053
drwx--   - test users  0 2014-05-01 05:45 .sparkStaging/application_1398370455828_0055
drwx--   - test users  0 2014-05-01 05:46 .sparkStaging/application_1398370455828_0056
drwx--   - test users  0 2014-05-01 05:49 .sparkStaging/application_1398370455828_0057
drwx--   - test users  0 2014-05-01 05:52 .sparkStaging/application_1398370455828_0058
drwx--   - test users  0 2014-05-01 05:58 .sparkStaging/application_1398370455828_0059
drwx--   - test users  0 2014-05-01 07:38 .sparkStaging/application_1398370455828_0060
drwx--   - test users  0 2014-05-01 07:41 .sparkStaging/application_1398370455828_0061
...
drwx--   - test users  0 2014-06-16 14:45 .sparkStaging/application_1402001910637_0131
drwx--   - test users  0 2014-06-16 15:03 .sparkStaging/application_1402001910637_0135
drwx--   - test users  0 2014-06-16 15:16 .sparkStaging/application_1402001910637_0136
drwx--   - test users  0 2014-06-16 15:46 .sparkStaging/application_1402001910637_0138
drwx--   - test users  0 2014-06-16 23:57 .sparkStaging/application_1402001910637_0157
drwx--   - test users  0 2014-06-17 05:55 .sparkStaging/application_1402001910637_0161

Is this something that needs to be explicitly set, i.e.
SPARK_YARN_USER_ENV="spark.yarn.preserve.staging.files=false"?
http://spark.apache.org/docs/latest/running-on-yarn.html

HDFS folder .sparkStaging not deleted and filled up HDFS in yarn mode

2014-06-18 Thread Andrew Lee
Hi All,
Has anyone run into the same problem? Looking at the source code in the 
official release (rc11), this property is set to false by default; however, 
I'm seeing that the .sparkStaging folder remains on HDFS and fills up the disk 
pretty fast, since SparkContext deploys the fat JAR file (~115MB) for every 
job and it is not cleaned up.








yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala:
  val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean
[test@spark ~]$ hdfs dfs -ls .sparkStaging
Found 46 items
drwx--   - test users  0 2014-05-01 01:42 .sparkStaging/application_1398370455828_0050
drwx--   - test users  0 2014-05-01 02:03 .sparkStaging/application_1398370455828_0051
drwx--   - test users  0 2014-05-01 02:04 .sparkStaging/application_1398370455828_0052
drwx--   - test users  0 2014-05-01 05:44 .sparkStaging/application_1398370455828_0053
drwx--   - test users  0 2014-05-01 05:45 .sparkStaging/application_1398370455828_0055
drwx--   - test users  0 2014-05-01 05:46 .sparkStaging/application_1398370455828_0056
drwx--   - test users  0 2014-05-01 05:49 .sparkStaging/application_1398370455828_0057
drwx--   - test users  0 2014-05-01 05:52 .sparkStaging/application_1398370455828_0058
drwx--   - test users  0 2014-05-01 05:58 .sparkStaging/application_1398370455828_0059
drwx--   - test users  0 2014-05-01 07:38 .sparkStaging/application_1398370455828_0060
drwx--   - test users  0 2014-05-01 07:41 .sparkStaging/application_1398370455828_0061
...
drwx--   - test users  0 2014-06-16 14:45 .sparkStaging/application_1402001910637_0131
drwx--   - test users  0 2014-06-16 15:03 .sparkStaging/application_1402001910637_0135
drwx--   - test users  0 2014-06-16 15:16 .sparkStaging/application_1402001910637_0136
drwx--   - test users  0 2014-06-16 15:46 .sparkStaging/application_1402001910637_0138
drwx--   - test users  0 2014-06-16 23:57 .sparkStaging/application_1402001910637_0157
drwx--   - test users  0 2014-06-17 05:55 .sparkStaging/application_1402001910637_0161

Is this something that needs to be explicitly set, i.e.
SPARK_YARN_USER_ENV="spark.yarn.preserve.staging.files=false"?
The docs (http://spark.apache.org/docs/latest/running-on-yarn.html) describe the property as:

  spark.yarn.preserve.staging.files (default: false): Set to true to preserve the staged
  files (Spark jar, app jar, distributed cache files) at the end of the job rather than
  delete them.

Or is this a bug that is not honoring the default value and is overridden to true somewhere?
Thanks.


  

Re: Wildcard support in input path

2014-06-18 Thread Nicholas Chammas
I wonder if that’s the problem. Is there an equivalent hadoop fs -ls
command you can run that returns the same files you want but doesn’t have
that month= string?


On Wed, Jun 18, 2014 at 12:25 PM, Jianshi Huang 
wrote:

> Hi Nicholas,
>
> month= is for Hive to auto discover the partitions. It's part of the url
> of my files.
>
> Jianshi
>
>
> On Wed, Jun 18, 2014 at 11:52 PM, Nicholas Chammas <
> nicholas.cham...@gmail.com> wrote:
>
>> Is that month= syntax something special, or do your files actually have
>> that string as part of their name?
>>
>>
>> On Wed, Jun 18, 2014 at 2:25 AM, Jianshi Huang 
>> wrote:
>>
>>> Hi all,
>>>
>>> Thanks for the reply. I'm using parquetFile as input, is that a problem?
>>> In hadoop fs -ls, the path (hdfs://domain/user/
>>> jianshuang/data/parquet/table/month=2014*) will get list all the files.
>>>
>>> I'll test it again.
>>>
>>> Jianshi
>>>
>>>
>>> On Wed, Jun 18, 2014 at 2:23 PM, Jianshi Huang 
>>> wrote:
>>>
 Hi Andrew,

 Strangely in my spark (1.0.0 compiled against hadoop 2.4.0) log, it
 says file not found. I'll try again.

 Jianshi


 On Wed, Jun 18, 2014 at 12:36 PM, Andrew Ash 
 wrote:

> In Spark you can use the normal globs supported by Hadoop's
> FileSystem, which are documented here:
> http://hadoop.apache.org/docs/r2.3.0/api/org/apache/hadoop/fs/FileSystem.html#globStatus(org.apache.hadoop.fs.Path)
>
>
> On Wed, Jun 18, 2014 at 12:09 AM, MEETHU MATHEW <
> meethu2...@yahoo.co.in> wrote:
>
>> Hi Jianshi,
>>
>> I have used wild card characters (*) in my program and it worked..
>> My code was like this
>> b = sc.textFile("hdfs:///path to file/data_file_2013SEP01*")
>>
>> Thanks & Regards,
>> Meethu M
>>
>>
>>   On Wednesday, 18 June 2014 9:29 AM, Jianshi Huang <
>> jianshi.hu...@gmail.com> wrote:
>>
>>
>>  It would be convenient if Spark's textFile, parquetFile, etc. can
>> support path with wildcard, such as:
>>
>>   hdfs://domain/user/jianshuang/data/parquet/table/month=2014*
>>
>>  Or is there already a way to do it now?
>>
>> Jianshi
>>
>> --
>> Jianshi Huang
>>
>> LinkedIn: jianshi
>> Twitter: @jshuang
>> Github & Blog: http://huangjs.github.com/
>>
>>
>>
>


 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github & Blog: http://huangjs.github.com/

>>>
>>>
>>>
>>> --
>>> Jianshi Huang
>>>
>>> LinkedIn: jianshi
>>> Twitter: @jshuang
>>> Github & Blog: http://huangjs.github.com/
>>>
>>
>>
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>
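
For illustration (not from the thread), a minimal Scala sketch of the Hadoop-style globs
that sc.textFile accepts; the paths are hypothetical, and whether parquetFile resolves
globs the same way is exactly the open question above.

    import org.apache.spark.{SparkConf, SparkContext}

    object GlobSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("GlobSketch"))

        // '*' and '?' match within a path segment; {a,b} matches alternatives.
        val september = sc.textFile("hdfs:///data/logs/data_file_2013SEP0*")
        val q1 = sc.textFile("hdfs:///data/logs/month={201401,201402,201403}/*")

        println("september lines: " + september.count() + ", q1 lines: " + q1.count())
        sc.stop()
      }
    }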


Re: Wildcard support in input path

2014-06-18 Thread Jianshi Huang
Hi Nicholas,

month= is for Hive to auto discover the partitions. It's part of the url of
my files.

Jianshi


On Wed, Jun 18, 2014 at 11:52 PM, Nicholas Chammas <
nicholas.cham...@gmail.com> wrote:

> Is that month= syntax something special, or do your files actually have
> that string as part of their name?
>
>
> On Wed, Jun 18, 2014 at 2:25 AM, Jianshi Huang 
> wrote:
>
>> Hi all,
>>
>> Thanks for the reply. I'm using parquetFile as input, is that a problem?
>> In hadoop fs -ls, the path (hdfs://domain/user/
>> jianshuang/data/parquet/table/month=2014*) will get list all the files.
>>
>> I'll test it again.
>>
>> Jianshi
>>
>>
>> On Wed, Jun 18, 2014 at 2:23 PM, Jianshi Huang 
>> wrote:
>>
>>> Hi Andrew,
>>>
>>> Strangely in my spark (1.0.0 compiled against hadoop 2.4.0) log, it says
>>> file not found. I'll try again.
>>>
>>> Jianshi
>>>
>>>
>>> On Wed, Jun 18, 2014 at 12:36 PM, Andrew Ash 
>>> wrote:
>>>
 In Spark you can use the normal globs supported by Hadoop's FileSystem,
 which are documented here:
 http://hadoop.apache.org/docs/r2.3.0/api/org/apache/hadoop/fs/FileSystem.html#globStatus(org.apache.hadoop.fs.Path)


 On Wed, Jun 18, 2014 at 12:09 AM, MEETHU MATHEW >>> > wrote:

> Hi Jianshi,
>
> I have used wild card characters (*) in my program and it worked..
> My code was like this
> b = sc.textFile("hdfs:///path to file/data_file_2013SEP01*")
>
> Thanks & Regards,
> Meethu M
>
>
>   On Wednesday, 18 June 2014 9:29 AM, Jianshi Huang <
> jianshi.hu...@gmail.com> wrote:
>
>
>  It would be convenient if Spark's textFile, parquetFile, etc. can
> support path with wildcard, such as:
>
>   hdfs://domain/user/jianshuang/data/parquet/table/month=2014*
>
>  Or is there already a way to do it now?
>
> Jianshi
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>
>
>

>>>
>>>
>>> --
>>> Jianshi Huang
>>>
>>> LinkedIn: jianshi
>>> Twitter: @jshuang
>>> Github & Blog: http://huangjs.github.com/
>>>
>>
>>
>>
>> --
>> Jianshi Huang
>>
>> LinkedIn: jianshi
>> Twitter: @jshuang
>> Github & Blog: http://huangjs.github.com/
>>
>
>


-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: Wildcard support in input path

2014-06-18 Thread Nicholas Chammas
Is that month= syntax something special, or do your files actually have
that string as part of their name?


On Wed, Jun 18, 2014 at 2:25 AM, Jianshi Huang 
wrote:

> Hi all,
>
> Thanks for the reply. I'm using parquetFile as input, is that a problem?
> In hadoop fs -ls, the path (hdfs://domain/user/
> jianshuang/data/parquet/table/month=2014*) will get list all the files.
>
> I'll test it again.
>
> Jianshi
>
>
> On Wed, Jun 18, 2014 at 2:23 PM, Jianshi Huang 
> wrote:
>
>> Hi Andrew,
>>
>> Strangely in my spark (1.0.0 compiled against hadoop 2.4.0) log, it says
>> file not found. I'll try again.
>>
>> Jianshi
>>
>>
>> On Wed, Jun 18, 2014 at 12:36 PM, Andrew Ash 
>> wrote:
>>
>>> In Spark you can use the normal globs supported by Hadoop's FileSystem,
>>> which are documented here:
>>> http://hadoop.apache.org/docs/r2.3.0/api/org/apache/hadoop/fs/FileSystem.html#globStatus(org.apache.hadoop.fs.Path)
>>>
>>>
>>> On Wed, Jun 18, 2014 at 12:09 AM, MEETHU MATHEW 
>>> wrote:
>>>
 Hi Jianshi,

 I have used wild card characters (*) in my program and it worked..
 My code was like this
 b = sc.textFile("hdfs:///path to file/data_file_2013SEP01*")

 Thanks & Regards,
 Meethu M


   On Wednesday, 18 June 2014 9:29 AM, Jianshi Huang <
 jianshi.hu...@gmail.com> wrote:


  It would be convenient if Spark's textFile, parquetFile, etc. can
 support path with wildcard, such as:

   hdfs://domain/user/jianshuang/data/parquet/table/month=2014*

  Or is there already a way to do it now?

 Jianshi

 --
 Jianshi Huang

 LinkedIn: jianshi
 Twitter: @jshuang
 Github & Blog: http://huangjs.github.com/



>>>
>>
>>
>> --
>> Jianshi Huang
>>
>> LinkedIn: jianshi
>> Twitter: @jshuang
>> Github & Blog: http://huangjs.github.com/
>>
>
>
>
> --
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>


Re: rdd.cache() is not faster?

2014-06-18 Thread Gaurav Jain
" if I do have big data (40GB, cached size is 60GB) and even big memory (192
GB), I cannot benefit from RDD cache, and should persist on disk and
leverage filesystem cache?"

The answer to the question of whether to persist (spill-over) data on disk
is not always immediately clear, because generally the functions to compute
RDD partitions are not as expensive as retrieving the saved partition from
disk. That's why the default STORAGE_LEVEL never stores RDD partitions on
disk, and instead computes them on the fly. Also, you can try using Kryo
serialization (if not using it already) to reduce memory usage. Playing
around with different Storage levels (MEMORY_ONLY_SER, for example) might
also help. 

Best
Gaurav Jain
Master's Student, D-INFK
ETH Zurich
Email: jaing at student dot ethz dot ch



-
Gaurav Jain
Master's Student, D-INFK
ETH Zurich
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/rdd-cache-is-not-faster-tp7804p7846.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: rdd.cache() is not faster?

2014-06-18 Thread Wei Tan
Hi Gaurav, thanks for your pointer. The observation in the link is (at 
least qualitatively) similar to mine.

Now the question is: if I have big data (40 GB; 60 GB when cached) and even 
big memory (192 GB), can I still not benefit from the RDD cache, and should I 
persist on disk and leverage the filesystem cache?

I will try more workers so that each JVM has a smaller heap.

Best regards,
Wei

-
Wei Tan, PhD
Research Staff Member
IBM T. J. Watson Research Center
http://researcher.ibm.com/person/us-wtan



From:   Gaurav Jain 
To: u...@spark.incubator.apache.org, 
Date:   06/18/2014 06:30 AM
Subject:Re: rdd.cache() is not faster?



You cannot assume that caching would always reduce the execution time,
especially if the data-set is large. It appears that if too much memory is
used for caching, then less memory is left for the actual computation
itself. There has to be a balance between the two. 

Page 33 of this thesis from KTH talks about this:
http://www.diva-portal.org/smash/get/diva2:605106/FULLTEXT01.pdf

Best



-
Gaurav Jain
Master's Student, D-INFK
ETH Zurich
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/rdd-cache-is-not-faster-tp7804p7835.html

Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Spark 1.0.0 java.lang.outOfMemoryError: Java Heap Space

2014-06-18 Thread Sguj
I got rid of most of my heap errors by increasing the number of partitions of
my RDDs by 8-16x. I found in the tuning page that heap space errors
can be caused by a hash table that's generated during the shuffle functions,
so by splitting up how much is in each shuffle function with partitions, I
was able to get rid of the errors. Thanks for putting me on the right path
with the partitions.
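
For illustration (not from the original message), a minimal sketch of the two usual knobs
for raising the partition count; the path and the value 256 are hypothetical.

    import org.apache.spark.{SparkConf, SparkContext}

    object PartitionSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("PartitionSketch"))

        // Option 1: raise the partition count of the input RDD itself.
        val lines = sc.textFile("hdfs:///data/input").repartition(256)

        // Option 2: pass numPartitions to the shuffle operation, so the hash table
        // built by each reduce task covers a smaller slice of the data.
        val counts = lines
          .map(line => (line.split(",")(0), 1))
          .reduceByKey(_ + _, 256)

        println(counts.count())
        sc.stop()
      }
    }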



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-0-java-lang-outOfMemoryError-Java-Heap-Space-tp7735p7843.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Contribution to Spark MLLib

2014-06-18 Thread Denis Turdakov
Hello everybody,

Xiangrui, thanks for the link to the roadmap. I saw that LDA is planned for
MLlib 1.1. What do you think about PLSA?

I understand that LDA is more popular now, but recent research shows that
modifications of PLSA sometimes perform better [1]. Furthermore, the most
recent paper by the same authors shows that there is a clear way to extend PLSA
to LDA and beyond [2]. We can implement PLSA with these modifications in
MLlib. Would that be of interest?

Actually, we already have an implementation of Robust PLSA on Spark, so the
task is to integrate it into MLlib.

1. A. Potapenko, K. Vorontsov. 2013. Robust PLSA performs better than LDA.
In Proceedings of ECIR'13.
2. Vorontsov, Potapenko. Tutorial on Probabilistic Topic Modeling: Additive
Regularization for Stochastic Matrix Factorization.
http://www.machinelearning.ru/wiki/images/1/1f/Voron14aist.pdf 

Best regards,
Denis.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Contribution-to-Spark-MLLib-tp7716p7844.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Java IO Stream Corrupted - Invalid Type AC?

2014-06-18 Thread Surendranauth Hiraman
Patrick,

My team is using shuffle consolidation but not speculation. We are also
using persist(DISK_ONLY) for caching.

Here are some config changes that are in our work-in-progress.

We've been trying for 2 weeks to get our production flow (maybe around
50-70 stages, a few forks and joins with up to 20 branches in the forks) to
run end to end without any success, running into other problems besides
this one as well. For example, we have run into situations where saving to
HDFS just hangs on a couple of tasks, which are printing out nothing in
their logs and not taking any CPU. For testing, our input data is 10 GB
across 320 input splits and generates maybe around 200-300 GB of
intermediate and final data.


conf.set("spark.executor.memory", "14g") // TODO make this
configurable

// shuffle configs
conf.set("spark.default.parallelism", "320") // TODO make this
configurable
conf.set("spark.shuffle.consolidateFiles","true")

conf.set("spark.shuffle.file.buffer.kb", "200")
conf.set("spark.reducer.maxMbInFlight", "96")

conf.set("spark.rdd.compress","true"

// we ran into a problem with the default timeout of 60 seconds
// this is also being set in the master's spark-env.sh. Not sure if
it needs to be in both places
conf.set("spark.worker.timeout","180")

// akka settings
conf.set("spark.akka.threads", "300")
conf.set("spark.akka.timeout", "180")
conf.set("spark.akka.frameSize", "100")
conf.set("spark.akka.batchSize", "30")
conf.set("spark.akka.askTimeout", "30")

// block manager
conf.set("spark.storage.blockManagerTimeoutIntervalMs", "18")
conf.set("spark.blockManagerHeartBeatMs", "8")

-Suren



On Wed, Jun 18, 2014 at 1:42 AM, Patrick Wendell  wrote:

> Out of curiosity - are you guys using speculation, shuffle
> consolidation, or any other non-default option? If so that would help
> narrow down what's causing this corruption.
>
> On Tue, Jun 17, 2014 at 10:40 AM, Surendranauth Hiraman
>  wrote:
> > Matt/Ryan,
> >
> > Did you make any headway on this? My team is running into this also.
> > Doesn't happen on smaller datasets. Our input set is about 10 GB but we
> > generate 100s of GBs in the flow itself.
> >
> > -Suren
> >
> >
> >
> >
> > On Fri, Jun 6, 2014 at 5:19 PM, Ryan Compton 
> wrote:
> >
> >> Just ran into this today myself. I'm on branch-1.0 using a CDH3
> >> cluster (no modifications to Spark or its dependencies). The error
> >> appeared trying to run GraphX's .connectedComponents() on a ~200GB
> >> edge list (GraphX worked beautifully on smaller data).
> >>
> >> Here's the stacktrace (it's quite similar to yours
> >> https://imgur.com/7iBA4nJ ).
> >>
> >> 14/06/05 20:02:28 ERROR scheduler.TaskSetManager: Task 5.599:39 failed
> >> 4 times; aborting job
> >> 14/06/05 20:02:28 INFO scheduler.DAGScheduler: Failed to run reduce at
> >> VertexRDD.scala:100
> >> Exception in thread "main" org.apache.spark.SparkException: Job
> >> aborted due to stage failure: Task 5.599:39 failed 4 times, most
> >> recent failure: Exception failure in TID 29735 on host node18:
> >> java.io.StreamCorruptedException: invalid type code: AC
> >>
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1355)
> >> java.io.ObjectInputStream.readObject(ObjectInputStream.java:350)
> >>
> >>
> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63)
> >>
> >>
> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:125)
> >>
> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> >> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> >>
> >>
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:30)
> >>
> >>
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> >> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> >> scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >>
> >>
> org.apache.spark.graphx.impl.VertexPartitionBaseOps.innerJoinKeepLeft(VertexPartitionBaseOps.scala:192)
> >>
> >>
> org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:78)
> >>
> >>
> org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75)
> >>
> >>
> org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
> >> scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> >> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> >> scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >> scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >>
> >>
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala

BSP realization on Spark

2014-06-18 Thread Ghousia
Hi,

We are trying to implement a BSP model in Spark with the help of GraphX.
One thing I encountered is the Pregel operator in the Graph class. But what I
fail to understand is how the master and workers need to be assigned (as in BSP),
and how barrier synchronization would happen. The Pregel operator provides
a way to define a vertex program, but nothing is mentioned about the
barrier synchronization.
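
For illustration (not from the original message), a minimal single-source shortest-paths
sketch in the style of the GraphX docs. There is no explicit master/worker assignment:
each Pregel superstep is just another round of Spark jobs driven synchronously from the
driver, so the barrier between supersteps is implicit in that loop. The tiny graph below
is made up.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.graphx._

    object PregelSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("PregelSketch"))

        // A tiny weighted graph; vertex attribute = current distance from the source (vertex 1).
        val edges = sc.parallelize(Seq(Edge(1L, 2L, 1.0), Edge(2L, 3L, 2.0), Edge(1L, 3L, 5.0)))
        val graph = Graph.fromEdges(edges, Double.PositiveInfinity)
          .mapVertices((id, _) => if (id == 1L) 0.0 else Double.PositiveInfinity)

        val sssp = graph.pregel(Double.PositiveInfinity)(
          (id, dist, newDist) => math.min(dist, newDist),           // vertex program
          triplet =>                                                 // send messages along edges
            if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
              Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
            else Iterator.empty,
          (a, b) => math.min(a, b))                                  // merge concurrent messages

        println(sssp.vertices.collect().mkString("\n"))
        sc.stop()
      }
    }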

Any help in this regard is truly appreciated.

Many Thanks,
Ghousia.


Cannot print a derived DStream after reduceByKey

2014-06-18 Thread haopu
In the test application, I create a DStream by connecting to a socket. 
Then I want to count the records in the DStream that match another
reference RDD. 
Below is the Java code for my application. 

== 
public class TestSparkStreaming {

    public static void main(String[] args) {
        // Function to make a pair of Strings
        class StringToPair implements PairFunction<String, String, String> {
            String value_;
            StringToPair(String value) {
                value_ = value;
            }
            @Override
            public Tuple2<String, String> call(String arg0) throws Exception {
                return new Tuple2<String, String>(arg0, value_);
            }
        }

        JavaStreamingContext jssc = new JavaStreamingContext("local", "TestSparkStreaming", new Duration(1000));
        JavaReceiverInputDStream<String> networkevents = jssc.socketTextStream("localhost", );

        // Pair input line with "world"
        JavaPairDStream<String, String> streamEvents = networkevents.mapToPair(new StringToPair("world"));

        // Construct "hello" -> "spark" pair for input line to join with
        JavaSparkContext sc = new JavaSparkContext(new SparkConf());
        List<String> list = Arrays.asList("hello");
        JavaRDD<String> reference = sc.parallelize(list);
        final JavaPairRDD<String, String> referenceData = reference.mapToPair(new StringToPair("spark"));

        class MatchInputLine implements PairFunction<Tuple2<String, String>, String, Long> {
            @Override
            public Tuple2<String, Long> call(Tuple2<String, String> t) throws Exception {
                final String inputKey = t._1;
                final String inputValue = t._2;
                final List<String> ret = referenceData.lookup(inputKey);
                return new Tuple2<String, Long>(inputKey, new Long((ret != null) ? ret.size() : 0));
            }
        }

        // Construct an output DStream if matched
        JavaPairDStream<String, Long> joinedStream = streamEvents.mapToPair(new MatchInputLine());

        // Count the output
        class Count implements Function2<Long, Long, Long> {
            @Override
            public Long call(Long v1, Long v2) throws Exception {
                return v1 + v2;
            }
        }
        JavaPairDStream<String, Long> aggregatedJoinedStream = joinedStream.reduceByKey(new Count());

        // Print the output
        aggregatedJoinedStream.count().print();

        jssc.start();
        jssc.awaitTermination();
    }
}
== 

I'm testing on Windows in local mode (1.0.0). After I start the socket
server (the "nc" program mentioned in Spark's document) and submit the
packaged jar into Spark, I expect to see the output when I type "hello" in. 
However, I didn't see any output. I saw below message in the console where I
submit the jar. 

== 
14/06/18 18:17:48 INFO JobScheduler: Added jobs for time 1403086668000 ms 
14/06/18 18:17:48 INFO MemoryStore: ensureFreeSpace(12) called with
curMem=0, maxMem=1235327385 
14/06/18 18:17:48 INFO MemoryStore: Block input-0-1403086668400 stored as
bytes to memory (size 12.0 B, free 1178.1 MB) 
14/06/18 18:17:48 INFO BlockManagerInfo: Added input-0-1403086668400 in
memory on PEK-WKST68449:60769 (size: 12.0 B, free: 1178.1 MB) 
14/06/18 18:17:48 INFO BlockManagerMaster: Updated info of block
input-0-1403086668400 
14/06/18 18:17:48 INFO SendingConnection: Initiating connection to
[PEK-WKST68449/10.101.3.75:60769] 
14/06/18 18:17:48 INFO ConnectionManager: Accepted connection from
[PEK-WKST68449/10.101.3.75] 
14/06/18 18:17:48 INFO SendingConnection: Connected to
[PEK-WKST68449/10.101.3.75:60769], 1 messages pending 
14/06/18 18:17:48 WARN BlockManager: Block input-0-1403086668400 already
exists on this machine; not re-adding it 
14/06/18 18:17:48 INFO SendingConnection: Initiating connection to
[/127.0.0.1:60789] 
14/06/18 18:17:48 INFO ConnectionManager: Accepted connection from
[127.0.0.1/127.0.0.1] 
14/06/18 18:17:48 INFO SendingConnection: Connected to [/127.0.0.1:60789], 1
messages pending
14/06/18 18:17:48 INFO BlockGenerator: Pushed block input-0-1403086668400 
14/06/18 18:17:49 INFO ReceiverTracker: Stream 0 received 1 blocks 
14/06/18 18:17:49 INFO JobScheduler: Added jobs for time 1403086669000 ms 
== 

I see one batch under "Waiting Batches" in Spark's monitoring UI. I'm not sure
if that's related to the problem. 

Can you suggest what the problem might be? I guess this is a basic question
about the reduce function.
I would appreciate any help, thank you! 

Re: Cannot print a derived DStream after reduceByKey

2014-06-18 Thread haopu
I guess this is a basic question about the usage of reduce. Please shed some
light, thank you!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Cannot-print-a-derived-DStream-after-reduceByKey-tp7834p7836.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: rdd.cache() is not faster?

2014-06-18 Thread Gaurav Jain
You cannot assume that caching would always reduce the execution time,
especially if the data-set is large. It appears that if too much memory is
used for caching, then less memory is left for the actual computation
itself. There has to be a balance between the two. 

Page 33 of this thesis from KTH talks about this:
http://www.diva-portal.org/smash/get/diva2:605106/FULLTEXT01.pdf
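
As an illustration (a hedged Scala sketch; the input path is hypothetical), one
way to shrink the memory taken by caching is to persist with a serialized,
spill-to-disk storage level instead of the plain cache():

import org.apache.spark.storage.StorageLevel

// rdd.cache() is shorthand for rdd.persist(StorageLevel.MEMORY_ONLY).
// A serialized, spillable level keeps a much smaller footprint in memory and
// leaves more room for the computation itself, at the cost of extra CPU for
// (de)serialization.
val data = sc.textFile("hdfs:///path/to/large/input")   // hypothetical path
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

data.count()   // first action materializes the serialized cached partitions
data.count()   // later actions reuse them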

Best



-
Gaurav Jain
Master's Student, D-INFK
ETH Zurich
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/rdd-cache-is-not-faster-tp7804p7835.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Enormous EC2 price jump makes "r3.large" patch more important

2014-06-18 Thread Jeremy Lee
Hmm.. kinda working...

I'm getting a broken Apache/Ganglia setup at the last step, although spark-shell
does run.

Starting GANGLIA gmetad:   [  OK  ]
Stopping httpd:[FAILED]
Starting httpd: httpd: Syntax error on line 153 of
/etc/httpd/conf/httpd.conf: Cannot load modules/mod_authn_alias.so into
server: /etc/httpd/modules/mod_authn_alias.so: cannot open shared object
file: No such file or directory
   [FAILED]

I've seen this before, though apparently it's not as fatal as I thought it
was. But I will miss ganglia. I think it's exclusive to the hvm ami. I
don't know.

And I definitely have to create /mnt/spark and copy-dir it before the
examples (bin/run-example SparkPi) will run; otherwise I get this:

14/06/18 10:00:20 ERROR scheduler.TaskSetManager: Task 0.0:1 failed 1
times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 0.0:1 failed 1 times, most recent failure: Exception
failure in TID 1 on host localhost: java.io.IOException: No such file or
directory
java.io.UnixFileSystem.createFileExclusively(Native Method)
java.io.File.createNewFile(File.java:1006)
java.io.File.createTempFile(File.java:1989)
org.apache.spark.util.Utils$.fetchFile(Utils.scala:326)

I still feel like I'm missing something obvious, and making it harder than
it should be. (I wanted to try out Cassandra, but getting that on the
cluster as well looks like too much pain for now.) I'd be willing to step
up to YARN/Ambari, but is it worth it? They don't say much about Spark, and
the doco is pretty thin. I could just be trading up to more complicated
problems.



On Wed, Jun 18, 2014 at 4:05 PM, Jeremy Lee 
wrote:

> Ah, right. So only the launch script has changed. Everything else is still
> essentially binary compatible?
>
> Well, that makes it too easy! Thanks!
>
>
> On Wed, Jun 18, 2014 at 2:35 PM, Patrick Wendell 
> wrote:
>
>> Actually you'll just want to clone the 1.0 branch then use the
>> spark-ec2 script in there to launch your cluster. The --spark-git-repo
>> flag is if you want to launch with a different version of Spark on the
>> cluster. In your case you just need a different version of the launch
>> script itself, which will be present in the 1.0 branch of Spark.
>>
>> - Patrick
>>
>> On Tue, Jun 17, 2014 at 9:29 PM, Jeremy Lee
>>  wrote:
>> > I am about to spin up some new clusters, so I may give that a go... any
>> > special instructions for making them work? I assume I use the "
>> > --spark-git-repo=" option on the spark-ec2 command. Is it as easy as
>> > concatenating your string as the value?
>> >
>> > On cluster management GUIs... I've been looking around at Amabari,
>> Datastax,
>> > Cloudera, OpsCenter etc. Not totally convinced by any of them yet.
>> Anyone
>> > using a good one I should know about? I'm really beginning to lean in
>> the
>> > direction of Cassandra as the distributed data store...
>> >
>> >
>> > On Wed, Jun 18, 2014 at 1:46 PM, Patrick Wendell 
>> wrote:
>> >>
>> >> By the way, in case it's not clear, I mean our maintenance branches:
>> >>
>> >> https://github.com/apache/spark/tree/branch-1.0
>> >>
>> >> On Tue, Jun 17, 2014 at 8:35 PM, Patrick Wendell 
>> >> wrote:
>> >> > Hey Jeremy,
>> >> >
>> >> > This is patched in the 1.0 and 0.9 branches of Spark. We're likely to
>> >> > make a 1.0.1 release soon (this patch being one of the main reasons),
>> >> > but if you are itching for this sooner, you can just checkout the
>> head
>> >> > of branch-1.0 and you will be able to use r3.XXX instances.
>> >> >
>> >> > - Patrick
>> >> >
>> >> > On Tue, Jun 17, 2014 at 4:17 PM, Jeremy Lee
>> >> >  wrote:
>> >> >> Some people (me included) might have wondered why all our m1.large
>> spot
>> >> >> instances (in us-west-1) shut down a few hours ago...
>> >> >>
>> >> >> Simple reason: The EC2 spot price for Spark's default "m1.large"
>> >> >> instances
>> >> >> just jumped from 0.016 per hour, to about 0.750. Yes, Fifty times.
>> >> >> Probably
>> >> >> something to do with world cup.
>> >> >>
>> >> >> So far this is just us-west-1, but prices have a tendency to
>> equalize
>> >> >> across
>> >> >> centers as the days pass. Time to make backups and plans.
>> >> >>
>> >> >> "m3" spot prices are still down at $0.02 (and being new, will be
>> >> >> bypassed by
>> >> >> older systems), so it would be REAAALLYY nice if there had been some
>> >> >> progress on that issue. Let me know if I can help with testing and
>> >> >> whatnot.
>> >> >>
>> >> >>
>> >> >> --
>> >> >> Jeremy Lee  BCompSci(Hons)
>> >> >>   The Unorthodox Engineers
>> >
>> >
>> >
>> >
>> > --
>> > Jeremy Lee  BCompSci(Hons)
>> >   The Unorthodox Engineers
>>
>
>
>
> --
> Jeremy Lee  BCompSci(Hons)
>   The Unorthodox Engineers
>



-- 
Jeremy Lee  BCompSci(Hons)
  The Unorthodox Engineers


Shark Tasks in parallel execution

2014-06-18 Thread majian
HI,all

I am confused about why a Shark window that is not executing queries still
occupies resources. And if a Shark window is not closed, how can multiple
queries be executed in parallel?

For example:
Spark config:
  total nodes: 3
  total cores: 9
Shark config:
SPARK_JAVA_OPTS+="-Dspark.scheduler.allocation.file=/opt/spark-0.9.1-bin-hadoop1/conf/fairscheduler.xml
 "
SPARK_JAVA_OPTS+="-Dspark.cores.max=9 "
SPARK_JAVA_OPTS+="-Dspark.scheduler.mode=FAIR "

As it is, I can only open one window to execute queries; if I set
spark.cores.max to less than 9, I think that does not take full advantage of
the cluster.
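
For what it's worth, with spark.scheduler.mode=FAIR the intent is that jobs
submitted from different threads share the cores rather than one query holding
all of spark.cores.max. A hedged Scala sketch (the pool name is illustrative
and should match an entry in fairscheduler.xml):

// Route subsequent jobs from this thread to a named fair-scheduler pool.
sc.setLocalProperty("spark.scheduler.pool", "adhoc")
// ... run a query / action here ...
// Reset to the default pool afterwards.
sc.setLocalProperty("spark.scheduler.pool", null)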


Thank you for your help !!!

2014-06-18 



majian 


Re: get schema from SchemaRDD

2014-06-18 Thread Michael Armbrust
We just merged a feature into master that lets you print the schema or view
it as a string (printSchema() and schemaTreeString on SchemaRDD).

There is also this JIRA targeting 1.1 for presenting a nice programmatic API
for this information: https://issues.apache.org/jira/browse/SPARK-2179
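
For example, on a master build that includes that change, something along
these lines (a sketch reusing the parquetFile call from the quoted mail, with
the method names as given above) should show the recovered column names and
types:

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val loadedPeopleRDD = sqlContext.parquetFile("people.parquet")

loadedPeopleRDD.printSchema()                       // prints the schema as a tree
val schemaText = loadedPeopleRDD.schemaTreeString   // same information as a String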


On Wed, Jun 18, 2014 at 10:36 AM, Kevin Jung  wrote:

> Can I get schema information from SchemaRDD?
> For example,
>
> *case class Person(name:String, Age:Int, Gender:String, Birth:String)
> val peopleRDD = sc.textFile("/sample/sample.csv").map(_.split(",")).map(p
> =>
> Person(p(0).toString, p(1).toInt, p(2).toString, p(3).toString))
> peopleRDD.saveAsParquetFile("people.parquet")*
>
> (few days later...)
>
> *val sqlContext = new org.apache.spark.sql.SQLContext(sc)
> import sqlContext._
> val loadedPeopleRDD = sqlContext.parquetFile("people.parquet")
> loadedPeopleRDD.registerAsTable("peopleTable")*
>
> Someone who doesn't know the Person class can't know what columns and
> types this table has.
> Maybe they want to get schema information from loadedPeopleRDD.
> How can I do this?
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/get-schema-from-SchemaRDD-tp7830.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


get schema from SchemaRDD

2014-06-18 Thread Kevin Jung
Can I get schema information from SchemaRDD?
For example,

*case class Person(name:String, Age:Int, Gender:String, Birth:String)
val peopleRDD = sc.textFile("/sample/sample.csv").map(_.split(",")).map(p =>
Person(p(0).toString, p(1).toInt, p(2).toString, p(3).toString))
peopleRDD.saveAsParquetFile("people.parquet")*

(few days later...)

*val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
val loadedPeopleRDD = sqlContext.parquetFile("people.parquet")
loadedPeopleRDD.registerAsTable("peopleTable")*

Someone who doesn't know the Person class can't know what columns and types
this table has.
Maybe they want to get schema information from loadedPeopleRDD.
How can I do this?





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/get-schema-from-SchemaRDD-tp7830.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Unit test failure: Address already in use

2014-06-18 Thread Anselme Vignon
Hi,

Could your problem come from the fact that you run your tests in parallel?

If you are running Spark in local mode, you cannot have concurrent Spark
instances running. This means that your tests instantiating a SparkContext
cannot be run in parallel. The easiest fix is to tell sbt not to run tests in
parallel. This can be done by adding the following line to your build.sbt:

parallelExecution in Test := false
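
If the tests must share one machine anyway, a complementary pattern is to make
sure each suite stops its SparkContext, so the next suite can bind the driver
web UI port again. A hedged ScalaTest sketch (suite and test names are
illustrative):

import org.apache.spark.SparkContext
import org.scalatest.{BeforeAndAfterAll, FunSuite}

class MyJobSuite extends FunSuite with BeforeAndAfterAll {

  @transient private var sc: SparkContext = _

  override def beforeAll() {
    sc = new SparkContext("local", "test")
  }

  override def afterAll() {
    if (sc != null) sc.stop()   // frees the bound port for the next suite
  }

  test("filter and count run locally") {
    val count = sc.parallelize(Seq("a", "b", "a")).filter(_ == "a").count()
    assert(count == 2)
  }
}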

Cheers,

Anselme




2014-06-17 23:01 GMT+02:00 SK :

> Hi,
>
> I have 3 unit tests (independent of each other) in the /src/test/scala
> folder. When I run each of them individually using: sbt "test-only ",
> all the 3 pass the test. But when I run them all using "sbt test", then
> they
> fail with the warning below. I am wondering if the binding exception
> results
> in failure to run the job, thereby causing the failure. If so, what can I
> do
> to address this binding exception? I am running these tests locally on a
> standalone machine (i.e. SparkContext("local", "test")).
>
>
> 14/06/17 13:42:48 WARN component.AbstractLifeCycle: FAILED
> org.eclipse.jetty.server.Server@3487b78d: java.net.BindException: Address
> already in use
> java.net.BindException: Address already in use
> at sun.nio.ch.Net.bind0(Native Method)
> at sun.nio.ch.Net.bind(Net.java:174)
> at
> sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:139)
> at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:77)
>
>
> thanks
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Unit-test-failure-Address-already-in-use-tp7771.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: join operation is taking too much time

2014-06-18 Thread MEETHU MATHEW
Hi,
Thanks Andrew and Daniel for the response.

Setting spark.shuffle.spill to false didn't make any difference. The 5-day case
completed in 6 minutes and the 10-day case was stuck after around 1 hour.


Daniel, in my current use case I can't read all the files into a single RDD. But
I have another use case where I did it that way, i.e. I read all the files into
a single RDD and joined it with the RDD of 9 million rows, and it worked fine
and took only 3 minutes.
 
Thanks & Regards, 
Meethu M


On Wednesday, 18 June 2014 12:11 AM, Daniel Darabos 
 wrote:
 


I've been wondering about this. Is there a difference in performance between 
these two?

val rdd1 = sc.textFile(files.mkString(","))
val rdd2 = sc.union(files.map(sc.textFile(_)))

I don't know about your use-case, Meethu, but it may be worth trying to see if 
reading all the files into one RDD (like rdd1) would perform better in the 
join. (If this is possible in your situation.)
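
A hedged sketch of that single-RDD variant for a keyed leftOuterJoin (the
paths and the key extraction are illustrative only, not from the original job):

import org.apache.spark.SparkContext._   // pair RDD functions such as leftOuterJoin

// Key both sides on the first CSV column, then join once.
val big    = sc.textFile("big.csv").map(line => (line.split(",")(0), line))
val others = sc.textFile(files.mkString(",")).map(line => (line.split(",")(0), line))
val joined = big.leftOuterJoin(others)   // RDD[(String, (String, Option[String]))]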




On Tue, Jun 17, 2014 at 6:45 PM, Andrew Or  wrote:

How long does it get stuck for? This is a common sign for the OS thrashing due 
to out of memory exceptions. If you keep it running longer, does it throw an 
error?
>
>
>Depending on how large your other RDD is (and your join operation), memory 
>pressure may or may not be the problem at all. It could be that spilling your 
>shuffles
>to disk is slowing you down (but probably shouldn't hang your application). 
>For the 5 RDDs case, what happens if you set spark.shuffle.spill to false?
>
>
>
>2014-06-17 5:59 GMT-07:00 MEETHU MATHEW :
>
>
>
>>
>> Hi all,
>>
>>
>>I want  to do a recursive leftOuterJoin between an RDD (created from  file) 
>>with 9 million rows(size of the file is 100MB) and 30 other RDDs(created from 
>>30 diff files in each iteration of a loop) varying from 1 to 6 million rows.
>>When I run it for 5 RDDs,its running successfully  in 5 minutes.But when I 
>>increase it to 10 or 30 RDDs its gradually slowing down and finally getting 
>>stuck without showing any warning or error.
>>
>>
>>I am running in standalone mode with 2 workers of 4GB each and a total of 16 
>>cores .
>>
>>
>>Any of you facing similar problems with JOIN  or is it a problem with my 
>>configuration.
>>
>>
>>Thanks & Regards, 
>>Meethu M
>

Re: Spark Streaming Example with CDH5

2014-06-18 Thread Sean Owen
There is nothing special about CDH5 Spark in this regard. CDH 5.0.x has
Spark 0.9.0, and the imminent next release will have 1.0.0 + upstream
patches.

You're simply accessing a class that was not present in 0.9.0, but is
present after that:

https://github.com/apache/spark/commits/master/core/src/main/scala/org/apache/spark/SecurityManager.scala
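
For reference, a minimal streaming example that compiles against 0.9.x without
touching SecurityManager might look like the sketch below (host, port, batch
interval and object name are placeholders):

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._   // pair DStream operations

object StreamingWordCount {
  def main(args: Array[String]) {
    // "local[2]": one core for the receiver, one for processing.
    val ssc = new StreamingContext("local[2]", "StreamingWordCount", Seconds(5))
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.flatMap(_.split(" "))
         .map(word => (word, 1))
         .reduceByKey(_ + _)
         .print()
    ssc.start()
    ssc.awaitTermination()
  }
}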




On Wed, Jun 18, 2014 at 3:14 AM, manas Kar  wrote:

> Hi Spark Gurus,
>  I am trying to compile a Spark Streaming example with CDH5 and am having
> problems compiling it.
> Has anyone created an example Spark Streaming project using CDH5 (preferably
> Spark 0.9.1) who would be kind enough to share the build.sbt(.scala) file (or
> point to their example on GitHub)? I know there is a streaming example here,
> but I am looking for something that runs with CDH5.
>
>
> My build.scala file looks like the one given below.
>
>  object Dependency {
>   // Versions
>   object V {
>     val Akka = "2.3.0"
>     val scala = "2.10.4"
>     val cloudera = "0.9.0-cdh5.0.0"
>   }
>
>   val sparkCore  = "org.apache.spark" %% "spark-core" % V.cloudera
>   val sparkStreaming = "org.apache.spark" %% "spark-streaming" % V.cloudera
>
>   resolvers ++= Seq(
>     "cloudera repo" at "https://repository.cloudera.com/artifactory/cloudera-repos/",
>     "hadoop repo" at "https://repository.cloudera.com/content/repositories/releases/")
>
> I have also attached the complete build.scala file for sake of
> completeness.
> sbt dist gives the following error:
>  object SecurityManager is not a member of package org.apache.spark
> [error] import org.apache.spark.{SparkConf, SecurityManager}
>
>
> build.scala
> <
> http://apache-spark-user-list.1001560.n3.nabble.com/file/n7796/build.scala
> >
>
>
> Appreciate the great work the spark community is doing. It is by far the
> best thing I have worked on.
>
> ..Manas
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Example-with-CDH5-tp7796.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>