Re: Need help about how hadoop works.

2014-04-23 Thread Prashant Sharma
Prashant Sharma


On Thu, Apr 24, 2014 at 12:15 PM, Carter  wrote:

> Thanks Mayur.
>
> So without Hadoop and any other distributed file systems, by running:
>  val doc = sc.textFile("/home/scalatest.txt",5)
>  doc.count
> we can only get parallelization within the computer where the file is
> loaded, but not the parallelization within the computers in the cluster
> (Spark can not automatically duplicate the file to the other computers in
> the cluster), is this understanding correct? Thank you.
>
>
Spark will not distribute that file for you on other systems, however if
the file("/home/scalatest.txt") is present on the same path on all systems
it will be processed on all nodes. We generally use hdfs which takes care
of this distribution.


>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Need-help-about-how-hadoop-works-tp4638p4734.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Need help about how hadoop works.

2014-04-23 Thread Carter
Thanks Mayur.

So without Hadoop and any other distributed file systems, by running:
 val doc = sc.textFile("/home/scalatest.txt",5)
 doc.count
we can only get parallelization within the computer where the file is
loaded, but not the parallelization within the computers in the cluster
(Spark can not automatically duplicate the file to the other computers in
the cluster), is this understanding correct? Thank you.

 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Need-help-about-how-hadoop-works-tp4638p4734.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Re: how to set spark.executor.memory and heap size

2014-04-23 Thread qinwei






try the complete path


qinwei
 From: wxhsdpDate: 2014-04-24 14:21To: userSubject: Re: how to set 
spark.executor.memory and heap sizethank you, i add setJars, but nothing changes
 
    val conf = new SparkConf()
  .setMaster("spark://127.0.0.1:7077")
  .setAppName("Simple App")
  .set("spark.executor.memory", "1g")
  .setJars(Seq("target/scala-2.10/simple-project_2.10-1.0.jar"))
    val sc = new SparkContext(conf)
 
 
 
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-set-spark-executor-memory-and-heap-size-tp4719p4732.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: how to set spark.executor.memory and heap size

2014-04-23 Thread wxhsdp
thank you, i add setJars, but nothing changes

val conf = new SparkConf()
  .setMaster("spark://127.0.0.1:7077")
  .setAppName("Simple App")
  .set("spark.executor.memory", "1g")
  .setJars(Seq("target/scala-2.10/simple-project_2.10-1.0.jar"))
val sc = new SparkContext(conf)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-set-spark-executor-memory-and-heap-size-tp4719p4732.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Access Last Element of RDD

2014-04-23 Thread Sai Prasanna
Thanks Guys !


On Thu, Apr 24, 2014 at 11:29 AM, Sourav Chandra <
sourav.chan...@livestream.com> wrote:

> Also same thing can be done using rdd.top(1)(reverseOrdering)
>
>
>
> On Thu, Apr 24, 2014 at 11:28 AM, Sourav Chandra <
> sourav.chan...@livestream.com> wrote:
>
>> You can use rdd.takeOrdered(1)(reverseOrdrering)
>>
>> reverseOrdering is you Ordering[T] instance where you define the ordering
>> logic. This you have to pass in the method
>>
>>
>>
>> On Thu, Apr 24, 2014 at 11:21 AM, Frank Austin Nothaft <
>> fnoth...@berkeley.edu> wrote:
>>
>>> If you do this, you could simplify to:
>>>
>>> RDD.collect().last
>>>
>>> However, this has the problem of collecting all data to the driver.
>>>
>>> Is your data sorted? If so, you could reverse the sort and take the
>>> first. Alternatively, a hackey implementation might involve a
>>> mapPartitionsWithIndex that returns an empty iterator for all partitions
>>> except for the last. For the last partition, you would filter all elements
>>> except for the last element in your iterator. This should leave one
>>> element, which is your last element.
>>>
>>> Frank Austin Nothaft
>>> fnoth...@berkeley.edu
>>> fnoth...@eecs.berkeley.edu
>>> 202-340-0466
>>>
>>> On Apr 23, 2014, at 10:44 PM, Adnan Yaqoob  wrote:
>>>
>>> This function will return scala List, you can use List's last function
>>> to get the last element.
>>>
>>> For example:
>>>
>>> RDD.take(RDD.count()).last
>>>
>>>
>>> On Thu, Apr 24, 2014 at 10:28 AM, Sai Prasanna 
>>> wrote:
>>>
 Adnan, but RDD.take(RDD.count()) returns all the elements of the RDD.

 I want only to access the last element.


 On Thu, Apr 24, 2014 at 10:33 AM, Sai Prasanna >>> > wrote:

> Oh ya, Thanks Adnan.
>
>
> On Thu, Apr 24, 2014 at 10:30 AM, Adnan Yaqoob wrote:
>
>> You can use following code:
>>
>> RDD.take(RDD.count())
>>
>>
>> On Thu, Apr 24, 2014 at 9:51 AM, Sai Prasanna <
>> ansaiprasa...@gmail.com> wrote:
>>
>>> Hi All, Some help !
>>> RDD.first or RDD.take(1) gives the first item, is there a straight
>>> forward way to access the last element in a similar way ?
>>>
>>> I coudnt fine a tail/last method for RDD. !!
>>>
>>
>>
>

>>>
>>>
>>
>>
>> --
>>
>> Sourav Chandra
>>
>> Senior Software Engineer
>>
>> · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
>>
>> sourav.chan...@livestream.com
>>
>> o: +91 80 4121 8723
>>
>> m: +91 988 699 3746
>>
>> skype: sourav.chandra
>>
>> Livestream
>>
>> "Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
>> Block, Koramangala Industrial Area,
>>
>> Bangalore 560034
>>
>> www.livestream.com
>>
>
>
>
> --
>
> Sourav Chandra
>
> Senior Software Engineer
>
> · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
>
> sourav.chan...@livestream.com
>
> o: +91 80 4121 8723
>
> m: +91 988 699 3746
>
> skype: sourav.chandra
>
> Livestream
>
> "Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
> Block, Koramangala Industrial Area,
>
> Bangalore 560034
>
> www.livestream.com
>


Re: Access Last Element of RDD

2014-04-23 Thread Sourav Chandra
Also same thing can be done using rdd.top(1)(reverseOrdering)



On Thu, Apr 24, 2014 at 11:28 AM, Sourav Chandra <
sourav.chan...@livestream.com> wrote:

> You can use rdd.takeOrdered(1)(reverseOrdrering)
>
> reverseOrdering is you Ordering[T] instance where you define the ordering
> logic. This you have to pass in the method
>
>
>
> On Thu, Apr 24, 2014 at 11:21 AM, Frank Austin Nothaft <
> fnoth...@berkeley.edu> wrote:
>
>> If you do this, you could simplify to:
>>
>> RDD.collect().last
>>
>> However, this has the problem of collecting all data to the driver.
>>
>> Is your data sorted? If so, you could reverse the sort and take the
>> first. Alternatively, a hackey implementation might involve a
>> mapPartitionsWithIndex that returns an empty iterator for all partitions
>> except for the last. For the last partition, you would filter all elements
>> except for the last element in your iterator. This should leave one
>> element, which is your last element.
>>
>> Frank Austin Nothaft
>> fnoth...@berkeley.edu
>> fnoth...@eecs.berkeley.edu
>> 202-340-0466
>>
>> On Apr 23, 2014, at 10:44 PM, Adnan Yaqoob  wrote:
>>
>> This function will return scala List, you can use List's last function to
>> get the last element.
>>
>> For example:
>>
>> RDD.take(RDD.count()).last
>>
>>
>> On Thu, Apr 24, 2014 at 10:28 AM, Sai Prasanna 
>> wrote:
>>
>>> Adnan, but RDD.take(RDD.count()) returns all the elements of the RDD.
>>>
>>> I want only to access the last element.
>>>
>>>
>>> On Thu, Apr 24, 2014 at 10:33 AM, Sai Prasanna 
>>> wrote:
>>>
 Oh ya, Thanks Adnan.


 On Thu, Apr 24, 2014 at 10:30 AM, Adnan Yaqoob wrote:

> You can use following code:
>
> RDD.take(RDD.count())
>
>
> On Thu, Apr 24, 2014 at 9:51 AM, Sai Prasanna  > wrote:
>
>> Hi All, Some help !
>> RDD.first or RDD.take(1) gives the first item, is there a straight
>> forward way to access the last element in a similar way ?
>>
>> I coudnt fine a tail/last method for RDD. !!
>>
>
>

>>>
>>
>>
>
>
> --
>
> Sourav Chandra
>
> Senior Software Engineer
>
> · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·
>
> sourav.chan...@livestream.com
>
> o: +91 80 4121 8723
>
> m: +91 988 699 3746
>
> skype: sourav.chandra
>
> Livestream
>
> "Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
> Block, Koramangala Industrial Area,
>
> Bangalore 560034
>
> www.livestream.com
>



-- 

Sourav Chandra

Senior Software Engineer

· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

sourav.chan...@livestream.com

o: +91 80 4121 8723

m: +91 988 699 3746

skype: sourav.chandra

Livestream

"Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
Block, Koramangala Industrial Area,

Bangalore 560034

www.livestream.com


Re: Access Last Element of RDD

2014-04-23 Thread Sourav Chandra
You can use rdd.takeOrdered(1)(reverseOrdrering)

reverseOrdering is you Ordering[T] instance where you define the ordering
logic. This you have to pass in the method



On Thu, Apr 24, 2014 at 11:21 AM, Frank Austin Nothaft <
fnoth...@berkeley.edu> wrote:

> If you do this, you could simplify to:
>
> RDD.collect().last
>
> However, this has the problem of collecting all data to the driver.
>
> Is your data sorted? If so, you could reverse the sort and take the first.
> Alternatively, a hackey implementation might involve a
> mapPartitionsWithIndex that returns an empty iterator for all partitions
> except for the last. For the last partition, you would filter all elements
> except for the last element in your iterator. This should leave one
> element, which is your last element.
>
> Frank Austin Nothaft
> fnoth...@berkeley.edu
> fnoth...@eecs.berkeley.edu
> 202-340-0466
>
> On Apr 23, 2014, at 10:44 PM, Adnan Yaqoob  wrote:
>
> This function will return scala List, you can use List's last function to
> get the last element.
>
> For example:
>
> RDD.take(RDD.count()).last
>
>
> On Thu, Apr 24, 2014 at 10:28 AM, Sai Prasanna wrote:
>
>> Adnan, but RDD.take(RDD.count()) returns all the elements of the RDD.
>>
>> I want only to access the last element.
>>
>>
>> On Thu, Apr 24, 2014 at 10:33 AM, Sai Prasanna 
>> wrote:
>>
>>> Oh ya, Thanks Adnan.
>>>
>>>
>>> On Thu, Apr 24, 2014 at 10:30 AM, Adnan Yaqoob wrote:
>>>
 You can use following code:

 RDD.take(RDD.count())


 On Thu, Apr 24, 2014 at 9:51 AM, Sai Prasanna 
 wrote:

> Hi All, Some help !
> RDD.first or RDD.take(1) gives the first item, is there a straight
> forward way to access the last element in a similar way ?
>
> I coudnt fine a tail/last method for RDD. !!
>


>>>
>>
>
>


-- 

Sourav Chandra

Senior Software Engineer

· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

sourav.chan...@livestream.com

o: +91 80 4121 8723

m: +91 988 699 3746

skype: sourav.chandra

Livestream

"Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
Block, Koramangala Industrial Area,

Bangalore 560034

www.livestream.com


Re: Access Last Element of RDD

2014-04-23 Thread Sai Prasanna
What i observe is, this way of computing is very inefficient. It returns
all the elements of the RDD to a List which takes considerable amount of
time.
Then it calculates the last element.

I have a file of size 3 GB in which i ran a lot of aggregate operations
which dint took the time that this take(RDD.count) took.

Is there an efficient way ? My guess is there should be one, since its a
basic operation.


On Thu, Apr 24, 2014 at 11:14 AM, Adnan Yaqoob  wrote:

> This function will return scala List, you can use List's last function to
> get the last element.
>
> For example:
>
> RDD.take(RDD.count()).last
>
>
> On Thu, Apr 24, 2014 at 10:28 AM, Sai Prasanna wrote:
>
>> Adnan, but RDD.take(RDD.count()) returns all the elements of the RDD.
>>
>> I want only to access the last element.
>>
>>
>> On Thu, Apr 24, 2014 at 10:33 AM, Sai Prasanna 
>> wrote:
>>
>>> Oh ya, Thanks Adnan.
>>>
>>>
>>> On Thu, Apr 24, 2014 at 10:30 AM, Adnan Yaqoob wrote:
>>>
 You can use following code:

 RDD.take(RDD.count())


 On Thu, Apr 24, 2014 at 9:51 AM, Sai Prasanna 
 wrote:

> Hi All, Some help !
> RDD.first or RDD.take(1) gives the first item, is there a straight
> forward way to access the last element in a similar way ?
>
> I coudnt fine a tail/last method for RDD. !!
>


>>>
>>
>


Re: Access Last Element of RDD

2014-04-23 Thread Frank Austin Nothaft
If you do this, you could simplify to:

RDD.collect().last

However, this has the problem of collecting all data to the driver.

Is your data sorted? If so, you could reverse the sort and take the first. 
Alternatively, a hackey implementation might involve a mapPartitionsWithIndex 
that returns an empty iterator for all partitions except for the last. For the 
last partition, you would filter all elements except for the last element in 
your iterator. This should leave one element, which is your last element.

Frank Austin Nothaft
fnoth...@berkeley.edu
fnoth...@eecs.berkeley.edu
202-340-0466

On Apr 23, 2014, at 10:44 PM, Adnan Yaqoob  wrote:

> This function will return scala List, you can use List's last function to get 
> the last element.
> 
> For example:
> 
> RDD.take(RDD.count()).last
> 
> 
> On Thu, Apr 24, 2014 at 10:28 AM, Sai Prasanna  
> wrote:
> Adnan, but RDD.take(RDD.count()) returns all the elements of the RDD.
> 
> I want only to access the last element.
> 
> 
> On Thu, Apr 24, 2014 at 10:33 AM, Sai Prasanna  
> wrote:
> Oh ya, Thanks Adnan.
> 
> 
> On Thu, Apr 24, 2014 at 10:30 AM, Adnan Yaqoob  wrote:
> You can use following code:
> 
> RDD.take(RDD.count())
> 
> 
> On Thu, Apr 24, 2014 at 9:51 AM, Sai Prasanna  wrote:
> Hi All, Some help !
> RDD.first or RDD.take(1) gives the first item, is there a straight forward 
> way to access the last element in a similar way ?
> 
> I coudnt fine a tail/last method for RDD. !!
> 
> 
> 
> 



Re: Access Last Element of RDD

2014-04-23 Thread Adnan Yaqoob
This function will return scala List, you can use List's last function to
get the last element.

For example:

RDD.take(RDD.count()).last


On Thu, Apr 24, 2014 at 10:28 AM, Sai Prasanna wrote:

> Adnan, but RDD.take(RDD.count()) returns all the elements of the RDD.
>
> I want only to access the last element.
>
>
> On Thu, Apr 24, 2014 at 10:33 AM, Sai Prasanna wrote:
>
>> Oh ya, Thanks Adnan.
>>
>>
>> On Thu, Apr 24, 2014 at 10:30 AM, Adnan Yaqoob wrote:
>>
>>> You can use following code:
>>>
>>> RDD.take(RDD.count())
>>>
>>>
>>> On Thu, Apr 24, 2014 at 9:51 AM, Sai Prasanna 
>>> wrote:
>>>
 Hi All, Some help !
 RDD.first or RDD.take(1) gives the first item, is there a straight
 forward way to access the last element in a similar way ?

 I coudnt fine a tail/last method for RDD. !!

>>>
>>>
>>
>


Re: Access Last Element of RDD

2014-04-23 Thread Sai Prasanna
Adnan, but RDD.take(RDD.count()) returns all the elements of the RDD.

I want only to access the last element.


On Thu, Apr 24, 2014 at 10:33 AM, Sai Prasanna wrote:

> Oh ya, Thanks Adnan.
>
>
> On Thu, Apr 24, 2014 at 10:30 AM, Adnan Yaqoob  wrote:
>
>> You can use following code:
>>
>> RDD.take(RDD.count())
>>
>>
>> On Thu, Apr 24, 2014 at 9:51 AM, Sai Prasanna wrote:
>>
>>> Hi All, Some help !
>>> RDD.first or RDD.take(1) gives the first item, is there a straight
>>> forward way to access the last element in a similar way ?
>>>
>>> I coudnt fine a tail/last method for RDD. !!
>>>
>>
>>
>


Re: Access Last Element of RDD

2014-04-23 Thread Sai Prasanna
Oh ya, Thanks Adnan.


On Thu, Apr 24, 2014 at 10:30 AM, Adnan Yaqoob  wrote:

> You can use following code:
>
> RDD.take(RDD.count())
>
>
> On Thu, Apr 24, 2014 at 9:51 AM, Sai Prasanna wrote:
>
>> Hi All, Some help !
>> RDD.first or RDD.take(1) gives the first item, is there a straight
>> forward way to access the last element in a similar way ?
>>
>> I coudnt fine a tail/last method for RDD. !!
>>
>
>


Re: Access Last Element of RDD

2014-04-23 Thread Adnan Yaqoob
You can use following code:

RDD.take(RDD.count())


On Thu, Apr 24, 2014 at 9:51 AM, Sai Prasanna wrote:

> Hi All, Some help !
> RDD.first or RDD.take(1) gives the first item, is there a straight forward
> way to access the last element in a similar way ?
>
> I coudnt fine a tail/last method for RDD. !!
>


Access Last Element of RDD

2014-04-23 Thread Sai Prasanna
Hi All, Some help !
RDD.first or RDD.take(1) gives the first item, is there a straight forward
way to access the last element in a similar way ?

I coudnt fine a tail/last method for RDD. !!


Re: how to set spark.executor.memory and heap size

2014-04-23 Thread Adnan Yaqoob
When I was testing spark, I faced this issue, this issue is not related to
memory shortage, It is because your configurations are not correct. Try to
pass you current Jar to to the SparkContext with SparkConf's setJars
function and try again.

On Thu, Apr 24, 2014 at 8:38 AM, wxhsdp  wrote:

> by the way, codes run ok in spark shell
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/how-to-set-spark-executor-memory-and-heap-size-tp4719p4720.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: how to set spark.executor.memory and heap size

2014-04-23 Thread wxhsdp
by the way, codes run ok in spark shell



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-set-spark-executor-memory-and-heap-size-tp4719p4720.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


how to set spark.executor.memory and heap size

2014-04-23 Thread wxhsdp
hi
i'am testing SimpleApp.scala in standalone mode with only one pc, so i have
one master and one local worker on the same pc

with rather small input file size(4.5K), i have got the
java.lang.OutOfMemoryError: Java heap space error

here's my settings:
spark-env.sh:
export SPARK_MASTER_IP="127.0.0.1"
export SPARK_WORKER_CORES=1
export SPARK_WORKER_MEMORY=2g
export SPARK_JAVA_OPTS+=" -Xms512m -Xmx512m " //(1)

SimpleApp.scala:
val conf = new SparkConf()
  .setMaster("spark://127.0.0.1:7077")
  .setAppName("Simple App")
  .set("spark.executor.memory", "1g")  //(2)
val sc = new SparkContext(conf)

sbt:
SBT_OPTS="-Xms512M -Xmx512M" //(3)
java $SBT_OPTS -jar `dirname $0`/sbt-launch.jar "$@"

i'am confused with the above (1)(2)(3) settings, and tried several different
options, but all failed
with java.lang.OutOfMemoryError:(

what's the difference between JVM heap size and spark.executor.memory and
how to set them?

i've read some docs and still cannot fully understand

spark.executor.memory: Amount of memory to use per executor process, in the
same format as JVM memory strings (e.g. 512m, 2g).

spark.storage.memoryFraction: Fraction of Java heap to use for Spark's
memory cache.

spark.storage.memoryFraction = 0.6 * spark.executor.memory

is that mean spark.executor.memory = JVM heap size?

here's the logs:
[info] Running SimpleApp 
14/04/24 10:59:41 WARN util.Utils: Your hostname, ubuntu resolves to a
loopback address: 127.0.1.1; using 192.168.0.113 instead (on interface eth0)
14/04/24 10:59:41 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to
another address
14/04/24 10:59:42 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/04/24 10:59:42 INFO Remoting: Starting remoting
14/04/24 10:59:42 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://spark@ubuntu.local:46864]
14/04/24 10:59:42 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://spark@ubuntu.local:46864]
14/04/24 10:59:42 INFO spark.SparkEnv: Registering BlockManagerMaster
14/04/24 10:59:42 INFO storage.DiskBlockManager: Created local directory at
/tmp/spark-local-20140424105942-362c
14/04/24 10:59:42 INFO storage.MemoryStore: MemoryStore started with
capacity 297.0 MB.
14/04/24 10:59:42 INFO network.ConnectionManager: Bound socket to port 34146
with id = ConnectionManagerId(ubuntu.local,34146)
14/04/24 10:59:42 INFO storage.BlockManagerMaster: Trying to register
BlockManager
14/04/24 10:59:42 INFO storage.BlockManagerMasterActor$BlockManagerInfo:
Registering block manager ubuntu.local:34146 with 297.0 MB RAM
14/04/24 10:59:42 INFO storage.BlockManagerMaster: Registered BlockManager
14/04/24 10:59:43 INFO spark.HttpServer: Starting HTTP Server
14/04/24 10:59:43 INFO server.Server: jetty-7.6.8.v20121106
14/04/24 10:59:43 INFO server.AbstractConnector: Started
SocketConnector@0.0.0.0:58936
14/04/24 10:59:43 INFO broadcast.HttpBroadcast: Broadcast server started at
http://192.168.0.113:58936
14/04/24 10:59:43 INFO spark.SparkEnv: Registering MapOutputTracker
14/04/24 10:59:43 INFO spark.HttpFileServer: HTTP File server directory is
/tmp/spark-ce78fc2c-097d-4053-991d-b6bf140d6c33
14/04/24 10:59:43 INFO spark.HttpServer: Starting HTTP Server
14/04/24 10:59:43 INFO server.Server: jetty-7.6.8.v20121106
14/04/24 10:59:43 INFO server.AbstractConnector: Started
SocketConnector@0.0.0.0:56414
14/04/24 10:59:43 INFO server.Server: jetty-7.6.8.v20121106
14/04/24 10:59:43 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/storage/rdd,null}
14/04/24 10:59:43 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/storage,null}
14/04/24 10:59:43 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/stages/stage,null}
14/04/24 10:59:43 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/stages/pool,null}
14/04/24 10:59:43 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/stages,null}
14/04/24 10:59:43 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/environment,null}
14/04/24 10:59:43 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/executors,null}
14/04/24 10:59:43 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/metrics/json,null}
14/04/24 10:59:43 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/static,null}
14/04/24 10:59:43 INFO handler.ContextHandler: started
o.e.j.s.h.ContextHandler{/,null}
14/04/24 10:59:43 INFO server.AbstractConnector: Started
SelectChannelConnector@0.0.0.0:4040
14/04/24 10:59:43 INFO ui.SparkUI: Started Spark Web UI at
http://ubuntu.local:4040
14/04/24 10:59:43 INFO client.AppClient$ClientActor: Connecting to master
spark://127.0.0.1:7077...
14/04/24 10:59:44 INFO cluster.SparkDeploySchedulerBackend: Connected to
Spark cluster with app ID app-20140424105944-0001
14/04/24 10:59:44 INFO client.AppClient$ClientActor: Executor added:
app-20140424105944-0001/0 on worker-20140424105022-ubuntu.local-40058
(ubuntu.local:40058) with 1 cores
14/04/24 10:59:44 INFO cluster.SparkDeploySchedulerBac

Re: about rdd.filter()

2014-04-23 Thread randylu
@Cheng Lian-2, Sourav Chandra,  thanks very much.
   You are right! The situation just like what you say. so nice !



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/about-rdd-filter-tp4657p4718.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: about rdd.filter()

2014-04-23 Thread randylu

14/04/23 17:17:40 INFO DAGScheduler: Failed to run collect at
SparkListDocByTopic.scala:407
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:40)
at
org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: org.apache.spark.SparkException: Job aborted due to stage
failure: Task not serializable: java.io.NotSerializableExceptio
n: SparkListDocByTopic$EnvParameter
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler
.scala:1013)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler
.scala:1002)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler
.scala:1000)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1000)
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:77
2)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scal
a:892)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$1.apply(DAGScheduler.scala:889)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16$$anonfun$apply$1.apply(DAGScheduler.scala:889)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:889)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$16.apply(DAGScheduler.scala:888)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:888)
at
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:592)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:143)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/about-rdd-filter-tp4657p4717.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: GraphX: Help understanding the limitations of Pregel

2014-04-23 Thread Ankur Dave
If you need access to all message values in vprog, there's nothing wrong
with building up an array in mergeMsg (option #1). This is what
org.apache.spark.graphx.lib.TriangleCount does, though with sets instead of
arrays. There will be a performance penalty because of the communication,
but it sounds like that's unavoidable here.

Ankur 

On Wed, Apr 23, 2014 at 8:20 PM, Ryan Compton  wrote:

> 1. a hacky mergeMsg (i.e. combine a,b -> Array(a,b) and then do the
> median in vprog)
>


Re: GraphX: Help understanding the limitations of Pregel

2014-04-23 Thread Ryan Compton
Whoops, I should have mentioned that it's a multivariate median (cf
http://www.pnas.org/content/97/4/1423.full.pdf ). It's easy to compute
when all the values are accessible at once. I'm not sure it's possible
with a combiner. So, I guess the question should be: "Can I use
GraphX's Pregel without a combiner?"

On Wed, Apr 23, 2014 at 7:01 PM, Tom Vacek  wrote:
> Here are some out-of-the-box ideas:  If the elements lie in a fairly small
> range and/or you're willing to work with limited precision, you could use
> counting sort.  Moreover, you could iteratively find the median using
> bisection, which would be associative and commutative.  It's easy to think
> of improvements that would make this approach give a reasonable answer in a
> few iterations.  I have no idea about mixing algorithmic iterations with
> median-finding iterations.
>
>
> On Wed, Apr 23, 2014 at 8:20 PM, Ryan Compton 
> wrote:
>>
>> I'm trying shoehorn a label propagation-ish algorithm into GraphX. I
>> need to update each vertex with the median value of their neighbors.
>> Unlike PageRank, which updates each vertex with the mean of their
>> neighbors, I don't have a simple commutative and associative function
>> to use for mergeMsg.
>>
>> What are my options? It looks like I can choose between:
>>
>> 1. a hacky mergeMsg (i.e. combine a,b -> Array(a,b) and then do the
>> median in vprog)
>> 2. collectNeighbors and then median
>> 3. ignore GraphX and just do the whole thing with joins (which I
>> actually got working, but its slow)
>>
>> Is there another possibility that I'm missing?
>
>


Re: GraphX: Help understanding the limitations of Pregel

2014-04-23 Thread Tom Vacek
Here are some out-of-the-box ideas:  If the elements lie in a fairly small
range and/or you're willing to work with limited precision, you could use
counting sort.  Moreover, you could iteratively find the median using
bisection, which would be associative and commutative.  It's easy to think
of improvements that would make this approach give a reasonable answer in a
few iterations.  I have no idea about mixing algorithmic iterations with
median-finding iterations.


On Wed, Apr 23, 2014 at 8:20 PM, Ryan Compton wrote:

> I'm trying shoehorn a label propagation-ish algorithm into GraphX. I
> need to update each vertex with the median value of their neighbors.
> Unlike PageRank, which updates each vertex with the mean of their
> neighbors, I don't have a simple commutative and associative function
> to use for mergeMsg.
>
> What are my options? It looks like I can choose between:
>
> 1. a hacky mergeMsg (i.e. combine a,b -> Array(a,b) and then do the
> median in vprog)
> 2. collectNeighbors and then median
> 3. ignore GraphX and just do the whole thing with joins (which I
> actually got working, but its slow)
>
> Is there another possibility that I'm missing?
>


GraphX: Help understanding the limitations of Pregel

2014-04-23 Thread Ryan Compton
I'm trying shoehorn a label propagation-ish algorithm into GraphX. I
need to update each vertex with the median value of their neighbors.
Unlike PageRank, which updates each vertex with the mean of their
neighbors, I don't have a simple commutative and associative function
to use for mergeMsg.

What are my options? It looks like I can choose between:

1. a hacky mergeMsg (i.e. combine a,b -> Array(a,b) and then do the
median in vprog)
2. collectNeighbors and then median
3. ignore GraphX and just do the whole thing with joins (which I
actually got working, but its slow)

Is there another possibility that I'm missing?


RE:

2014-04-23 Thread Buttler, David
This sounds like a configuration issue.  Either you have not set the MASTER 
correctly, or possibly another process is using up all of the cores
Dave

From: ge ko [mailto:koenig@gmail.com]
Sent: Sunday, April 13, 2014 12:51 PM
To: user@spark.apache.org
Subject:


Hi,

I'm still going to start working with Spark and installed the parcels in our 
CDH5 GA cluster.



Master: hadoop-pg-5.cluster, Worker: hadoop-pg-7.cluster

Like some advices told me to use FQDN, the settings above sound reasonable for 
me .



Both daemons are running, Master-Web-UI shows the connected worker, and the log 
entries show:

master:

2014-04-13 21:26:40,641 INFO Remoting: Starting remoting
2014-04-13 21:26:40,930 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://sparkMaster@hadoop-pg-5.cluster:7077]
2014-04-13 21:26:41,356 INFO org.apache.spark.deploy.master.Master: Starting 
Spark master at spark://hadoop-pg-5.cluster:7077
...

2014-04-13 21:26:41,439 INFO org.eclipse.jetty.server.AbstractConnector: 
Started 
SelectChannelConnector@0.0.0.0:18080
2014-04-13 21:26:41,441 INFO org.apache.spark.deploy.master.ui.MasterWebUI: 
Started Master web UI at http://hadoop-pg-5.cluster:18080
2014-04-13 21:26:41,476 INFO org.apache.spark.deploy.master.Master: I have been 
elected leader! New state: ALIVE

2014-04-13 21:27:40,319 INFO org.apache.spark.deploy.master.Master: Registering 
worker hadoop-pg-5.cluster:7078 with 2 cores, 64.0 MB RAM



worker:

2014-04-13 21:27:39,037 INFO akka.event.slf4j.Slf4jLogger: Slf4jLogger started
2014-04-13 21:27:39,136 INFO Remoting: Starting remoting
2014-04-13 21:27:39,413 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://sparkWorker@hadoop-pg-7.cluster:7078]
2014-04-13 21:27:39,706 INFO org.apache.spark.deploy.worker.Worker: Starting 
Spark worker hadoop-pg-7.cluster:7078 with 2 cores, 64.0 MB RAM
2014-04-13 21:27:39,708 INFO org.apache.spark.deploy.worker.Worker: Spark home: 
/opt/cloudera/parcels/CDH-5.0.0-1.cdh5.0.0.p0.47/lib/spark
...

2014-04-13 21:27:39,888 INFO org.eclipse.jetty.server.AbstractConnector: 
Started 
SelectChannelConnector@0.0.0.0:18081
2014-04-13 21:27:39,889 INFO org.apache.spark.deploy.worker.ui.WorkerWebUI: 
Started Worker web UI at http://hadoop-pg-7.cluster:18081
2014-04-13 21:27:39,890 INFO org.apache.spark.deploy.worker.Worker: Connecting 
to master spark://hadoop-pg-5.cluster:7077...
2014-04-13 21:27:40,360 INFO org.apache.spark.deploy.worker.Worker: 
Successfully registered with master spark://hadoop-pg-5.cluster:7077



Looks good, so far.



Now I want to execute the python pi example by executing (on the worker):

cd /opt/cloudera/parcels/CDH/lib/spark && ./bin/pyspark ./python/examples/pi.py 
spark://hadoop-pg-5.cluster:7077



Here the strange thing happens, the script doesn't get executed, it hangs 
(repeating this output forever) at :



14/04/13 21:31:03 WARN TaskSchedulerImpl: Initial job has not accepted any 
resources; check your cluster UI to ensure that workers are registered and have 
sufficient memory
14/04/13 21:31:18 WARN TaskSchedulerImpl: Initial job has not accepted any 
resources; check your cluster UI to ensure that workers are registered and have 
sufficient memory



The whole log is:





14/04/13 21:30:44 INFO Slf4jLogger: Slf4jLogger started
14/04/13 21:30:45 INFO Remoting: Starting remoting
14/04/13 21:30:45 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://spark@hadoop-pg-7.cluster:50601]
14/04/13 21:30:45 INFO Remoting: Remoting now listens on addresses: 
[akka.tcp://spark@hadoop-pg-7.cluster:50601]
14/04/13 21:30:45 INFO SparkEnv: Registering BlockManagerMaster
14/04/13 21:30:45 INFO DiskBlockManager: Created local directory at 
/tmp/spark-local-20140413213045-acec
14/04/13 21:30:45 INFO MemoryStore: MemoryStore started with capacity 294.9 MB.
14/04/13 21:30:45 INFO ConnectionManager: Bound socket to port 57506 with id = 
ConnectionManagerId(hadoop-pg-7.cluster,57506)
14/04/13 21:30:45 INFO BlockManagerMaster: Trying to register BlockManager
14/04/13 21:30:45 INFO BlockManagerMasterActor$BlockManagerInfo: Registering 
block manager hadoop-pg-7.cluster:57506 with 294.9 MB RAM
14/04/13 21:30:45 INFO BlockManagerMaster: Registered BlockManager
14/04/13 21:30:45 INFO HttpServer: Starting HTTP Server
14/04/13 21:30:45 INFO HttpBroadcast: Broadcast server started at 
http://10.147.210.7:51224
14/04/13 21:30:45 INFO SparkEnv: Registering MapOutputTracker
14/04/13 21:30:45 INFO HttpFileServer: HTTP File server directory is 
/tmp/spark-f9ab98c8-2adf-460a-9099-6dc07c7dc89f
14/04/13 21:30:45 INFO HttpServer: Starting HTTP Server
14/04/13 21:30:46 INFO SparkUI: Started Spark Web UI at 
http://hadoop-pg-7.cluster:4040
14/04/13 21:30:46 INFO AppClient$ClientActor: Connecting to master 
spark://hadoop-pg-5.cluster:7077...
14/04/13 21:30:47 INFO SparkDeploySchedulerBackend: Connected to Spark cluster 
with ap

Re: Failed to run count?

2014-04-23 Thread Xiangrui Meng
Which spark version are you using? Could you also include the worker
logs? -Xiangrui

On Wed, Apr 23, 2014 at 3:19 PM, Ian Ferreira  wrote:
> I am getting this cryptic  error running LinearRegressionwithSGD
>
> Data sample
> LabeledPoint(39.0, [144.0, 1521.0, 20736.0, 59319.0, 2985984.0])
>
> 14/04/23 15:15:34 INFO SparkContext: Starting job: first at
> GeneralizedLinearAlgorithm.scala:121
> 14/04/23 15:15:34 INFO DAGScheduler: Got job 2 (first at
> GeneralizedLinearAlgorithm.scala:121) with 1 output partitions
> (allowLocal=true)
> 14/04/23 15:15:34 INFO DAGScheduler: Final stage: Stage 2 (first at
> GeneralizedLinearAlgorithm.scala:121)
> 14/04/23 15:15:34 INFO DAGScheduler: Parents of final stage: List()
> 14/04/23 15:15:34 INFO DAGScheduler: Missing parents: List()
> 14/04/23 15:15:34 INFO DAGScheduler: Computing the requested partition
> locally
> 14/04/23 15:15:34 INFO HadoopRDD: Input split:
> file:/Users/iferreira/data/test.csv:0+104
> 14/04/23 15:15:34 INFO SparkContext: Job finished: first at
> GeneralizedLinearAlgorithm.scala:121, took 0.030158 s
> 14/04/23 15:15:34 INFO SparkContext: Starting job: count at
> GradientDescent.scala:137
> 14/04/23 15:15:34 INFO DAGScheduler: Got job 3 (count at
> GradientDescent.scala:137) with 2 output partitions (allowLocal=false)
> 14/04/23 15:15:34 INFO DAGScheduler: Final stage: Stage 3 (count at
> GradientDescent.scala:137)
> 14/04/23 15:15:34 INFO DAGScheduler: Parents of final stage: List()
> 14/04/23 15:15:34 INFO DAGScheduler: Missing parents: List()
> 14/04/23 15:15:34 INFO DAGScheduler: Submitting Stage 3 (MappedRDD[7] at map
> at GeneralizedLinearAlgorithm.scala:139), which has no missing parents
> 14/04/23 15:15:35 INFO DAGScheduler: Failed to run count at
> GradientDescent.scala:137
>
> Any clues what may trigger this error, overflow?
>
>


GraphX, Kryo and BoundedPriorityQueue?

2014-04-23 Thread Ryan Compton
For me, PageRank fails when I use Kryo (works fine if I don't). I
found the same problem reported here:
https://groups.google.com/forum/#!topic/spark-users/unngi3JdRk8 .

Has this been resolved?

I'm not launching code from spark-shell. I tried registering
GraphKryoRegistrator (instead of my own classes), I still get the
below error:

2014-04-23 15:17:35 ERROR OneForOneStrategy:66 -
scala.collection.immutable.$colon$colon cannot be cast to
org.apache.spark.util.BoundedPriorityQueue
java.lang.ClassCastException: scala.collection.immutable.$colon$colon
cannot be cast to org.apache.spark.util.BoundedPriorityQueue
at org.apache.spark.rdd.RDD$$anonfun$top$2.apply(RDD.scala:879)
at org.apache.spark.rdd.RDD$$anonfun$6.apply(RDD.scala:677)
at org.apache.spark.rdd.RDD$$anonfun$6.apply(RDD.scala:674)
at org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:56)
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:846)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:601)
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


Failed to run count?

2014-04-23 Thread Ian Ferreira
I am getting this cryptic  error running LinearRegressionwithSGD

Data sample
LabeledPoint(39.0, [144.0, 1521.0, 20736.0, 59319.0, 2985984.0])

14/04/23 15:15:34 INFO SparkContext: Starting job: first at
GeneralizedLinearAlgorithm.scala:121
14/04/23 15:15:34 INFO DAGScheduler: Got job 2 (first at
GeneralizedLinearAlgorithm.scala:121) with 1 output partitions
(allowLocal=true)
14/04/23 15:15:34 INFO DAGScheduler: Final stage: Stage 2 (first at
GeneralizedLinearAlgorithm.scala:121)
14/04/23 15:15:34 INFO DAGScheduler: Parents of final stage: List()
14/04/23 15:15:34 INFO DAGScheduler: Missing parents: List()
14/04/23 15:15:34 INFO DAGScheduler: Computing the requested partition
locally
14/04/23 15:15:34 INFO HadoopRDD: Input split:
file:/Users/iferreira/data/test.csv:0+104
14/04/23 15:15:34 INFO SparkContext: Job finished: first at
GeneralizedLinearAlgorithm.scala:121, took 0.030158 s
14/04/23 15:15:34 INFO SparkContext: Starting job: count at
GradientDescent.scala:137
14/04/23 15:15:34 INFO DAGScheduler: Got job 3 (count at
GradientDescent.scala:137) with 2 output partitions (allowLocal=false)
14/04/23 15:15:34 INFO DAGScheduler: Final stage: Stage 3 (count at
GradientDescent.scala:137)
14/04/23 15:15:34 INFO DAGScheduler: Parents of final stage: List()
14/04/23 15:15:34 INFO DAGScheduler: Missing parents: List()
14/04/23 15:15:34 INFO DAGScheduler: Submitting Stage 3 (MappedRDD[7] at map
at GeneralizedLinearAlgorithm.scala:139), which has no missing parents
14/04/23 15:15:35 INFO DAGScheduler: Failed to run count at
GradientDescent.scala:137

Any clues what may trigger this error, overflow?






Re: AmpCamp exercise in a local environment

2014-04-23 Thread Nabeel Memon
Thanks a lot Arpit. It's really helpful.


On Fri, Apr 18, 2014 at 4:24 AM, Arpit Tak wrote:

> Download Cloudera VM from here.
>
>
> https://drive.google.com/file/d/0B7zn-Mmft-XcdTZPLXltUjJyeUE/edit?usp=sharing
>
> Regards,
> Arpit Tak
>
>
> On Fri, Apr 18, 2014 at 1:20 PM, Arpit Tak wrote:
>
>> HI Nabeel,
>>
>> I have a cloudera VM , It has both spark and shark installed in it.
>> You can download and play around with it . i also have some sample data in
>> hdfs and some table .
>>
>> You can try out those examples. How to use it ..(instructions are in
>> docs...).
>>
>>
>> https://drive.google.com/file/d/0B0Q4Le4DZj5iSndIcFBfQlcxM1NlV3RNN3YzU1dOT1ZjZHJJ/edit?usp=sharing
>>
>> But for AmpCamp-exercises , you need ec2 only to get wikidata on your
>> hdfs. For that I have uploaded file(50Mb) . Just download it and put on
>> hdfs .. and you can work around these exercises...
>>
>>
>> https://drive.google.com/a/mobipulse.in/uc?id=0B0Q4Le4DZj5iNUdSZXpFTUJEU0E&export=download
>>
>> You will love it...
>>
>> Regards,
>> Arpit Tak
>>
>>
>> On Tue, Apr 15, 2014 at 4:28 AM, Nabeel Memon  wrote:
>>
>>> Hi. I found AmpCamp exercises as a nice way to get started with spark.
>>> However they require amazon ec2 access. Has anyone put together any VM or
>>> docker scripts to have the same environment locally to work out those labs?
>>>
>>> It'll be really helpful. Thanks.
>>>
>>
>>
>


Re: Pig on Spark

2014-04-23 Thread Mayur Rustagi
Right now UDF is not working. Its in the top list though. You should be
able to soon :)
Are thr any other functionality of pig you use often apart from the usual
suspects??

Existing Java MR jobs would be a easier move. are these cascading jobs or
single map reduce jobs. If single then you should be able to,  write a
scala wrapper code code to call map & reduce functions with some magic &
let your core code be. Would be interesting to see an actual example & get
it to work.

Regards
Mayur


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Thu, Apr 24, 2014 at 2:46 AM, suman bharadwaj wrote:

> We currently are in the process of converting PIG and Java map reduce jobs
> to SPARK jobs. And we have written couple of PIG UDFs as well. Hence was
> checking if we can leverage SPORK without converting to SPARK jobs.
>
> And is there any way I can port my existing Java MR jobs to SPARK ?
> I know this thread has a different subject, let me know if need to ask
> this question in separate thread.
>
> Thanks in advance.
>
>
> On Thu, Apr 24, 2014 at 2:13 AM, Mayur Rustagi wrote:
>
>> UDF
>> Generate
>> & many many more are not working :)
>>
>> Several of them work. Joins, filters, group by etc.
>> I am translating the ones we need, would be happy to get help on others.
>> Will host a jira to track them if you are intersted.
>>
>>
>> Mayur Rustagi
>> Ph: +1 (760) 203 3257
>> http://www.sigmoidanalytics.com
>> @mayur_rustagi 
>>
>>
>>
>> On Thu, Apr 24, 2014 at 2:10 AM, suman bharadwaj wrote:
>>
>>> Are all the features available in PIG working in SPORK ?? Like for eg:
>>> UDFs ?
>>>
>>> Thanks.
>>>
>>>
>>> On Thu, Apr 24, 2014 at 1:54 AM, Mayur Rustagi 
>>> wrote:
>>>
 Thr are two benefits I get as of now
 1. Most of the time a lot of customers dont want the full power but
 they want something dead simple with which they can do dsl. They end up
 using Hive for a lot of ETL just cause its SQL & they understand it. Pig is
 close & wraps up a lot of framework level semantics away from the user &
 lets him focus on data flow
 2. Some have codebases in Pig already & are just looking to do it
 faster. I am yet to benchmark that on Pig on spark.

 I agree that pig on spark cannot solve a lot problems but it can solve
 some without forcing the end customer to do anything even close to coding,
 I believe thr is quite some value in making Spark accessible to larger
 group of audience.
 End of the day to each his own :)

 Regards
 Mayur


 Mayur Rustagi
 Ph: +1 (760) 203 3257
 http://www.sigmoidanalytics.com
 @mayur_rustagi 



 On Thu, Apr 24, 2014 at 1:24 AM, Bharath Mundlapudi <
 mundlap...@gmail.com> wrote:

> This seems like an interesting question.
>
> I love Apache Pig. It is so natural and the language flows with nice
> syntax.
>
> While I was at Yahoo! in core Hadoop Engineering, I have used Pig a
> lot for analytics and provided feedback to Pig Team to do much more
> functionality when it was at version 0.7. Lots of new functionality got
> offered now
> .
> End of the day, Pig is a DSL for data flows. There will be always gaps
> and enhancements. I was often thought is DSL right way to solve data flow
> problems? May be not, we need complete language construct. We may have
> found the answer - Scala. With Scala's dynamic compilation, we can write
> much power constructs than any DSL can provide.
>
> If I am a new organization and beginning to choose, I would go with
> Scala.
>
> Here is the example:
>
> #!/bin/sh
> exec scala "$0" "$@"
> !#
> YOUR DSL GOES HERE BUT IN SCALA!
>
> You have DSL like scripting, functional and complete language power!
> If we can improve first 3 lines, here you go, you have most powerful DSL 
> to
> solve data problems.
>
> -Bharath
>
>
>
>
>
> On Mon, Mar 10, 2014 at 11:00 PM, Xiangrui Meng wrote:
>
>> Hi Sameer,
>>
>> Lin (cc'ed) could also give you some updates about Pig on Spark
>> development on her side.
>>
>> Best,
>> Xiangrui
>>
>> On Mon, Mar 10, 2014 at 12:52 PM, Sameer Tilak 
>> wrote:
>> > Hi Mayur,
>> > We are planning to upgrade our distribution MR1> MR2 (YARN) and the
>> goal is
>> > to get SPROK set up next month. I will keep you posted. Can you
>> please keep
>> > me informed about your progress as well.
>> >
>> > 
>> > From: mayur.rust...@gmail.com
>> > Date: Mon, 10 Mar 2014 11:47:56 -0700
>> >
>> > Subject: Re: Pig on Spark
>> > To: user@spark.apache.org
>> >
>> >
>> > Hi Sameer,
>> > Did you make any progress on this. My t

Re: ERROR TaskSchedulerImpl: Lost an executor

2014-04-23 Thread Parviz Deyhim
it means you're out of disk space. Check to see if you have enough free
disk space left your node(s).


On Wed, Apr 23, 2014 at 2:08 PM, jaeholee  wrote:

> After doing that, I ran my code once with a smaller example, and it worked.
> But ever since then, I get the "No space left on device" message for the
> same sample, even if I re-start the master...
>
> ERROR TaskSetManager: Task 29.0:20 failed 4 times; aborting job
> org.apache.spark.SparkException: Job aborted: Task 29.0:20 failed 4 times
> (most recent failure: Exception failure: java.io.IOException: No space left
> on device)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
> at
>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
> at scala.Option.foreach(Option.scala:236)
> at
>
> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at
>
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
>
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
>
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/ERROR-TaskSchedulerImpl-Lost-an-executor-tp4566p4699.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: Pig on Spark

2014-04-23 Thread suman bharadwaj
We currently are in the process of converting PIG and Java map reduce jobs
to SPARK jobs. And we have written couple of PIG UDFs as well. Hence was
checking if we can leverage SPORK without converting to SPARK jobs.

And is there any way I can port my existing Java MR jobs to SPARK ?
I know this thread has a different subject, let me know if need to ask this
question in separate thread.

Thanks in advance.


On Thu, Apr 24, 2014 at 2:13 AM, Mayur Rustagi wrote:

> UDF
> Generate
> & many many more are not working :)
>
> Several of them work. Joins, filters, group by etc.
> I am translating the ones we need, would be happy to get help on others.
> Will host a jira to track them if you are intersted.
>
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi 
>
>
>
> On Thu, Apr 24, 2014 at 2:10 AM, suman bharadwaj wrote:
>
>> Are all the features available in PIG working in SPORK ?? Like for eg:
>> UDFs ?
>>
>> Thanks.
>>
>>
>> On Thu, Apr 24, 2014 at 1:54 AM, Mayur Rustagi 
>> wrote:
>>
>>> Thr are two benefits I get as of now
>>> 1. Most of the time a lot of customers dont want the full power but they
>>> want something dead simple with which they can do dsl. They end up using
>>> Hive for a lot of ETL just cause its SQL & they understand it. Pig is close
>>> & wraps up a lot of framework level semantics away from the user & lets him
>>> focus on data flow
>>> 2. Some have codebases in Pig already & are just looking to do it
>>> faster. I am yet to benchmark that on Pig on spark.
>>>
>>> I agree that pig on spark cannot solve a lot problems but it can solve
>>> some without forcing the end customer to do anything even close to coding,
>>> I believe thr is quite some value in making Spark accessible to larger
>>> group of audience.
>>> End of the day to each his own :)
>>>
>>> Regards
>>> Mayur
>>>
>>>
>>> Mayur Rustagi
>>> Ph: +1 (760) 203 3257
>>> http://www.sigmoidanalytics.com
>>> @mayur_rustagi 
>>>
>>>
>>>
>>> On Thu, Apr 24, 2014 at 1:24 AM, Bharath Mundlapudi <
>>> mundlap...@gmail.com> wrote:
>>>
 This seems like an interesting question.

 I love Apache Pig. It is so natural and the language flows with nice
 syntax.

 While I was at Yahoo! in core Hadoop Engineering, I have used Pig a lot
 for analytics and provided feedback to Pig Team to do much more
 functionality when it was at version 0.7. Lots of new functionality got
 offered now
 .
 End of the day, Pig is a DSL for data flows. There will be always gaps
 and enhancements. I was often thought is DSL right way to solve data flow
 problems? May be not, we need complete language construct. We may have
 found the answer - Scala. With Scala's dynamic compilation, we can write
 much power constructs than any DSL can provide.

 If I am a new organization and beginning to choose, I would go with
 Scala.

 Here is the example:

 #!/bin/sh
 exec scala "$0" "$@"
 !#
 YOUR DSL GOES HERE BUT IN SCALA!

 You have DSL like scripting, functional and complete language power! If
 we can improve first 3 lines, here you go, you have most powerful DSL to
 solve data problems.

 -Bharath





 On Mon, Mar 10, 2014 at 11:00 PM, Xiangrui Meng wrote:

> Hi Sameer,
>
> Lin (cc'ed) could also give you some updates about Pig on Spark
> development on her side.
>
> Best,
> Xiangrui
>
> On Mon, Mar 10, 2014 at 12:52 PM, Sameer Tilak 
> wrote:
> > Hi Mayur,
> > We are planning to upgrade our distribution MR1> MR2 (YARN) and the
> goal is
> > to get SPROK set up next month. I will keep you posted. Can you
> please keep
> > me informed about your progress as well.
> >
> > 
> > From: mayur.rust...@gmail.com
> > Date: Mon, 10 Mar 2014 11:47:56 -0700
> >
> > Subject: Re: Pig on Spark
> > To: user@spark.apache.org
> >
> >
> > Hi Sameer,
> > Did you make any progress on this. My team is also trying it out
> would love
> > to know some detail so progress.
> >
> > Mayur Rustagi
> > Ph: +1 (760) 203 3257
> > http://www.sigmoidanalytics.com
> > @mayur_rustagi
> >
> >
> >
> > On Thu, Mar 6, 2014 at 2:20 PM, Sameer Tilak 
> wrote:
> >
> > Hi Aniket,
> > Many thanks! I will check this out.
> >
> > 
> > Date: Thu, 6 Mar 2014 13:46:50 -0800
> > Subject: Re: Pig on Spark
> > From: aniket...@gmail.com
> > To: user@spark.apache.org; tgraves...@yahoo.com
> >
> >
> > There is some work to make this work on yarn at
> > https://github.com/aniket486/pig. (So, compile pig with ant
> > -Dhadoopversion=23)
> >
> > You can look at
> https://github.com/aniket486/pi

Re: ERROR TaskSchedulerImpl: Lost an executor

2014-04-23 Thread jaeholee
So if I am using GraphX on Spark and I created a graph, which gets called a
lot later, do I want to cache graph? Or do I want to cache the vertices and
edges (actual data) that I use to create the graph?

e.g.
val graph = Graph(vertices, edges)

graph.blahblahblah
graph.blahblahblah
graph.blahblahblah


FYI,
I wanted to measured the time it takes to run my algorithm, so once I create
the graph, I force it Spark to read the data in by calling
graph.vertices.count and graph.edges.count since it does the lazy
evalutation.

Then I run the actual algorithm with time measure on. But basically it
doesn't even get to the algorithm portion because it breaks at
graph.edges.count when it reads the data...



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ERROR-TaskSchedulerImpl-Lost-an-executor-tp4566p4701.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: How do I access the SPARK SQL

2014-04-23 Thread Matei Zaharia
It’s currently in the master branch, on https://github.com/apache/spark. You 
can check that out from git, build it with sbt/sbt assembly, and then try it 
out. We’re also going to post some release candidates soon that will be 
pre-built.

Matei

On Apr 23, 2014, at 1:30 PM, diplomatic Guru  wrote:

> Hello Team,
> 
> I'm new to SPARK and just came across SPARK SQL, which appears to be 
> interesting but not sure how I could get it.
> 
> I know it's an Alpha version but not sure if its available for community yet.
> 
> Many thanks.
> 
> Raj.



Re: ERROR TaskSchedulerImpl: Lost an executor

2014-04-23 Thread jaeholee
After doing that, I ran my code once with a smaller example, and it worked.
But ever since then, I get the "No space left on device" message for the
same sample, even if I re-start the master...

ERROR TaskSetManager: Task 29.0:20 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted: Task 29.0:20 failed 4 times
(most recent failure: Exception failure: java.io.IOException: No space left
on device)  
  
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)


at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)


at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)  
   
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)  
   
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)

 
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
 
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
 
at scala.Option.foreach(Option.scala:236)   
  
at
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
   
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)

  
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) 
  
at akka.actor.ActorCell.invoke(ActorCell.scala:456) 
  
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)  
  
at akka.dispatch.Mailbox.run(Mailbox.scala:219) 
  
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
  



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ERROR-TaskSchedulerImpl-Lost-an-executor-tp4566p4699.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Pig on Spark

2014-04-23 Thread Mayur Rustagi
UDF
Generate
& many many more are not working :)

Several of them work. Joins, filters, group by etc.
I am translating the ones we need, would be happy to get help on others.
Will host a jira to track them if you are intersted.


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Thu, Apr 24, 2014 at 2:10 AM, suman bharadwaj wrote:

> Are all the features available in PIG working in SPORK ?? Like for eg:
> UDFs ?
>
> Thanks.
>
>
> On Thu, Apr 24, 2014 at 1:54 AM, Mayur Rustagi wrote:
>
>> Thr are two benefits I get as of now
>> 1. Most of the time a lot of customers dont want the full power but they
>> want something dead simple with which they can do dsl. They end up using
>> Hive for a lot of ETL just cause its SQL & they understand it. Pig is close
>> & wraps up a lot of framework level semantics away from the user & lets him
>> focus on data flow
>> 2. Some have codebases in Pig already & are just looking to do it faster.
>> I am yet to benchmark that on Pig on spark.
>>
>> I agree that pig on spark cannot solve a lot problems but it can solve
>> some without forcing the end customer to do anything even close to coding,
>> I believe thr is quite some value in making Spark accessible to larger
>> group of audience.
>> End of the day to each his own :)
>>
>> Regards
>> Mayur
>>
>>
>> Mayur Rustagi
>> Ph: +1 (760) 203 3257
>> http://www.sigmoidanalytics.com
>> @mayur_rustagi 
>>
>>
>>
>> On Thu, Apr 24, 2014 at 1:24 AM, Bharath Mundlapudi > > wrote:
>>
>>> This seems like an interesting question.
>>>
>>> I love Apache Pig. It is so natural and the language flows with nice
>>> syntax.
>>>
>>> While I was at Yahoo! in core Hadoop Engineering, I have used Pig a lot
>>> for analytics and provided feedback to Pig Team to do much more
>>> functionality when it was at version 0.7. Lots of new functionality got
>>> offered now
>>> .
>>> End of the day, Pig is a DSL for data flows. There will be always gaps
>>> and enhancements. I was often thought is DSL right way to solve data flow
>>> problems? May be not, we need complete language construct. We may have
>>> found the answer - Scala. With Scala's dynamic compilation, we can write
>>> much power constructs than any DSL can provide.
>>>
>>> If I am a new organization and beginning to choose, I would go with
>>> Scala.
>>>
>>> Here is the example:
>>>
>>> #!/bin/sh
>>> exec scala "$0" "$@"
>>> !#
>>> YOUR DSL GOES HERE BUT IN SCALA!
>>>
>>> You have DSL like scripting, functional and complete language power! If
>>> we can improve first 3 lines, here you go, you have most powerful DSL to
>>> solve data problems.
>>>
>>> -Bharath
>>>
>>>
>>>
>>>
>>>
>>> On Mon, Mar 10, 2014 at 11:00 PM, Xiangrui Meng wrote:
>>>
 Hi Sameer,

 Lin (cc'ed) could also give you some updates about Pig on Spark
 development on her side.

 Best,
 Xiangrui

 On Mon, Mar 10, 2014 at 12:52 PM, Sameer Tilak 
 wrote:
 > Hi Mayur,
 > We are planning to upgrade our distribution MR1> MR2 (YARN) and the
 goal is
 > to get SPROK set up next month. I will keep you posted. Can you
 please keep
 > me informed about your progress as well.
 >
 > 
 > From: mayur.rust...@gmail.com
 > Date: Mon, 10 Mar 2014 11:47:56 -0700
 >
 > Subject: Re: Pig on Spark
 > To: user@spark.apache.org
 >
 >
 > Hi Sameer,
 > Did you make any progress on this. My team is also trying it out
 would love
 > to know some detail so progress.
 >
 > Mayur Rustagi
 > Ph: +1 (760) 203 3257
 > http://www.sigmoidanalytics.com
 > @mayur_rustagi
 >
 >
 >
 > On Thu, Mar 6, 2014 at 2:20 PM, Sameer Tilak 
 wrote:
 >
 > Hi Aniket,
 > Many thanks! I will check this out.
 >
 > 
 > Date: Thu, 6 Mar 2014 13:46:50 -0800
 > Subject: Re: Pig on Spark
 > From: aniket...@gmail.com
 > To: user@spark.apache.org; tgraves...@yahoo.com
 >
 >
 > There is some work to make this work on yarn at
 > https://github.com/aniket486/pig. (So, compile pig with ant
 > -Dhadoopversion=23)
 >
 > You can look at https://github.com/aniket486/pig/blob/spork/pig-sparkto
 > find out what sort of env variables you need (sorry, I haven't been
 able to
 > clean this up- in-progress). There are few known issues with this, I
 will
 > work on fixing them soon.
 >
 > Known issues-
 > 1. Limit does not work (spork-fix)
 > 2. Foreach requires to turn off schema-tuple-backend (should be a
 pig-jira)
 > 3. Algebraic udfs dont work (spork-fix in-progress)
 > 4. Group by rework (to avoid OOMs)
 > 5. UDF Classloader issue (requires SPARK-1053, then you can put
 > pig-withouthadoop.jar as SPARK_JARS in SparkContext along with udf
 jars)
 >
 

Re: Pig on Spark

2014-04-23 Thread suman bharadwaj
Are all the features available in PIG working in SPORK ?? Like for eg: UDFs
?

Thanks.


On Thu, Apr 24, 2014 at 1:54 AM, Mayur Rustagi wrote:

> Thr are two benefits I get as of now
> 1. Most of the time a lot of customers dont want the full power but they
> want something dead simple with which they can do dsl. They end up using
> Hive for a lot of ETL just cause its SQL & they understand it. Pig is close
> & wraps up a lot of framework level semantics away from the user & lets him
> focus on data flow
> 2. Some have codebases in Pig already & are just looking to do it faster.
> I am yet to benchmark that on Pig on spark.
>
> I agree that pig on spark cannot solve a lot problems but it can solve
> some without forcing the end customer to do anything even close to coding,
> I believe thr is quite some value in making Spark accessible to larger
> group of audience.
> End of the day to each his own :)
>
> Regards
> Mayur
>
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi 
>
>
>
> On Thu, Apr 24, 2014 at 1:24 AM, Bharath Mundlapudi 
> wrote:
>
>> This seems like an interesting question.
>>
>> I love Apache Pig. It is so natural and the language flows with nice
>> syntax.
>>
>> While I was at Yahoo! in core Hadoop Engineering, I have used Pig a lot
>> for analytics and provided feedback to Pig Team to do much more
>> functionality when it was at version 0.7. Lots of new functionality got
>> offered now
>> .
>> End of the day, Pig is a DSL for data flows. There will be always gaps
>> and enhancements. I was often thought is DSL right way to solve data flow
>> problems? May be not, we need complete language construct. We may have
>> found the answer - Scala. With Scala's dynamic compilation, we can write
>> much power constructs than any DSL can provide.
>>
>> If I am a new organization and beginning to choose, I would go with Scala.
>>
>> Here is the example:
>>
>> #!/bin/sh
>> exec scala "$0" "$@"
>> !#
>> YOUR DSL GOES HERE BUT IN SCALA!
>>
>> You have DSL like scripting, functional and complete language power! If
>> we can improve first 3 lines, here you go, you have most powerful DSL to
>> solve data problems.
>>
>> -Bharath
>>
>>
>>
>>
>>
>> On Mon, Mar 10, 2014 at 11:00 PM, Xiangrui Meng  wrote:
>>
>>> Hi Sameer,
>>>
>>> Lin (cc'ed) could also give you some updates about Pig on Spark
>>> development on her side.
>>>
>>> Best,
>>> Xiangrui
>>>
>>> On Mon, Mar 10, 2014 at 12:52 PM, Sameer Tilak  wrote:
>>> > Hi Mayur,
>>> > We are planning to upgrade our distribution MR1> MR2 (YARN) and the
>>> goal is
>>> > to get SPROK set up next month. I will keep you posted. Can you please
>>> keep
>>> > me informed about your progress as well.
>>> >
>>> > 
>>> > From: mayur.rust...@gmail.com
>>> > Date: Mon, 10 Mar 2014 11:47:56 -0700
>>> >
>>> > Subject: Re: Pig on Spark
>>> > To: user@spark.apache.org
>>> >
>>> >
>>> > Hi Sameer,
>>> > Did you make any progress on this. My team is also trying it out would
>>> love
>>> > to know some detail so progress.
>>> >
>>> > Mayur Rustagi
>>> > Ph: +1 (760) 203 3257
>>> > http://www.sigmoidanalytics.com
>>> > @mayur_rustagi
>>> >
>>> >
>>> >
>>> > On Thu, Mar 6, 2014 at 2:20 PM, Sameer Tilak  wrote:
>>> >
>>> > Hi Aniket,
>>> > Many thanks! I will check this out.
>>> >
>>> > 
>>> > Date: Thu, 6 Mar 2014 13:46:50 -0800
>>> > Subject: Re: Pig on Spark
>>> > From: aniket...@gmail.com
>>> > To: user@spark.apache.org; tgraves...@yahoo.com
>>> >
>>> >
>>> > There is some work to make this work on yarn at
>>> > https://github.com/aniket486/pig. (So, compile pig with ant
>>> > -Dhadoopversion=23)
>>> >
>>> > You can look at https://github.com/aniket486/pig/blob/spork/pig-sparkto
>>> > find out what sort of env variables you need (sorry, I haven't been
>>> able to
>>> > clean this up- in-progress). There are few known issues with this, I
>>> will
>>> > work on fixing them soon.
>>> >
>>> > Known issues-
>>> > 1. Limit does not work (spork-fix)
>>> > 2. Foreach requires to turn off schema-tuple-backend (should be a
>>> pig-jira)
>>> > 3. Algebraic udfs dont work (spork-fix in-progress)
>>> > 4. Group by rework (to avoid OOMs)
>>> > 5. UDF Classloader issue (requires SPARK-1053, then you can put
>>> > pig-withouthadoop.jar as SPARK_JARS in SparkContext along with udf
>>> jars)
>>> >
>>> > ~Aniket
>>> >
>>> >
>>> >
>>> >
>>> > On Thu, Mar 6, 2014 at 1:36 PM, Tom Graves 
>>> wrote:
>>> >
>>> > I had asked a similar question on the dev mailing list a while back
>>> (Jan
>>> > 22nd).
>>> >
>>> > See the archives:
>>> > http://mail-archives.apache.org/mod_mbox/spark-dev/201401.mbox/browser->
>>> > look for spork.
>>> >
>>> > Basically Matei said:
>>> >
>>> > Yup, that was it, though I believe people at Twitter picked it up again
>>> > recently. I'd suggest
>>> > asking Dmitriy if you know him. I've seen interest in this from several
>>> > other groups

RE: default spark partitioner

2014-04-23 Thread Adrian Mocanu
Hi Matei,
I got this behaviour in  a test case. I was testing a moving average operation 
on a DStream.
I create RDDs and add them in a queue from which a DStream is created using a 
time window.

I changed the collection to array now so no conversion is being done on it.

My input collection:
Val rdd1Data=Array("purchaseHonda",   dateToLong(0, 1, 1, 1, 1, 2010), 1)
  , ("purchaseHonda", dateToLong(0, 1, 1, 1, 1, 2010), 1)
  , ("purchaseHonda", dateToLong(0, 1, 1, 1, 1, 2010), 1)
  , ("purchaseFord", dateToLong(0, 1, 1, 1, 1, 2010), 1) )
Val rdd2Data = ...

Then later on I place them into the queue like this:
RddQueue += ssc.sparkContext.makeRDD(rdd1Data)

I traverse the reducedByKey (key is the formed from the first 2 fields and the 
3rd field is a count) dstream and on each RDD I do collect and traverse that 
resulting collection as well and add the numbers to a moving average function. 
(I could just print them instead) The numbers come in the wrong order for all 
RDDs.

-A

From: Matei Zaharia [mailto:matei.zaha...@gmail.com]
Sent: April-23-14 2:58 PM
To: user@spark.apache.org
Subject: Re: default spark partitioner

It should keep them in order, but what kind of collection do you have? Maybe 
toArray changes the order.

Matei

On Apr 23, 2014, at 8:21 AM, Adrian Mocanu 
mailto:amoc...@verticalscope.com>> wrote:


How does the default spark partitioner partition RDD data? Does it keep the 
data in order?

I'm asking because I'm generating an RDD by hand via 
`ssc.sparkContext.makeRDD(collection.toArray)` and I collect and iterate over 
what I collect, but the data is in a different order than in the initial 
collection from which the RDD comes from.

-Adrian



How do I access the SPARK SQL

2014-04-23 Thread diplomatic Guru
Hello Team,

I'm new to SPARK and just came across SPARK SQL, which appears to be
interesting but not sure how I could get it.

I know it's an Alpha version but not sure if its available for community
yet.

Many thanks.

Raj.


Re: Pig on Spark

2014-04-23 Thread Mayur Rustagi
Thr are two benefits I get as of now
1. Most of the time a lot of customers dont want the full power but they
want something dead simple with which they can do dsl. They end up using
Hive for a lot of ETL just cause its SQL & they understand it. Pig is close
& wraps up a lot of framework level semantics away from the user & lets him
focus on data flow
2. Some have codebases in Pig already & are just looking to do it faster. I
am yet to benchmark that on Pig on spark.

I agree that pig on spark cannot solve a lot problems but it can solve some
without forcing the end customer to do anything even close to coding, I
believe thr is quite some value in making Spark accessible to larger group
of audience.
End of the day to each his own :)

Regards
Mayur


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Thu, Apr 24, 2014 at 1:24 AM, Bharath Mundlapudi wrote:

> This seems like an interesting question.
>
> I love Apache Pig. It is so natural and the language flows with nice
> syntax.
>
> While I was at Yahoo! in core Hadoop Engineering, I have used Pig a lot
> for analytics and provided feedback to Pig Team to do much more
> functionality when it was at version 0.7. Lots of new functionality got
> offered now
> .
> End of the day, Pig is a DSL for data flows. There will be always gaps and
> enhancements. I was often thought is DSL right way to solve data flow
> problems? May be not, we need complete language construct. We may have
> found the answer - Scala. With Scala's dynamic compilation, we can write
> much power constructs than any DSL can provide.
>
> If I am a new organization and beginning to choose, I would go with Scala.
>
> Here is the example:
>
> #!/bin/sh
> exec scala "$0" "$@"
> !#
> YOUR DSL GOES HERE BUT IN SCALA!
>
> You have DSL like scripting, functional and complete language power! If we
> can improve first 3 lines, here you go, you have most powerful DSL to solve
> data problems.
>
> -Bharath
>
>
>
>
>
> On Mon, Mar 10, 2014 at 11:00 PM, Xiangrui Meng  wrote:
>
>> Hi Sameer,
>>
>> Lin (cc'ed) could also give you some updates about Pig on Spark
>> development on her side.
>>
>> Best,
>> Xiangrui
>>
>> On Mon, Mar 10, 2014 at 12:52 PM, Sameer Tilak  wrote:
>> > Hi Mayur,
>> > We are planning to upgrade our distribution MR1> MR2 (YARN) and the
>> goal is
>> > to get SPROK set up next month. I will keep you posted. Can you please
>> keep
>> > me informed about your progress as well.
>> >
>> > 
>> > From: mayur.rust...@gmail.com
>> > Date: Mon, 10 Mar 2014 11:47:56 -0700
>> >
>> > Subject: Re: Pig on Spark
>> > To: user@spark.apache.org
>> >
>> >
>> > Hi Sameer,
>> > Did you make any progress on this. My team is also trying it out would
>> love
>> > to know some detail so progress.
>> >
>> > Mayur Rustagi
>> > Ph: +1 (760) 203 3257
>> > http://www.sigmoidanalytics.com
>> > @mayur_rustagi
>> >
>> >
>> >
>> > On Thu, Mar 6, 2014 at 2:20 PM, Sameer Tilak  wrote:
>> >
>> > Hi Aniket,
>> > Many thanks! I will check this out.
>> >
>> > 
>> > Date: Thu, 6 Mar 2014 13:46:50 -0800
>> > Subject: Re: Pig on Spark
>> > From: aniket...@gmail.com
>> > To: user@spark.apache.org; tgraves...@yahoo.com
>> >
>> >
>> > There is some work to make this work on yarn at
>> > https://github.com/aniket486/pig. (So, compile pig with ant
>> > -Dhadoopversion=23)
>> >
>> > You can look at https://github.com/aniket486/pig/blob/spork/pig-sparkto
>> > find out what sort of env variables you need (sorry, I haven't been
>> able to
>> > clean this up- in-progress). There are few known issues with this, I
>> will
>> > work on fixing them soon.
>> >
>> > Known issues-
>> > 1. Limit does not work (spork-fix)
>> > 2. Foreach requires to turn off schema-tuple-backend (should be a
>> pig-jira)
>> > 3. Algebraic udfs dont work (spork-fix in-progress)
>> > 4. Group by rework (to avoid OOMs)
>> > 5. UDF Classloader issue (requires SPARK-1053, then you can put
>> > pig-withouthadoop.jar as SPARK_JARS in SparkContext along with udf jars)
>> >
>> > ~Aniket
>> >
>> >
>> >
>> >
>> > On Thu, Mar 6, 2014 at 1:36 PM, Tom Graves 
>> wrote:
>> >
>> > I had asked a similar question on the dev mailing list a while back (Jan
>> > 22nd).
>> >
>> > See the archives:
>> > http://mail-archives.apache.org/mod_mbox/spark-dev/201401.mbox/browser->
>> > look for spork.
>> >
>> > Basically Matei said:
>> >
>> > Yup, that was it, though I believe people at Twitter picked it up again
>> > recently. I'd suggest
>> > asking Dmitriy if you know him. I've seen interest in this from several
>> > other groups, and
>> > if there's enough of it, maybe we can start another open source repo to
>> > track it. The work
>> > in that repo you pointed to was done over one week, and already had
>> most of
>> > Pig's operators
>> > working. (I helped out with this prototype over Twitter's hack week.)
>> That
>> > work also calls
>> > the Scal

Re: Pig on Spark

2014-04-23 Thread Bharath Mundlapudi
This seems like an interesting question.

I love Apache Pig. It is so natural and the language flows with nice syntax.

While I was at Yahoo! in core Hadoop Engineering, I have used Pig a lot for
analytics and provided feedback to Pig Team to do much more functionality
when it was at version 0.7. Lots of new functionality got offered now
.
End of the day, Pig is a DSL for data flows. There will be always gaps and
enhancements. I was often thought is DSL right way to solve data flow
problems? May be not, we need complete language construct. We may have
found the answer - Scala. With Scala's dynamic compilation, we can write
much power constructs than any DSL can provide.

If I am a new organization and beginning to choose, I would go with Scala.

Here is the example:

#!/bin/sh
exec scala "$0" "$@"
!#
YOUR DSL GOES HERE BUT IN SCALA!

You have DSL like scripting, functional and complete language power! If we
can improve first 3 lines, here you go, you have most powerful DSL to solve
data problems.

-Bharath





On Mon, Mar 10, 2014 at 11:00 PM, Xiangrui Meng  wrote:

> Hi Sameer,
>
> Lin (cc'ed) could also give you some updates about Pig on Spark
> development on her side.
>
> Best,
> Xiangrui
>
> On Mon, Mar 10, 2014 at 12:52 PM, Sameer Tilak  wrote:
> > Hi Mayur,
> > We are planning to upgrade our distribution MR1> MR2 (YARN) and the goal
> is
> > to get SPROK set up next month. I will keep you posted. Can you please
> keep
> > me informed about your progress as well.
> >
> > 
> > From: mayur.rust...@gmail.com
> > Date: Mon, 10 Mar 2014 11:47:56 -0700
> >
> > Subject: Re: Pig on Spark
> > To: user@spark.apache.org
> >
> >
> > Hi Sameer,
> > Did you make any progress on this. My team is also trying it out would
> love
> > to know some detail so progress.
> >
> > Mayur Rustagi
> > Ph: +1 (760) 203 3257
> > http://www.sigmoidanalytics.com
> > @mayur_rustagi
> >
> >
> >
> > On Thu, Mar 6, 2014 at 2:20 PM, Sameer Tilak  wrote:
> >
> > Hi Aniket,
> > Many thanks! I will check this out.
> >
> > 
> > Date: Thu, 6 Mar 2014 13:46:50 -0800
> > Subject: Re: Pig on Spark
> > From: aniket...@gmail.com
> > To: user@spark.apache.org; tgraves...@yahoo.com
> >
> >
> > There is some work to make this work on yarn at
> > https://github.com/aniket486/pig. (So, compile pig with ant
> > -Dhadoopversion=23)
> >
> > You can look at https://github.com/aniket486/pig/blob/spork/pig-spark to
> > find out what sort of env variables you need (sorry, I haven't been able
> to
> > clean this up- in-progress). There are few known issues with this, I will
> > work on fixing them soon.
> >
> > Known issues-
> > 1. Limit does not work (spork-fix)
> > 2. Foreach requires to turn off schema-tuple-backend (should be a
> pig-jira)
> > 3. Algebraic udfs dont work (spork-fix in-progress)
> > 4. Group by rework (to avoid OOMs)
> > 5. UDF Classloader issue (requires SPARK-1053, then you can put
> > pig-withouthadoop.jar as SPARK_JARS in SparkContext along with udf jars)
> >
> > ~Aniket
> >
> >
> >
> >
> > On Thu, Mar 6, 2014 at 1:36 PM, Tom Graves  wrote:
> >
> > I had asked a similar question on the dev mailing list a while back (Jan
> > 22nd).
> >
> > See the archives:
> > http://mail-archives.apache.org/mod_mbox/spark-dev/201401.mbox/browser->
> > look for spork.
> >
> > Basically Matei said:
> >
> > Yup, that was it, though I believe people at Twitter picked it up again
> > recently. I'd suggest
> > asking Dmitriy if you know him. I've seen interest in this from several
> > other groups, and
> > if there's enough of it, maybe we can start another open source repo to
> > track it. The work
> > in that repo you pointed to was done over one week, and already had most
> of
> > Pig's operators
> > working. (I helped out with this prototype over Twitter's hack week.)
> That
> > work also calls
> > the Scala API directly, because it was done before we had a Java API; it
> > should be easier
> > with the Java one.
> >
> >
> > Tom
> >
> >
> >
> > On Thursday, March 6, 2014 3:11 PM, Sameer Tilak 
> wrote:
> > Hi everyone,
> >
> > We are using to Pig to build our data pipeline. I came across Spork --
> Pig
> > on Spark at: https://github.com/dvryaboy/pig and not sure if it is still
> > active.
> >
> > Can someone please let me know the status of Spork or any other effort
> that
> > will let us run Pig on Spark? We can significantly benefit by using
> Spark,
> > but we would like to keep using the existing Pig scripts.
> >
> >
> >
> >
> >
> > --
> > "...:::Aniket:::... Quetzalco@tl"
> >
> >
>


Re: ArrayIndexOutOfBoundsException in ALS.implicit

2014-04-23 Thread Xiangrui Meng
Hi bearrito, this issue was fixed by Tor in
https://github.com/apache/spark/pull/407. You can either try the
master branch or wait for the 1.0 release. -Xiangrui

On Fri, Mar 28, 2014 at 12:19 AM, Xiangrui Meng  wrote:
> Hi bearrito,
>
> This is a known issue
> (https://spark-project.atlassian.net/browse/SPARK-1281) and it should
> be easy to fix by switching to a hash partitioner.
>
> CC'ed dev list in case someone volunteers to work on it.
>
> Best,
> Xiangrui
>
> On Thu, Mar 27, 2014 at 8:38 PM, bearrito  
> wrote:
>> Usage of negative product id's causes the above exception.
>>
>> The cause is the use of the product id's as a mechanism to index into the
>> the in and out block structures.
>>
>> Specifically on 9.0 it occurs at
>> org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$makeInLinkBlock$2.apply(ALS.scala:262)
>>
>> It seems reasonable to expect that product id's are positive, if a bit
>> opinionated.  I ran across this because the hash function I was using on my
>> product id's includes the negatives in it's range.
>>
>>
>>
>>
>>
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/ArrayIndexOutOfBoundsException-in-ALS-implicit-tp3400.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: default spark partitioner

2014-04-23 Thread Matei Zaharia
It should keep them in order, but what kind of collection do you have? Maybe 
toArray changes the order.

Matei

On Apr 23, 2014, at 8:21 AM, Adrian Mocanu  wrote:

> How does the default spark partitioner partition RDD data? Does it keep the 
> data in order?
>  
> I’m asking because I’m generating an RDD by hand via 
> `ssc.sparkContext.makeRDD(collection.toArray)` and I collect and iterate over 
> what I collect, but the data is in a different order than in the initial 
> collection from which the RDD comes from.
>  
> -Adrian



Re: Is Spark a good choice for geospatial/GIS applications? Is a community volunteer needed in this area?

2014-04-23 Thread Josh Marcus
Hey there,

I'd encourage you to check out the development currently going on with the
GeoTrellis project
(http://github.com/geotrellis/geotrellis) or talk to the developers on irc
(freenode, #geotrellis) as they're
currently developing raster processing capabilities with spark as a
backend, as well as scala wrappers
for JTS (for calculations about geometry features).

--j



On Wed, Apr 23, 2014 at 2:00 PM, neveroutgunned wrote:

> Greetings Spark users/devs! I'm interested in using Spark to process large
> volumes of data with a geospatial component, and I haven't been able to
> find much information on Spark's ability to handle this kind of operation.
> I don't need anything too complex; just distance between two points,
> point-in-polygon and the like.
>
> Does Spark (or possibly Shark) support this kind of query? Has anyone
> written a plugin/extension along these lines?
>
> If there isn't anything like this so far, then it seems like I have two
> options. I can either abandon Spark and fall back on Hadoop and Hive with
> the ESRI Tools extension, or I can stick with Spark and try to write/port a
> GIS toolkit. Which option do you think I should pursue? How hard is it for
> someone that's new to the Spark codebase to write an extension? Is there
> anyone else in the community that would be interested in having geospatial
> capability in Spark?
>
> Thanks for your help!
>
> --
> View this message in context: Is Spark a good choice for geospatial/GIS
> applications? Is a community volunteer needed in this 
> area?
> Sent from the Apache Spark User List mailing list 
> archiveat Nabble.com.
>


Re: Is Spark a good choice for geospatial/GIS applications? Is a community volunteer needed in this area?

2014-04-23 Thread Debasish Das
I am not sure the kind of operations you wantyou can always put a list
of lat-long inside a census-block and add census block as a vertex in
graphx...and add edges within the census block...

Looks like set of code that uses graphx primitives...

Could you take a look at graphx apis and see if they suffice your GIS
usecase ? I have not looked into ESRI extensions...



On Wed, Apr 23, 2014 at 11:00 AM, neveroutgunned wrote:

> Greetings Spark users/devs! I'm interested in using Spark to process large
> volumes of data with a geospatial component, and I haven't been able to
> find much information on Spark's ability to handle this kind of operation.
> I don't need anything too complex; just distance between two points,
> point-in-polygon and the like.
>
> Does Spark (or possibly Shark) support this kind of query? Has anyone
> written a plugin/extension along these lines?
>
> If there isn't anything like this so far, then it seems like I have two
> options. I can either abandon Spark and fall back on Hadoop and Hive with
> the ESRI Tools extension, or I can stick with Spark and try to write/port a
> GIS toolkit. Which option do you think I should pursue? How hard is it for
> someone that's new to the Spark codebase to write an extension? Is there
> anyone else in the community that would be interested in having geospatial
> capability in Spark?
>
> Thanks for your help!
>
> --
> View this message in context: Is Spark a good choice for geospatial/GIS
> applications? Is a community volunteer needed in this 
> area?
> Sent from the Apache Spark User List mailing list 
> archiveat Nabble.com.
>


Is Spark a good choice for geospatial/GIS applications? Is a community volunteer needed in this area?

2014-04-23 Thread neveroutgunned
Greetings Spark users/devs! I'm interested in using Spark to process
large volumes of data with a geospatial component, and I haven't been
able to find much information on Spark's ability to handle this kind
of operation. I don't need anything too complex; just distance between
two points, point-in-polygon and the like.

Does Spark (or possibly Shark) support this kind of query? Has anyone
written a plugin/extension along these lines?

If there isn't anything like this so far, then it seems like I have
two options. I can either abandon Spark and fall back on Hadoop and
Hive with the ESRI Tools extension, or I can stick with Spark and try
to write/port a GIS toolkit. Which option do you think I should
pursue? How hard is it for someone that's new to the Spark codebase to
write an extension? Is there anyone else in the community that would
be interested in having geospatial capability in Spark?

Thanks for your help!




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-Spark-a-good-choice-for-geospatial-GIS-applications-Is-a-community-volunteer-needed-in-this-area-tp4685.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: SPARK_YARN_APP_JAR, SPARK_CLASSPATH and ADD_JARS in a spark-shell on YARN

2014-04-23 Thread Sandy Ryza
Ah, you're right about SPARK_CLASSPATH and ADD_JARS.  My bad.

SPARK_YARN_APP_JAR is going away entirely -
https://issues.apache.org/jira/browse/SPARK-1053


On Wed, Apr 23, 2014 at 8:07 AM, Christophe Préaud <
christophe.pre...@kelkoo.com> wrote:

>  Hi Sandy,
>
> Thanks for your reply !
>
> I thought adding the jars in both SPARK_CLASSPATH and ADD_JARS was only
> required as a temporary workaround in spark 0.9.0 (see
> https://issues.apache.org/jira/browse/SPARK-1089), and that it was not
> necessary anymore in 0.9.1
>
> As for SPARK_YARN_APP_JAR, is it really useful, or is it planned to be
> removed in future versions of Spark? I personally always set it to
> /dev/null when launching a spark-shell in yarn-client mode.
>
> Thanks again for your time!
> Christophe.
>
>
> On 21/04/2014 19:16, Sandy Ryza wrote:
>
> Hi Christophe,
>
>  Adding the jars to both SPARK_CLASSPATH and ADD_JARS is required.  The
> former makes them available to the spark-shell driver process, and the
> latter tells Spark to make them available to the executor processes running
> on the cluster.
>
>  -Sandy
>
>
> On Wed, Apr 16, 2014 at 9:27 AM, Christophe Préaud <
> christophe.pre...@kelkoo.com> wrote:
>
>> Hi,
>>
>> I am running Spark 0.9.1 on a YARN cluster, and I am wondering which is
>> the
>> correct way to add external jars when running a spark shell on a YARN
>> cluster.
>>
>> Packaging all this dependencies in an assembly which path is then set in
>> SPARK_YARN_APP_JAR (as written in the doc:
>> http://spark.apache.org/docs/latest/running-on-yarn.html) does not work
>> in my
>> case: it pushes the jar on HDFS in .sparkStaging/application_XXX, but the
>> spark-shell is still unable to find it (unless ADD_JARS and/or
>> SPARK_CLASSPATH
>> is defined)
>>
>> Defining all the dependencies (either in an assembly, or separately) in
>> ADD_JARS
>> or SPARK_CLASSPATH works (even if SPARK_YARN_APP_JAR is set to
>> /dev/null), but
>> defining some dependencies in ADD_JARS and the rest in SPARK_CLASSPATH
>> does not!
>>
>> Hence I'm still wondering which are the differences between ADD_JARS and
>> SPARK_CLASSPATH, and the purpose of SPARK_YARN_APP_JAR.
>>
>> Thanks for any insights!
>> Christophe.
>>
>>
>>
>> Kelkoo SAS
>> Société par Actions Simplifiée
>> Au capital de EURO 4.168.964,30
>> Siège social : 8, rue du Sentier 75002 Paris
>> 425 093 069 RCS Paris
>>
>> Ce message et les pièces jointes sont confidentiels et établis à
>> l'attention exclusive de leurs destinataires. Si vous n'êtes pas le
>> destinataire de ce message, merci de le détruire et d'en avertir
>> l'expéditeur.
>>
>
>
>
> --
> Kelkoo SAS
> Société par Actions Simplifiée
> Au capital de EURO 4.168.964,30
> Siège social : 8, rue du Sentier 75002 Paris
> 425 093 069 RCS Paris
>
> Ce message et les pièces jointes sont confidentiels et établis à
> l'attention exclusive de leurs destinataires. Si vous n'êtes pas le
> destinataire de ce message, merci de le détruire et d'en avertir
> l'expéditeur.
>


Re: error in mllib lr example code

2014-04-23 Thread Matei Zaharia
See http://people.csail.mit.edu/matei/spark-unified-docs/ for a more recent 
build of the docs; if you spot any problems in those, let us know.

Matei

On Apr 23, 2014, at 9:49 AM, Xiangrui Meng  wrote:

> The doc is for 0.9.1. You are running a later snapshot, which added
> sparse vectors. Try LabeledPoint(parts(0).toDouble,
> Vectors.dense(parts(1).split(' ').map(x => x.toDouble)). The examples
> are updated in the master branch. You can also check the examples
> there. -Xiangrui
> 
> On Wed, Apr 23, 2014 at 9:34 AM, Mohit Jaggi  wrote:
>> 
>> sorry...added a subject now
>> 
>> On Wed, Apr 23, 2014 at 9:32 AM, Mohit Jaggi  wrote:
>>> 
>>> I am trying to run the example linear regression code from
>>> 
>>> http://spark.apache.org/docs/latest/mllib-guide.html
>>> 
>>> But I am getting the following error...am I missing an import?
>>> 
>>> code
>>> 
>>> import org.apache.spark._
>>> 
>>> import org.apache.spark.mllib.regression.LinearRegressionWithSGD
>>> 
>>> import org.apache.spark.mllib.regression.LabeledPoint
>>> 
>>> 
>>> object ModelLR {
>>> 
>>>  def main(args: Array[String]) {
>>> 
>>>val sc = new SparkContext(args(0), "SparkLR",
>>> 
>>>  System.getenv("SPARK_HOME"),
>>> SparkContext.jarOfClass(this.getClass).toSeq)
>>> 
>>> // Load and parse the data
>>> 
>>> val data = sc.textFile("mllib/data/ridge-data/lpsa.data")
>>> 
>>> val parsedData = data.map { line =>
>>> 
>>>  val parts = line.split(',')
>>> 
>>>  LabeledPoint(parts(0).toDouble, parts(1).split(' ').map(x =>
>>> x.toDouble).toArray)
>>> 
>>> }
>>> 
>>> ..
>>> 
>>> }
>>> 
>>> error
>>> 
>>> - polymorphic expression cannot be instantiated to expected type; found :
>>> [U >: Double]Array[U] required:
>>> 
>>> org.apache.spark.mllib.linalg.Vector
>>> 
>>> - polymorphic expression cannot be instantiated to expected type; found :
>>> [U >: Double]Array[U] required:
>>> 
>>> org.apache.spark.mllib.linalg.Vector
>> 
>> 



Re: Hadoop—streaming

2014-04-23 Thread Xiangrui Meng
PipedRDD is an RDD[String]. If you know how to parse each result line
into (key, value) pairs, then you can call reduce after.

piped.map(x => (key, value)).reduceByKey((v1, v2) => v)

-Xiangrui

On Wed, Apr 23, 2014 at 2:09 AM, zhxfl <291221...@qq.com> wrote:
> Hello,we know Hadoop-streaming is use for Hadoop to run native program.
> Hadoop-streaming supports  Map and Reduce logic. Reduce logic means Hadoop
> collect all values with same key and give the stream for the native
> application.
> Spark has PipeRDD too, but PipeRDD doesn't support Reduce logic. So it's
> difficulty for us to transplant our application from Hadoop to Spark.
> Anyone can give me advise, thanks!


Re: skip lines in spark

2014-04-23 Thread DB Tsai
What I suggested will not work if # of records you want to drop is more
than the data in first partition. In my use-case, I only drop the first
couple lines, so I don't have this issue.


Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Wed, Apr 23, 2014 at 9:55 AM, Chengi Liu  wrote:

> Xiangrui,
>   So, is it that full code suggestion is :
> val trigger = rddData.zipWithIndex().filter(
> _._2 >= 10L).map(_._1)
>
> and then what DB Tsai recommended
> trigger.mapPartitionsWithIndex((partitionIdx: Int, lines:
> Iterator[String]) => {
>   if (partitionIdx == 0) {
> lines.drop(n)
>   }
>   lines
> })
>
> Is that the full operation..
>
> What happens, if I have to drop so many records that the number exceeds
> partition 0.. ??
> How do i handle that case?
>
>
>
>
> On Wed, Apr 23, 2014 at 9:51 AM, Xiangrui Meng  wrote:
>
>> If the first partition doesn't have enough records, then it may not
>> drop enough lines. Try
>>
>> rddData.zipWithIndex().filter(_._2 >= 10L).map(_._1)
>>
>> It might trigger a job.
>>
>> Best,
>> Xiangrui
>>
>> On Wed, Apr 23, 2014 at 9:46 AM, DB Tsai  wrote:
>> > Hi Chengi,
>> >
>> > If you just want to skip first n lines in RDD, you can do
>> >
>> > rddData.mapPartitionsWithIndex((partitionIdx: Int, lines:
>> Iterator[String])
>> > => {
>> >   if (partitionIdx == 0) {
>> > lines.drop(n)
>> >   }
>> >   lines
>> > }
>> >
>> >
>> > Sincerely,
>> >
>> > DB Tsai
>> > ---
>> > My Blog: https://www.dbtsai.com
>> > LinkedIn: https://www.linkedin.com/in/dbtsai
>> >
>> >
>> > On Wed, Apr 23, 2014 at 9:18 AM, Chengi Liu 
>> wrote:
>> >>
>> >> Hi,
>> >>   What is the easiest way to skip first n lines in rdd??
>> >> I am not able to figure this one out?
>> >> Thanks
>> >
>> >
>>
>
>


Re: skip lines in spark

2014-04-23 Thread Xiangrui Meng
Sorry, I didn't realize that zipWithIndex() is not in v0.9.1. It is in
the master branch and will be included in v1.0. It first counts number
of records per partition and then assigns indices starting from 0.
-Xiangrui

On Wed, Apr 23, 2014 at 9:56 AM, Chengi Liu  wrote:
> Also, zipWithIndex() is not valid.. Did you meant zipParititions?
>
>
> On Wed, Apr 23, 2014 at 9:55 AM, Chengi Liu  wrote:
>>
>> Xiangrui,
>>   So, is it that full code suggestion is :
>> val trigger = rddData.zipWithIndex().filter(
>> _._2 >= 10L).map(_._1)
>>
>> and then what DB Tsai recommended
>> trigger.mapPartitionsWithIndex((partitionIdx: Int, lines:
>> Iterator[String]) => {
>>   if (partitionIdx == 0) {
>> lines.drop(n)
>>   }
>>   lines
>> })
>>
>> Is that the full operation..
>>
>> What happens, if I have to drop so many records that the number exceeds
>> partition 0.. ??
>> How do i handle that case?
>>
>>
>>
>>
>> On Wed, Apr 23, 2014 at 9:51 AM, Xiangrui Meng  wrote:
>>>
>>> If the first partition doesn't have enough records, then it may not
>>> drop enough lines. Try
>>>
>>> rddData.zipWithIndex().filter(_._2 >= 10L).map(_._1)
>>>
>>> It might trigger a job.
>>>
>>> Best,
>>> Xiangrui
>>>
>>> On Wed, Apr 23, 2014 at 9:46 AM, DB Tsai  wrote:
>>> > Hi Chengi,
>>> >
>>> > If you just want to skip first n lines in RDD, you can do
>>> >
>>> > rddData.mapPartitionsWithIndex((partitionIdx: Int, lines:
>>> > Iterator[String])
>>> > => {
>>> >   if (partitionIdx == 0) {
>>> > lines.drop(n)
>>> >   }
>>> >   lines
>>> > }
>>> >
>>> >
>>> > Sincerely,
>>> >
>>> > DB Tsai
>>> > ---
>>> > My Blog: https://www.dbtsai.com
>>> > LinkedIn: https://www.linkedin.com/in/dbtsai
>>> >
>>> >
>>> > On Wed, Apr 23, 2014 at 9:18 AM, Chengi Liu 
>>> > wrote:
>>> >>
>>> >> Hi,
>>> >>   What is the easiest way to skip first n lines in rdd??
>>> >> I am not able to figure this one out?
>>> >> Thanks
>>> >
>>> >
>>
>>
>


Re: Spark hangs when i call parallelize + count on a ArrayList having 40k elements

2014-04-23 Thread Xiangrui Meng
How big is each entry, and how much memory do you have on each
executor? You generated all data on driver and
sc.parallelize(bytesList) will send the entire dataset to a single
executor. You may run into I/O or memory issues. If the entries are
generated, you should create a simple RDD sc.parallelize(0 until 20,
20) and call mapPartitions to generate them in parallel. -Xiangrui

On Wed, Apr 23, 2014 at 9:23 AM, amit karmakar
 wrote:
> Spark hangs after i perform the following operations
>
>
> ArrayList bytesList = new ArrayList();
> /*
>add 40k entries to bytesList
> */
>
> JavaRDD rdd = sparkContext.parallelize(bytesList);
>  System.out.println("Count=" + rdd.count());
>
>
> If i add just one entry it works.
>
> It works if i modify,
> JavaRDD rdd = sparkContext.parallelize(bytesList)
> to
> JavaRDD rdd = sparkContext.parallelize(bytesList, 20);
>
> There is nothing in the logs that can help understand the reason.
>
> What could be reason for this ?
>
>
> Regards,
> Amit Kumar Karmakar


Re: skip lines in spark

2014-04-23 Thread Chengi Liu
Also, zipWithIndex() is not valid.. Did you meant zipParititions?


On Wed, Apr 23, 2014 at 9:55 AM, Chengi Liu  wrote:

> Xiangrui,
>   So, is it that full code suggestion is :
> val trigger = rddData.zipWithIndex().filter(
> _._2 >= 10L).map(_._1)
>
> and then what DB Tsai recommended
> trigger.mapPartitionsWithIndex((partitionIdx: Int, lines:
> Iterator[String]) => {
>   if (partitionIdx == 0) {
> lines.drop(n)
>   }
>   lines
> })
>
> Is that the full operation..
>
> What happens, if I have to drop so many records that the number exceeds
> partition 0.. ??
> How do i handle that case?
>
>
>
>
> On Wed, Apr 23, 2014 at 9:51 AM, Xiangrui Meng  wrote:
>
>> If the first partition doesn't have enough records, then it may not
>> drop enough lines. Try
>>
>> rddData.zipWithIndex().filter(_._2 >= 10L).map(_._1)
>>
>> It might trigger a job.
>>
>> Best,
>> Xiangrui
>>
>> On Wed, Apr 23, 2014 at 9:46 AM, DB Tsai  wrote:
>> > Hi Chengi,
>> >
>> > If you just want to skip first n lines in RDD, you can do
>> >
>> > rddData.mapPartitionsWithIndex((partitionIdx: Int, lines:
>> Iterator[String])
>> > => {
>> >   if (partitionIdx == 0) {
>> > lines.drop(n)
>> >   }
>> >   lines
>> > }
>> >
>> >
>> > Sincerely,
>> >
>> > DB Tsai
>> > ---
>> > My Blog: https://www.dbtsai.com
>> > LinkedIn: https://www.linkedin.com/in/dbtsai
>> >
>> >
>> > On Wed, Apr 23, 2014 at 9:18 AM, Chengi Liu 
>> wrote:
>> >>
>> >> Hi,
>> >>   What is the easiest way to skip first n lines in rdd??
>> >> I am not able to figure this one out?
>> >> Thanks
>> >
>> >
>>
>
>


Re: skip lines in spark

2014-04-23 Thread Chengi Liu
Xiangrui,
  So, is it that full code suggestion is :
val trigger = rddData.zipWithIndex().filter(
_._2 >= 10L).map(_._1)

and then what DB Tsai recommended
trigger.mapPartitionsWithIndex((partitionIdx: Int, lines: Iterator[String])
=> {
  if (partitionIdx == 0) {
lines.drop(n)
  }
  lines
})

Is that the full operation..

What happens, if I have to drop so many records that the number exceeds
partition 0.. ??
How do i handle that case?




On Wed, Apr 23, 2014 at 9:51 AM, Xiangrui Meng  wrote:

> If the first partition doesn't have enough records, then it may not
> drop enough lines. Try
>
> rddData.zipWithIndex().filter(_._2 >= 10L).map(_._1)
>
> It might trigger a job.
>
> Best,
> Xiangrui
>
> On Wed, Apr 23, 2014 at 9:46 AM, DB Tsai  wrote:
> > Hi Chengi,
> >
> > If you just want to skip first n lines in RDD, you can do
> >
> > rddData.mapPartitionsWithIndex((partitionIdx: Int, lines:
> Iterator[String])
> > => {
> >   if (partitionIdx == 0) {
> > lines.drop(n)
> >   }
> >   lines
> > }
> >
> >
> > Sincerely,
> >
> > DB Tsai
> > ---
> > My Blog: https://www.dbtsai.com
> > LinkedIn: https://www.linkedin.com/in/dbtsai
> >
> >
> > On Wed, Apr 23, 2014 at 9:18 AM, Chengi Liu 
> wrote:
> >>
> >> Hi,
> >>   What is the easiest way to skip first n lines in rdd??
> >> I am not able to figure this one out?
> >> Thanks
> >
> >
>


Re: skip lines in spark

2014-04-23 Thread Xiangrui Meng
If the first partition doesn't have enough records, then it may not
drop enough lines. Try

rddData.zipWithIndex().filter(_._2 >= 10L).map(_._1)

It might trigger a job.

Best,
Xiangrui

On Wed, Apr 23, 2014 at 9:46 AM, DB Tsai  wrote:
> Hi Chengi,
>
> If you just want to skip first n lines in RDD, you can do
>
> rddData.mapPartitionsWithIndex((partitionIdx: Int, lines: Iterator[String])
> => {
>   if (partitionIdx == 0) {
> lines.drop(n)
>   }
>   lines
> }
>
>
> Sincerely,
>
> DB Tsai
> ---
> My Blog: https://www.dbtsai.com
> LinkedIn: https://www.linkedin.com/in/dbtsai
>
>
> On Wed, Apr 23, 2014 at 9:18 AM, Chengi Liu  wrote:
>>
>> Hi,
>>   What is the easiest way to skip first n lines in rdd??
>> I am not able to figure this one out?
>> Thanks
>
>


Re: error in mllib lr example code

2014-04-23 Thread Xiangrui Meng
The doc is for 0.9.1. You are running a later snapshot, which added
sparse vectors. Try LabeledPoint(parts(0).toDouble,
Vectors.dense(parts(1).split(' ').map(x => x.toDouble)). The examples
are updated in the master branch. You can also check the examples
there. -Xiangrui

On Wed, Apr 23, 2014 at 9:34 AM, Mohit Jaggi  wrote:
>
> sorry...added a subject now
>
> On Wed, Apr 23, 2014 at 9:32 AM, Mohit Jaggi  wrote:
>>
>> I am trying to run the example linear regression code from
>>
>>  http://spark.apache.org/docs/latest/mllib-guide.html
>>
>> But I am getting the following error...am I missing an import?
>>
>> code
>>
>> import org.apache.spark._
>>
>> import org.apache.spark.mllib.regression.LinearRegressionWithSGD
>>
>> import org.apache.spark.mllib.regression.LabeledPoint
>>
>>
>> object ModelLR {
>>
>>   def main(args: Array[String]) {
>>
>> val sc = new SparkContext(args(0), "SparkLR",
>>
>>   System.getenv("SPARK_HOME"),
>> SparkContext.jarOfClass(this.getClass).toSeq)
>>
>> // Load and parse the data
>>
>> val data = sc.textFile("mllib/data/ridge-data/lpsa.data")
>>
>> val parsedData = data.map { line =>
>>
>>   val parts = line.split(',')
>>
>>   LabeledPoint(parts(0).toDouble, parts(1).split(' ').map(x =>
>> x.toDouble).toArray)
>>
>> }
>>
>> ..
>>
>> }
>>
>> error
>>
>> - polymorphic expression cannot be instantiated to expected type; found :
>> [U >: Double]Array[U] required:
>>
>> org.apache.spark.mllib.linalg.Vector
>>
>> - polymorphic expression cannot be instantiated to expected type; found :
>> [U >: Double]Array[U] required:
>>
>> org.apache.spark.mllib.linalg.Vector
>
>


Re: skip lines in spark

2014-04-23 Thread DB Tsai
Hi Chengi,

If you just want to skip first n lines in RDD, you can do

rddData.mapPartitionsWithIndex((partitionIdx: Int, lines: Iterator[String])
=> {
  if (partitionIdx == 0) {
lines.drop(n)
  }
  lines
}


Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Wed, Apr 23, 2014 at 9:18 AM, Chengi Liu  wrote:

> Hi,
>   What is the easiest way to skip first n lines in rdd??
> I am not able to figure this one out?
> Thanks
>


error in mllib lr example code

2014-04-23 Thread Mohit Jaggi
sorry...added a subject now

On Wed, Apr 23, 2014 at 9:32 AM, Mohit Jaggi  wrote:

> I am trying to run the example linear regression code from
>
>  http://spark.apache.org/docs/latest/mllib-guide.html
>
> But I am getting the following error...am I missing an import?
>
> code
>
> import org.apache.spark._
>
> import org.apache.spark.mllib.regression.LinearRegressionWithSGD
>
> import org.apache.spark.mllib.regression.LabeledPoint
>
>
> object ModelLR {
>
>   def main(args: Array[String]) {
>
> val sc = new SparkContext(args(0), "SparkLR",
>
>   System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass)
> .toSeq)
>
> // Load and parse the data
>
> val data = sc.textFile("mllib/data/ridge-data/lpsa.data")
>
> val parsedData = data.map { line =>
>
>   val parts = line.split(',')
>
>   LabeledPoint(parts(0).toDouble, parts(1).split(' ').map(x =>
> x.toDouble).toArray)
>
>  }
>
> ..
>
> }
>
> error
>
> - polymorphic expression cannot be instantiated to expected type; found :
> [U >: Double]Array[U] required:
>
>  org.apache.spark.mllib.linalg.Vector
>
> - polymorphic expression cannot be instantiated to expected type; found :
> [U >: Double]Array[U] required:
>
>  org.apache.spark.mllib.linalg.Vector
>


Re: skip lines in spark

2014-04-23 Thread Andre Bois-Crettez

Good question, I am wondering too how it is possible to add a line
number to distributed data.

I thought it was a job for maptPartionsWithIndex, but it seems difficult.
Something similar here :
http://apache-spark-user-list.1001560.n3.nabble.com/RDD-and-Partition-td991.html#a995

Maybe at the file reader knowing it works on the first HDFS block, to
count line numbers or something ?

André

On 2014-04-23 18:18, Chengi Liu wrote:

Hi,
  What is the easiest way to skip first n lines in rdd??
I am not able to figure this one out?
Thanks



--
André Bois-Crettez

Software Architect
Big Data Developer
http://www.kelkoo.com/


Kelkoo SAS
Société par Actions Simplifiée
Au capital de € 4.168.964,30
Siège social : 8, rue du Sentier 75002 Paris
425 093 069 RCS Paris

Ce message et les pièces jointes sont confidentiels et établis à l'attention 
exclusive de leurs destinataires. Si vous n'êtes pas le destinataire de ce 
message, merci de le détruire et d'en avertir l'expéditeur.


[no subject]

2014-04-23 Thread Mohit Jaggi
I am trying to run the example linear regression code from

 http://spark.apache.org/docs/latest/mllib-guide.html

But I am getting the following error...am I missing an import?

code

import org.apache.spark._

import org.apache.spark.mllib.regression.LinearRegressionWithSGD

import org.apache.spark.mllib.regression.LabeledPoint


object ModelLR {

  def main(args: Array[String]) {

val sc = new SparkContext(args(0), "SparkLR",

  System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass)
.toSeq)

// Load and parse the data

val data = sc.textFile("mllib/data/ridge-data/lpsa.data")

val parsedData = data.map { line =>

  val parts = line.split(',')

  LabeledPoint(parts(0).toDouble, parts(1).split(' ').map(x =>
x.toDouble).toArray)

 }

..

}

error

- polymorphic expression cannot be instantiated to expected type; found :
[U >: Double]Array[U] required:

 org.apache.spark.mllib.linalg.Vector

- polymorphic expression cannot be instantiated to expected type; found :
[U >: Double]Array[U] required:

 org.apache.spark.mllib.linalg.Vector


Spark hangs when i call parallelize + count on a ArrayList having 40k elements

2014-04-23 Thread amit karmakar
Spark hangs after i perform the following operations


ArrayList bytesList = new ArrayList();
/*
   add 40k entries to bytesList
*/

JavaRDD rdd = sparkContext.parallelize(bytesList);
 System.out.println("Count=" + rdd.count());


If i add just one entry it works.

It works if i modify,
JavaRDD rdd = sparkContext.parallelize(bytesList)
to
JavaRDD rdd = sparkContext.parallelize(bytesList, 20);

There is nothing in the logs that can help understand the reason.

What could be reason for this ?


Regards,
Amit Kumar Karmakar


skip lines in spark

2014-04-23 Thread Chengi Liu
Hi,
  What is the easiest way to skip first n lines in rdd??
I am not able to figure this one out?
Thanks


Re: Pig on Spark

2014-04-23 Thread lalit1303
Hi,

We got spork working on spark 0.9.0
Repository available at:
https://github.com/sigmoidanalytics/pig/tree/spork-hadoopasm-fix

Please suggest your feedback.



-
Lalit Yadav
la...@sigmoidanalytics.com
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Pig-on-Spark-tp2367p4668.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: about rdd.filter()

2014-04-23 Thread Lukas Nalezenec

Hi,
can you please add stacktrace ?
Lukas

On 23.4.2014 11:45, randylu wrote:

   my code is like:
 rdd2 = rdd1.filter(_._2.length > 1)
 rdd2.collect()
   it works well, but if i use a variable /num/ instead of 1:
 var num = 1
 rdd2 = rdd1.filter(_._2.length > num)
 rdd2.collect()
   it fails at rdd2.collect()
   so strange?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/about-rdd-filter-tp4657.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: about rdd.filter()

2014-04-23 Thread Sourav Chandra
This could happen if variable is defined in such a way that it pulls its
own class reference into the closure. Hence serilization tries to
 serialize the whole outer class reference which is not serializable and
whole thing failed.



On Wed, Apr 23, 2014 at 3:15 PM, randylu  wrote:

>   my code is like:
> rdd2 = rdd1.filter(_._2.length > 1)
> rdd2.collect()
>   it works well, but if i use a variable /num/ instead of 1:
> var num = 1
> rdd2 = rdd1.filter(_._2.length > num)
> rdd2.collect()
>   it fails at rdd2.collect()
>   so strange?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/about-rdd-filter-tp4657.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>



-- 

Sourav Chandra

Senior Software Engineer

· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

sourav.chan...@livestream.com

o: +91 80 4121 8723

m: +91 988 699 3746

skype: sourav.chandra

Livestream

"Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
Block, Koramangala Industrial Area,

Bangalore 560034

www.livestream.com


Re: Comparing RDD Items

2014-04-23 Thread Daniel Darabos
Hi! There is RDD.cartesian(), which creates the Cartiesian product of two
RDDs. You could do data.cartesian(data) to get an RDD of all pairs of
lines. It will be of length data.count * data.count of course.



On Wed, Apr 23, 2014 at 4:48 PM, Jared Rodriguez wrote:

> Hi there,
>
> I am new to Spark and new to scala, although have lots of experience on
> the Java side.  I am experimenting with Spark for a new project where it
> seems like it could be a good fit.  As I go through the examples, there is
> one case scenario that I am trying to figure out, comparing the contents of
> an RDD to itself to result in a new RDD.
>
> In an overly simply example, I have:
>
> JavaSparkContext sc = new JavaSparkContext ...
> JavaRDD data = sc.parallelize(buildData());
>
> I then want to compare each entry in data to other entries and end up with:
>
> JavaPairRDD> mapped = data.???
>
> Is this something easily handled by Spark?  My apologies if this is a
> stupid question, I have spent less than 10 hours tinkering with Spark and
> am trying to come up to speed.
>
>
> --
> Jared Rodriguez
>
>


Re: about rdd.filter()

2014-04-23 Thread Cheng Lian
Does your job fail because of serialization error? One situation I can
think of is something like this:

class NotSerializable(val n: Int)val obj = new NotSerializable(1)
sc.makeRDD(1 to 3).filter(_ > obj.n)

If you enclose a field member of an object into a closure, not only this
field but also the whole outer object is enclosed into the closure. If the
outer object is not serializable, then RDD DAG serialization would fail.
You can simply reference the field member with a separate variable to
workaround this:

class NotSerializable(val n: Int)val obj = new NotSerializable(1)val x = obj.n
sc.makeRDD(1 to 3).filter(_ > x)



On Wed, Apr 23, 2014 at 5:45 PM, randylu  wrote:

>   my code is like:
> rdd2 = rdd1.filter(_._2.length > 1)
> rdd2.collect()
>   it works well, but if i use a variable /num/ instead of 1:
> var num = 1
> rdd2 = rdd1.filter(_._2.length > num)
> rdd2.collect()
>   it fails at rdd2.collect()
>   so strange?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/about-rdd-filter-tp4657.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


default spark partitioner

2014-04-23 Thread Adrian Mocanu
How does the default spark partitioner partition RDD data? Does it keep the 
data in order?

I'm asking because I'm generating an RDD by hand via 
`ssc.sparkContext.makeRDD(collection.toArray)` and I collect and iterate over 
what I collect, but the data is in a different order than in the initial 
collection from which the RDD comes from.

-Adrian



Re: SPARK_YARN_APP_JAR, SPARK_CLASSPATH and ADD_JARS in a spark-shell on YARN

2014-04-23 Thread Christophe Préaud

Hi Sandy,

Thanks for your reply !

I thought adding the jars in both SPARK_CLASSPATH and ADD_JARS was only 
required as a temporary workaround in spark 0.9.0 (see 
https://issues.apache.org/jira/browse/SPARK-1089), and that it was not 
necessary anymore in 0.9.1

As for SPARK_YARN_APP_JAR, is it really useful, or is it planned to be removed 
in future versions of Spark? I personally always set it to /dev/null when 
launching a spark-shell in yarn-client mode.

Thanks again for your time!
Christophe.

On 21/04/2014 19:16, Sandy Ryza wrote:
Hi Christophe,

Adding the jars to both SPARK_CLASSPATH and ADD_JARS is required.  The former 
makes them available to the spark-shell driver process, and the latter tells 
Spark to make them available to the executor processes running on the cluster.

-Sandy


On Wed, Apr 16, 2014 at 9:27 AM, Christophe Préaud 
mailto:christophe.pre...@kelkoo.com>> wrote:
Hi,

I am running Spark 0.9.1 on a YARN cluster, and I am wondering which is the
correct way to add external jars when running a spark shell on a YARN cluster.

Packaging all this dependencies in an assembly which path is then set in
SPARK_YARN_APP_JAR (as written in the doc:
http://spark.apache.org/docs/latest/running-on-yarn.html) does not work in my
case: it pushes the jar on HDFS in .sparkStaging/application_XXX, but the
spark-shell is still unable to find it (unless ADD_JARS and/or SPARK_CLASSPATH
is defined)

Defining all the dependencies (either in an assembly, or separately) in ADD_JARS
or SPARK_CLASSPATH works (even if SPARK_YARN_APP_JAR is set to /dev/null), but
defining some dependencies in ADD_JARS and the rest in SPARK_CLASSPATH does not!

Hence I'm still wondering which are the differences between ADD_JARS and
SPARK_CLASSPATH, and the purpose of SPARK_YARN_APP_JAR.

Thanks for any insights!
Christophe.



Kelkoo SAS
Société par Actions Simplifiée
Au capital de € 4.168.964,30
Siège social : 8, rue du Sentier 75002 Paris
425 093 069 RCS Paris

Ce message et les pièces jointes sont confidentiels et établis à l'attention 
exclusive de leurs destinataires. Si vous n'êtes pas le destinataire de ce 
message, merci de le détruire et d'en avertir l'expéditeur.




Kelkoo SAS
Société par Actions Simplifiée
Au capital de € 4.168.964,30
Siège social : 8, rue du Sentier 75002 Paris
425 093 069 RCS Paris

Ce message et les pièces jointes sont confidentiels et établis à l'attention 
exclusive de leurs destinataires. Si vous n'êtes pas le destinataire de ce 
message, merci de le détruire et d'en avertir l'expéditeur.


Re: Spark runs applications in an inconsistent way

2014-04-23 Thread Aureliano Buendia
Yes, things get more unstable with larger data. But, that's the whole point
of my question:

Why should spark get unstable when data gets larger?

When data gets larger, spark should get *slower*, not more unstable. lack
of stability makes parameter tuning very difficult, time consuming and a
painful experience.

Also, it is a mystery to me why spark gets unstable in a non-deterministic
fashion. Why should it use twice, or half, the memory it used in the
previous run of exactly the same code?



On Wed, Apr 23, 2014 at 10:43 AM, Andras Barjak <
andras.bar...@lynxanalytics.com> wrote:

>
>
>>- Spark UI shows number of succeeded tasks is more than total number
>>of tasks, eg: 3500/3000. There are no failed tasks. At this stage the
>>computation keeps carrying on for a long time without returning an answer.
>>
>> No sign of resubmitted tasks in the command line logs either?
> You might want to get more information on what is going on in the JVM?
> I don't know what others use but jvmtop is easy to install on ec2 and you
> can monitor some processes.
>
>>
>>- The only way to get an answer from an application is to hopelessly
>>keep running that application multiple times, until by some luck it gets
>>converged.
>>
>> I was not able to regenerate this by a minimal code, as it seems some
>> random factors affect this behavior. I have a suspicion, but I'm not sure,
>> that use of one or more groupByKey() calls intensifies this problem.
>>
> Is this related to the amount of data you are processing? Is it more
> likely to happen on large data?
> My experience on ec2 is whenever the the memory/partitioning/timout
> settings are reasonable
> the output is quite consistent. Even if I stop and restart the cluster the
> other day.
>


Comparing RDD Items

2014-04-23 Thread Jared Rodriguez
Hi there,

I am new to Spark and new to scala, although have lots of experience on the
Java side.  I am experimenting with Spark for a new project where it seems
like it could be a good fit.  As I go through the examples, there is one
case scenario that I am trying to figure out, comparing the contents of an
RDD to itself to result in a new RDD.

In an overly simply example, I have:

JavaSparkContext sc = new JavaSparkContext ...
JavaRDD data = sc.parallelize(buildData());

I then want to compare each entry in data to other entries and end up with:

JavaPairRDD> mapped = data.???

Is this something easily handled by Spark?  My apologies if this is a
stupid question, I have spent less than 10 hours tinkering with Spark and
am trying to come up to speed.


-- 
Jared Rodriguez


Re: ERROR TaskSchedulerImpl: Lost an executor

2014-04-23 Thread Parviz Deyhim
You need to set SPARK_MEM or SPARK_EXECUTOR_MEMORY (for Spark 1.0) to
amount of memory your application needs to consume at each node. Try
setting those variables (example: export SPARK_MEM=10g) or set it via
SparkConf.set as suggested by jholee.


On Tue, Apr 22, 2014 at 4:25 PM, jaeholee  wrote:

> Ok. I tried setting the partition number to 128 and numbers greater than
> 128,
> and now I get another error message about "Java heap space". Is it possible
> that there is something wrong with the setup of my Spark cluster to begin
> with? Or is it still an issue with partitioning my data? Or do I just need
> more worker nodes?
>
>
> ERROR TaskSetManager: Task 194.0:14 failed 4 times; aborting job
> org.apache.spark.SparkException: Job aborted: Task 194.0:14 failed 4 times
> (most recent failure: Exception failure: java.lang.OutOfMemoryError: Java
> heap space)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
> at
>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
> at scala.Option.foreach(Option.scala:236)
> at
>
> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at
>
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
>
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
>
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/ERROR-TaskSchedulerImpl-Lost-an-executor-tp4566p4623.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


SparkException: env SPARK_YARN_APP_JAR is not set

2014-04-23 Thread ????
I have a small program, which I can launch successfully by yarn client with 
yarn-standalon mode. 

the command look like this: 
(javac javac -classpath .:jars/spark-assembly-0.9.1-hadoop2.2.0.jar 
LoadTest.java) 
(jar cvf loadtest.jar LoadTest.class) 
SPARK_JAR=assembly/target/scala-2.10/spark-assembly-0.9.1-hadoop2.2.0.jar 
./bin/spark-class org.apache.spark.deploy.yarn.Client --jar 
/opt/mytest/loadtest.jar --class LoadTest --args yarn-standalone --num-workers 
2 --master-memory 2g --worker-memory 2g --worker-cores 1 

the program LoadTest.java: 
public class LoadTest { 
static final String USER = "root"; 
public static void main(String[] args) { 
System.setProperty("user.name", USER); 
System.setProperty("HADOOP_USER_NAME", USER); 
System.setProperty("spark.executor.memory", "7g"); 
JavaSparkContext sc = new JavaSparkContext(args[0], "LoadTest", 
System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(LoadTest.class)); 
String file = "file:/opt/mytest/123.data"; 
JavaRDD data1 = sc.textFile(file, 2); 
long c1=data1.count(); 
System.out.println("1"+c1); 
} 
} 

BUT due to my other pragram's need, I must have it run with command of "java". 
So I add ??environment?? parameter to JavaSparkContext(). Followed is The ERROR 
I get: 
Exception in thread "main" org.apache.spark.SparkException: env 
SPARK_YARN_APP_JAR is not set 
at 
org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.start(YarnClientSchedulerBackend.scala:49)
 
at 
org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:125) 
at org.apache.spark.SparkContext.(SparkContext.scala:200) 
at org.apache.spark.SparkContext.(SparkContext.scala:100) 
at 
org.apache.spark.api.java.JavaSparkContext.(JavaSparkContext.scala:93) 
at LoadTest.main(LoadTest.java:37) 

the program LoadTest.java: 
public class LoadTest { 

static final String USER = "root"; 
public static void main(String[] args) { 
System.setProperty("user.name", USER); 
System.setProperty("HADOOP_USER_NAME", USER); 
System.setProperty("spark.executor.memory", "7g"); 

Map env = new HashMap(); 
env.put("SPARK_YARN_APP_JAR", "file:/opt/mytest/loadtest.jar"); 
env.put("SPARK_WORKER_INSTANCES", "2" ); 
env.put("SPARK_WORKER_CORES", "1"); 
env.put("SPARK_WORKER_MEMORY", "2G"); 
env.put("SPARK_MASTER_MEMORY", "2G"); 
env.put("SPARK_YARN_APP_NAME", "LoadTest"); 
env.put("SPARK_YARN_DIST_ARCHIVES", 
"file:/opt/test/spark-0.9.1-bin-hadoop1/assembly/target/scala-2.10/spark-assembly-0.9.1-hadoop2.2.0.jar");
 
JavaSparkContext sc = new JavaSparkContext("yarn-client", "LoadTest", 
System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(LoadTest.class), env); 
String file = "file:/opt/mytest/123.dna"; 
JavaRDD data1 = sc.textFile(file, 2);//.cache(); 

long c1=data1.count(); 
System.out.println("1"+c1); 
} 
} 

the command: 
javac -classpath .:jars/spark-assembly-0.9.1-hadoop2.2.0.jar LoadTest.java 
jar cvf loadtest.jar LoadTest.class 
nohup java -classpath .:jars/spark-assembly-0.9.1-hadoop2.2.0.jar LoadTest >> 
loadTest.log 2>&1 & 

What did I miss?? Or I did it in wrong way??

about rdd.filter()

2014-04-23 Thread randylu
  my code is like:
rdd2 = rdd1.filter(_._2.length > 1)
rdd2.collect()
  it works well, but if i use a variable /num/ instead of 1:
var num = 1
rdd2 = rdd1.filter(_._2.length > num)
rdd2.collect()
  it fails at rdd2.collect()
  so strange?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/about-rdd-filter-tp4657.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark runs applications in an inconsistent way

2014-04-23 Thread Andras Barjak
>
>- Spark UI shows number of succeeded tasks is more than total number
>of tasks, eg: 3500/3000. There are no failed tasks. At this stage the
>computation keeps carrying on for a long time without returning an answer.
>
> No sign of resubmitted tasks in the command line logs either?
You might want to get more information on what is going on in the JVM?
I don't know what others use but jvmtop is easy to install on ec2 and you
can monitor some processes.

>
>- The only way to get an answer from an application is to hopelessly
>keep running that application multiple times, until by some luck it gets
>converged.
>
> I was not able to regenerate this by a minimal code, as it seems some
> random factors affect this behavior. I have a suspicion, but I'm not sure,
> that use of one or more groupByKey() calls intensifies this problem.
>
Is this related to the amount of data you are processing? Is it more likely
to happen on large data?
My experience on ec2 is whenever the the memory/partitioning/timout
settings are reasonable
the output is quite consistent. Even if I stop and restart the cluster the
other day.


Re: two calls of saveAsTextFile() have different results on the same RDD

2014-04-23 Thread randylu
i got it, thanks very much :)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/two-calls-of-saveAsTextFile-have-different-results-on-the-same-RDD-tp4578p4655.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: GraphX: .edges.distinct().count() is 10?

2014-04-23 Thread Daniel Darabos
This is caused by https://issues.apache.org/jira/browse/SPARK-1188. I think
the fix will be in the next release. But until then, do:

g.edges.map(_.copy()).distinct.count



On Wed, Apr 23, 2014 at 2:26 AM, Ryan Compton wrote:

> Try this: https://www.dropbox.com/s/xf34l0ta496bdsn/.txt
>
> This code:
>
> println(g.numEdges)
> println(g.numVertices)
> println(g.edges.distinct().count())
>
> gave me
>
> 1
> 9294
> 2
>
>
>
> On Tue, Apr 22, 2014 at 5:14 PM, Ankur Dave  wrote:
> > I wasn't able to reproduce this with a small test file, but I did change
> the
> > file parsing to use x(1).toLong instead of x(2).toLong. Did you mean to
> take
> > the third column rather than the second?
> >
> > If so, would you mind posting a larger sample of the file, or even the
> whole
> > file if possible?
> >
> > Here's the test that succeeded:
> >
> >   test("graph.edges.distinct.count") {
> > withSpark { sc =>
> >   val edgeFullStrRDD: RDD[String] = sc.parallelize(List(
> > "394365859\t136153151", "589404147\t1361045425"))
> >   val edgeTupRDD = edgeFullStrRDD.map(x => x.split("\t"))
> > .map(x => (x(0).toLong, x(1).toLong))
> >   val g = Graph.fromEdgeTuples(edgeTupRDD, defaultValue = 123,
> > uniqueEdges = Option(CanonicalRandomVertexCut))
> >   assert(edgeTupRDD.distinct.count() === 2)
> >   assert(g.numEdges === 2)
> >   assert(g.edges.distinct.count() === 2)
> > }
> >   }
> >
> > Ankur
>


Re: ERROR TaskSchedulerImpl: Lost an executor

2014-04-23 Thread Daniel Darabos
With the right program you can always exhaust any amount of memory :).
There is no silver bullet. You have to figure out what is happening in your
code that causes a high memory use and address that. I spent all of last
week doing this for a simple program of my own. Lessons I learned that may
or may not apply to your case:

 - If you don't cache (persist) an RDD, it is not stored. This can save
memory at the cost of possibly repeating computation. (I read around a TB
of files twice, for example, rather than cache them.)
 - Use combineByKey instead of groupByKey if you can process values one by
one. This means they do not need to be all stored.
 - If you have a lot of keys per partition, set mapSideCombine=false for
combineByKey. This avoids creating a large map per partition.
 - If you have a key with a disproportionate number of values (like the
empty string for a missing name), discard it before the computation.
 - Read https://spark.apache.org/docs/latest/tuning.html for more (and more
accurate) information.

Good luck.


On Wed, Apr 23, 2014 at 1:25 AM, jaeholee  wrote:

> Ok. I tried setting the partition number to 128 and numbers greater than
> 128,
> and now I get another error message about "Java heap space". Is it possible
> that there is something wrong with the setup of my Spark cluster to begin
> with? Or is it still an issue with partitioning my data? Or do I just need
> more worker nodes?
>
>
> ERROR TaskSetManager: Task 194.0:14 failed 4 times; aborting job
> org.apache.spark.SparkException: Job aborted: Task 194.0:14 failed 4 times
> (most recent failure: Exception failure: java.lang.OutOfMemoryError: Java
> heap space)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
> at
>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
> at scala.Option.foreach(Option.scala:236)
> at
>
> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at
>
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
>
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
>
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/ERROR-TaskSchedulerImpl-Lost-an-executor-tp4566p4623.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Hadoop??streaming

2014-04-23 Thread zhxfl
Hello??we know Hadoop-streaming is use for Hadoop to run native program. 
Hadoop-streaming supports  Map and Reduce logic. Reduce logic means Hadoop 
collect all values with same key and give the stream for the native 
application.Spark has PipeRDD too, but PipeRDD doesn't support Reduce logic. So 
it's difficulty for us to transplant our application from Hadoop to Spark.
Anyone can give me advise, thanks!

Re: no response in spark web UI

2014-04-23 Thread wxhsdp
thanks for help
now i'am testing in standalone mode in only one pc. 
i use ./sbin/start-master.sh to start a master and 
./bin/spark-class org.apache.spark.deploy.worker.Worker spark://ubuntu:7077
to connect to the master 

from the web ui, i can see the local worker registered with 2.9GB memory 

 

but when i run whatever applications(for example the SimpleApp.scala in
quick start with input file=4.5KB), it failed with 
14/04/23 16:39:13 WARN scheduler.TaskSetManager: Lost TID 0 (task 0.0:0) 
14/04/23 16:39:13 WARN scheduler.TaskSetManager: Loss was due to
java.lang.OutOfMemoryError 
java.lang.OutOfMemoryError: Java heap space 
at
org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183)
 
at
org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2378) 
at
org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285) 
at
org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77) 
at
org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 
at java.lang.reflect.Method.invoke(Method.java:606) 
at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) 
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) 
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) 
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
 
at
org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:165) 
at
org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 
at java.lang.reflect.Method.invoke(Method.java:606) 
at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) 
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893) 
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) 
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) 
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) 
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) 
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 

i use sbt to run the application 
SBT_OPTS="-Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled
-XX:MaxPermSize=256M" 
java $SBT_OPTS -jar `dirname $0`/sbt-launch.jar "$@" 

what's the problem?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/no-response-in-spark-web-UI-tp4633p4650.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


help

2014-04-23 Thread Joe L
hi, I found out the major problem of my spark cluster but don't know why it
happens. First, I was testing spark by running applications. It was spending
about 20 seconds only for counting 10 million strings/items(2GB) on the
cluster with 8 nodes (8 cores per node). As we know that it is a very bad
performance for a parallel programming framework. Today, I have tested the
same counting program using spark-shell over (this time much larger data)
100 million strings/items (20GB). And, it was taking just 10 seconds to
complete all the tasks and turned out to be almost 10x times faster than its
last result. Its performance was very good and promising. Do you think it is
because of my spark setting?

Joe



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/help-tp4648.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Accesing Hdfs from Spark gives TokenCache error "Can't get Master Kerberos principal for use as renewer"

2014-04-23 Thread Spyros Gasteratos

Hello everyone,
I'm a newbie in both hadoop and spark so please forgive any obvious
mistakes, I'm posting because my google-fu has failed me.

I'm trying to run a test Spark script in order to connect Spark to hadoop.
The script is the following

 from pyspark import SparkContext

 sc = SparkContext("local", "Simple App")
 file = sc.textFile("hdfs://hadoop_node.place:9000/errs.txt")
 errors = file.filter(lambda line: "ERROR" in line)
 errors.count()

When I run it with pyspark I get

py4j.protocol.Py4JJavaError: An error occurred while calling o21.collect. :
java.io.IOException: Can't get Master Kerberos principal for use as renewer
at

org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:116)
at
org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodesInternal(TokenCache.java:100)
at
org.apache.hadoop.mapreduce.security.TokenCache.obtainTokensForNamenodes(TokenCache.java:80)
at
org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:187)
at
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:251)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:140) at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207) at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at
scala.Option.getOrElse(Option.scala:120) at
org.apache.spark.rdd.RDD.partitions(RDD.scala:205) at
org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207) at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at
scala.Option.getOrElse(Option.scala:120) at
org.apache.spark.rdd.RDD.partitions(RDD.scala:205) at
org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:46) at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207) at
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205) at
scala.Option.getOrElse(Option.scala:120) at
org.apache.spark.rdd.RDD.partitions(RDD.scala:205) at
org.apache.spark.SparkContext.runJob(SparkContext.scala:898) at
org.apache.spark.rdd.RDD.collect(RDD.scala:608) at
org.apache.spark.api.java.JavaRDDLike$class.collect(JavaRDDLike.scala:243)
at org.apache.spark.api.java.JavaRDD.collect(JavaRDD.scala:27) at
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606) at
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at
py4j.Gateway.invoke(Gateway.java:259) at
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at
py4j.commands.CallCommand.execute(CallCommand.java:79) at
py4j.GatewayConnection.run(GatewayConnection.java:207) at
java.lang.Thread.run(Thread.java:744)

This happens despite the facts that

   - I've done a kinit and a klist shows I have the correct tokens
   - when I issue a ./bin/hadoop fs -ls
hdfs://hadoop_node.place:9000/errs.txt it shows the file
   - Both the local hadoop client and spark have the same configuration file

The core-site.xml in the spark/conf and hadoop/conf folders is the following
(got it from one of the hadoop nodes)




hadoop.security.auth_to_local

RULE:[1:$1](.*@place)s/@place//
RULE:[2:$1/$2@$0](.**/node1.place@place)s/*^([a-zA-Z]*).*/$1/
RULE:[2:$1/$2@$0](.**/node2.place@place)s/*^([a-zA-Z]*).*/$1/
RULE:[2:$1/$2@$0](.**/node3.place@place)s/*^([a-zA-Z]*).*/$1/
RULE:[2:$1/$2@$0](.**/node4.place@place)s/*^([a-zA-Z]*).*/$1/
RULE:[2:$1/$2@$0](.**/node5.place@place)s/*^([a-zA-Z]*).*/$1/
RULE:[2:$1/$2@$0](.**/node6.place@place)s/*^([a-zA-Z]*).*/$1/
RULE:[2:$1/$2@$0](.**/node7.place@place)s/*^([a-zA-Z]*).*/$1/
RULE:[2:nobody]
DEFAULT



net.topology.node.switch.mapping.impl
org.apache.hadoop.net.TableMapping


net.topology.table.file.name
/etc/hadoop/conf/topology.table.file


fs.defaultFS
hdfs://server.place:9000/


  hadoop.security.authentication
  kerberos



  hadoop.security.authorization
  true



  hadoop.proxyuser.hive.hosts
  *



  hadoop.proxyuser.hive.groups
  *




Can someone point out what am I missing?



Re: ERROR TaskSchedulerImpl: Lost an executor

2014-04-23 Thread wxhsdp
i have a similar question
i'am testing in standalone mode in only one pc. 
i use ./sbin/start-master.sh to start a master and
./bin/spark-class org.apache.spark.deploy.worker.Worker spark://ubuntu:7077
to connect to the master

from the web ui, i can see the local worker registered 

 

but when i run whatever applications(for example the SimpleApp.scala in
quick start with input file=4.5KB), it failed with
14/04/23 16:39:13 WARN scheduler.TaskSetManager: Lost TID 0 (task 0.0:0)
14/04/23 16:39:13 WARN scheduler.TaskSetManager: Loss was due to
java.lang.OutOfMemoryError
java.lang.OutOfMemoryError: Java heap space
at
org.apache.hadoop.io.WritableUtils.readCompressedStringArray(WritableUtils.java:183)
at 
org.apache.hadoop.conf.Configuration.readFields(Configuration.java:2378)
at 
org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285)
at 
org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77)
at
org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
at 
org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:165)
at
org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)

i use sbt to run the application
SBT_OPTS="-Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled
-XX:MaxPermSize=256M"
java $SBT_OPTS -jar `dirname $0`/sbt-launch.jar "$@"





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ERROR-TaskSchedulerImpl-Lost-an-executor-tp4566p4647.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: two calls of saveAsTextFile() have different results on the same RDD

2014-04-23 Thread Cheng Lian
To experiment, try this in the Spark shell:

val r0 = sc.makeRDD(1 to 3, 1)val r1 = r0.map { x =>
  println(x)
  x
}val r2 = r1.map(_ * 2)val r3 = r1.map(_ * 2 + 1)
(r2 ++ r3).collect()

You’ll see elements in r1 are printed (thus evaluated) twice. By adding
.cache() to r1, you’ll see those elements are printed only once.


On Wed, Apr 23, 2014 at 4:35 PM, Cheng Lian  wrote:

> Good question :)
>
> Although RDD DAG is lazy evaluated, it’s not exactly the same as Scala
> lazy val. For Scala lazy val, evaluated value is automatically cached,
> while evaluated RDD elements are not cached unless you call 
> .cache()explicitly, because materializing an RDD can often be expensive. Take 
> local
> file reading as an analogy:
>
> val v0 = sc.textFile("input.log").cache()
>
> is similar to a lazy val
>
> lazy val u0 = Source.fromFile("input.log").mkString
>
> while
>
> val v1 = sc.textFile("input.log")
>
> is similar to a function
>
> def u0 = Source.fromFile("input.log").mkString
>
> Think it this way: if you want to “reuse” the evaluated elements, you have
> to cache those elements somewhere. Without caching, you have to re-evaluate
> the RDD, and the semantics of an uncached RDD simply downgrades to a
> function rather than a lazy val.
>
>
> On Wed, Apr 23, 2014 at 4:00 PM, Mayur Rustagi wrote:
>
>> Shouldnt the dag optimizer optimize these routines. Sorry if its a dumb
>> question :)
>>
>>
>> Mayur Rustagi
>> Ph: +1 (760) 203 3257
>> http://www.sigmoidanalytics.com
>> @mayur_rustagi 
>>
>>
>>
>> On Wed, Apr 23, 2014 at 12:29 PM, Cheng Lian wrote:
>>
>>> Without caching, an RDD will be evaluated multiple times if referenced
>>> multiple times by other RDDs. A silly example:
>>>
>>> val text = sc.textFile("input.log")val r1 = text.filter(_ startsWith 
>>> "ERROR")val r2 = text.map(_ split " ")val r3 = (r1 ++ r2).collect()
>>>
>>> Here the input file will be scanned twice unless you call .cache() on
>>> text. So if your computation involves nondeterminism (e.g. random
>>> number), you may get different results.
>>>
>>>
>>> On Tue, Apr 22, 2014 at 11:30 AM, randylu  wrote:
>>>
 it's ok when i call doc_topic_dist.cache() firstly.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/two-calls-of-saveAsTextFile-have-different-results-on-the-same-RDD-tp4578p4580.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

>>>
>>>
>>
>


Re: Custom KryoSerializer

2014-04-23 Thread Eugen Cepoi
I had a similar need the solution I used is:
 - Define a base implementation of KryoRegistrator (that will register all
common classes/custom ser/deser)
 - make registerClasses method final, so subclasses don't override it
 - Define another method that would be overriden by subclasses that need to
provide additional customization

So if people need custom config, they just extend your base KryoRegistrator
otherwise they directly use the base impl.


2014-04-23 5:17 GMT+02:00 Soren Macbeth :

> Does spark support extending and registering a KryoSerializer class in an
> application jar?
>
> An example of why you might want to do this would be to always register
> some set of common classes within an organization while still allowing the
> particular application jar to use a kryo registrator to register additional
> serializers for it's own uses.
>
> When I try the naive approach of extending KryoSerializer in my
> application jar and registering that class in my SparkConf under the
> "spark.serializer" key, my executors all die complaining they can't find my
> class.
>


Re: two calls of saveAsTextFile() have different results on the same RDD

2014-04-23 Thread Cheng Lian
Good question :)

Although RDD DAG is lazy evaluated, it’s not exactly the same as Scala lazy
val. For Scala lazy val, evaluated value is automatically cached, while
evaluated RDD elements are not cached unless you call .cache() explicitly,
because materializing an RDD can often be expensive. Take local file
reading as an analogy:

val v0 = sc.textFile("input.log").cache()

is similar to a lazy val

lazy val u0 = Source.fromFile("input.log").mkString

while

val v1 = sc.textFile("input.log")

is similar to a function

def u0 = Source.fromFile("input.log").mkString

Think it this way: if you want to “reuse” the evaluated elements, you have
to cache those elements somewhere. Without caching, you have to re-evaluate
the RDD, and the semantics of an uncached RDD simply downgrades to a
function rather than a lazy val.


On Wed, Apr 23, 2014 at 4:00 PM, Mayur Rustagi wrote:

> Shouldnt the dag optimizer optimize these routines. Sorry if its a dumb
> question :)
>
>
> Mayur Rustagi
> Ph: +1 (760) 203 3257
> http://www.sigmoidanalytics.com
> @mayur_rustagi 
>
>
>
> On Wed, Apr 23, 2014 at 12:29 PM, Cheng Lian wrote:
>
>> Without caching, an RDD will be evaluated multiple times if referenced
>> multiple times by other RDDs. A silly example:
>>
>> val text = sc.textFile("input.log")val r1 = text.filter(_ startsWith 
>> "ERROR")val r2 = text.map(_ split " ")val r3 = (r1 ++ r2).collect()
>>
>> Here the input file will be scanned twice unless you call .cache() on
>> text. So if your computation involves nondeterminism (e.g. random
>> number), you may get different results.
>>
>>
>> On Tue, Apr 22, 2014 at 11:30 AM, randylu  wrote:
>>
>>> it's ok when i call doc_topic_dist.cache() firstly.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/two-calls-of-saveAsTextFile-have-different-results-on-the-same-RDD-tp4578p4580.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>
>>
>


Re: Spark runs applications in an inconsistent way

2014-04-23 Thread Mayur Rustagi
Very abstract.
EC2 is unlikely culprit.
What are you trying to do. Spark is typically not inconsistent like that
but huge intermediate data, reduce size issues could be involved, but hard
to help without some more detail of what you are trying to achieve.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Tue, Apr 22, 2014 at 7:30 PM, Aureliano Buendia wrote:

> Hi,
>
> Sometimes running the very same spark application binary, behaves
> differently with every execution.
>
>- The Ganglia profile is different with every execution: sometimes it
>takes 0.5 TB of memory, the next time it takes 1 TB of memory, the next
>time it is 0.75 TB...
>- Spark UI shows number of succeeded tasks is more than total number
>of tasks, eg: 3500/3000. There are no failed tasks. At this stage the
>computation keeps carrying on for a long time without returning an answer.
>- The only way to get an answer from an application is to hopelessly
>keep running that application multiple times, until by some luck it gets
>converged.
>
> I was not able to regenerate this by a minimal code, as it seems some
> random factors affect this behavior. I have a suspicion, but I'm not sure,
> that use of one or more groupByKey() calls intensifies this problem.
>
> Another source of suspicion is the unpredicted performance of ec2 clusters
> with latency and io.
>
> Is this a known issue with spark?
>


Re: Need help about how hadoop works.

2014-04-23 Thread Mayur Rustagi
As long as the path is present & available on all machines you should be
able to leverage distribution. HDFS is one way to make that happen, NFS is
another & simple replication is another.


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Wed, Apr 23, 2014 at 12:12 PM, Carter  wrote:

> Hi, I am a beginner of Hadoop and Spark, and want some help in
> understanding
> how hadoop works.
>
> If we have a cluster of 5 computers, and install Spark on the cluster
> WITHOUT Hadoop. And then we run the code on one computer:
> val doc = sc.textFile("/home/scalatest.txt",5)
> doc.count
> Can the "count" task be distributed to all the 5 computers? Or it is only
> run by 5 parallel threads of the current computer?
>
> On th other hand, if we install Hadoop on the cluster and upload the data
> into HDFS, when running the same code will this "count" task be done by 25
> threads?
>
> Thank you very much for your help.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Need-help-about-how-hadoop-works-tp4638.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: two calls of saveAsTextFile() have different results on the same RDD

2014-04-23 Thread Mayur Rustagi
Shouldnt the dag optimizer optimize these routines. Sorry if its a dumb
question :)


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Wed, Apr 23, 2014 at 12:29 PM, Cheng Lian  wrote:

> Without caching, an RDD will be evaluated multiple times if referenced
> multiple times by other RDDs. A silly example:
>
> val text = sc.textFile("input.log")val r1 = text.filter(_ startsWith 
> "ERROR")val r2 = text.map(_ split " ")val r3 = (r1 ++ r2).collect()
>
> Here the input file will be scanned twice unless you call .cache() on text.
> So if your computation involves nondeterminism (e.g. random number), you
> may get different results.
>
>
> On Tue, Apr 22, 2014 at 11:30 AM, randylu  wrote:
>
>> it's ok when i call doc_topic_dist.cache() firstly.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/two-calls-of-saveAsTextFile-have-different-results-on-the-same-RDD-tp4578p4580.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>
>


Re: two calls of saveAsTextFile() have different results on the same RDD

2014-04-23 Thread Cheng Lian
Without caching, an RDD will be evaluated multiple times if referenced
multiple times by other RDDs. A silly example:

val text = sc.textFile("input.log")val r1 = text.filter(_ startsWith
"ERROR")val r2 = text.map(_ split " ")val r3 = (r1 ++ r2).collect()

Here the input file will be scanned twice unless you call .cache() on text.
So if your computation involves nondeterminism (e.g. random number), you
may get different results.


On Tue, Apr 22, 2014 at 11:30 AM, randylu  wrote:

> it's ok when i call doc_topic_dist.cache() firstly.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/two-calls-of-saveAsTextFile-have-different-results-on-the-same-RDD-tp4578p4580.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>