Re: sparkR - is it possible to run sparkR on yarn?

2014-04-30 Thread Shivaram Venkataraman
We don't have any documentation on running SparkR on YARN and I think there
might be some issues that need to be fixed (The recent PySpark on YARN PRs
are an example).
SparkR has only been tested to work with Spark standalone mode so far.

Thanks
Shivaram



On Tue, Apr 29, 2014 at 7:56 PM, phoenix bai mingzhi...@gmail.com wrote:

 Hi all,

 I searched around, but failed to find anything about running
 SparkR on YARN.

 So, is it possible to run SparkR with YARN? Either in yarn-standalone
 or yarn-client mode.
 If so, is there any document that could guide me through the build & setup
 process?

 I am desperate for some answers, so please help!



Setting spark.locality.wait.node parameter in interactive shell

2014-04-30 Thread Sai Prasanna
Hi, any suggestions on the following issue?

I have replication factor 3 in my HDFS.
I ran my experiments with 3 datanodes, and then added another node
with no data on it.
Now Spark launches non-local tasks on the new node, and the time taken is more
than what it took on the 3-node cluster.

I think delay scheduling gives up here because of the parameter
spark.locality.wait.node, which defaults to 3 seconds, so ANY-level
tasks get launched on the added datanode.

*How do I set the spark.locality.wait.node parameter in the environment for the
interactive shell's sc?*

Thanks !
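
For reference, a minimal sketch of how such a property is normally supplied through SparkConf when the context is built by the application itself (in the interactive shell, sc already exists, so the setting has to be provided when the shell is launched rather than on the live sc; the values below are illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    // Raise the node-level locality wait so delay scheduling waits longer
    // before falling back to ANY-level tasks on the data-free node.
    val conf = new SparkConf()
      .setAppName("locality-wait-example")
      .set("spark.locality.wait.node", "30000")   // milliseconds; the default is 3000

    val sc = new SparkContext(conf)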


RE: How fast would you expect shuffle serialize to be?

2014-04-30 Thread Liu, Raymond
I just tried using the serializer to write objects directly in local mode, with this code:

import java.io.{BufferedOutputStream, FileOutputStream, OutputStream}
import org.apache.spark.SparkEnv

val datasize = args(1).toInt
val dataset = (0 until datasize).map(i => ("asmallstring", i))

val out: OutputStream = {
  new BufferedOutputStream(new FileOutputStream(args(2)), 1024 * 100)
}

val ser = SparkEnv.get.serializer.newInstance()
val serOut = ser.serializeStream(out)

dataset.foreach(value =>
  serOut.writeObject(value)
)
serOut.flush()
serOut.close()

Thus one core on one disk. When using JavaSerializer, throughput is 10~12MB/s,
and Kryo doubles it. So it seems to me that when running the full code path in my
previous case, 32 cores with 50MB/s total throughput is reasonable?
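
As a side note, a minimal sketch of how the serializer that SparkEnv hands back would be switched to Kryo, assuming the context is created with a SparkConf (the application name is illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    // Register Kryo before the context is created, so that
    // SparkEnv.get.serializer returns a KryoSerializer instance.
    val conf = new SparkConf()
      .setAppName("serializer-benchmark")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val sc = new SparkContext(conf)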


Best Regards,
Raymond Liu


-Original Message-
From: Liu, Raymond [mailto:raymond@intel.com] 


Later case, total throughput aggregated from all cores.

Best Regards,
Raymond Liu


-Original Message-
From: Patrick Wendell [mailto:pwend...@gmail.com]
Sent: Wednesday, April 30, 2014 1:22 PM
To: user@spark.apache.org
Subject: Re: How fast would you expect shuffle serialize to be?

Hm - I'm still not sure if you mean
100MB/s for each task = 3200MB/s across all cores
-or-
3.1MB/s for each task = 100MB/s across all cores

If it's the second one, that's really slow and something is wrong. If it's the
first one, that's in the range of what I'd expect, but I'm no expert.

On Tue, Apr 29, 2014 at 10:14 PM, Liu, Raymond raymond@intel.com wrote:
 For all the tasks, say 32 task on total

 Best Regards,
 Raymond Liu


 -Original Message-
 From: Patrick Wendell [mailto:pwend...@gmail.com]

 Is this the serialization throughput per task or the serialization throughput 
 for all the tasks?

 On Tue, Apr 29, 2014 at 9:34 PM, Liu, Raymond raymond@intel.com wrote:
 Hi

 I am running a WordCount program which counts words from HDFS,
 and I noticed that the serializer part of the code takes a lot of CPU
 time. On a 16-core/32-thread node, the total throughput is around
 50MB/s with JavaSerializer, and if I switch to KryoSerializer, it
 doubles to around 100-150MB/s. (I have 12 disks per node and the files are
 scattered across the disks, so HDFS bandwidth is not a problem.)

 I also noticed that in this case the objects being written are
 (String, Int); if I try a case with (Int, Int), the throughput is a further
 2-3x faster.

 So, in my WordCount case, the bottleneck is CPU (because with
 shuffle compression on, the 150MB/s of data bandwidth on the input side
 usually leads to around 50MB/s of shuffle data).

 This serialization bandwidth looks somewhat too low, so I am wondering what
 bandwidth you observe in your case. Does this throughput sound reasonable to you?
 If not, what might need to be examined in my case?



 Best Regards,
 Raymond Liu




Re: Union of 2 RDD's only returns the first one

2014-04-30 Thread Mingyu Kim
Yes, that’s what I meant. Sure, the numbers might not be actually sorted,
but the order of rows is semantically kept throughout non-shuffling
transforms. I’m on board with you on union as well.

Back to the original question, then, why is it important to coalesce to a
single partition? When you union two RDDs, for example, rdd1 = [“a, b,
c”], rdd2 = [“1, 2, 3”, “4, 5, 6”], then
rdd1.union(rdd2).saveAsTextFile(…) should’ve resulted in a file with three
lines “a, b, c”, “1, 2, 3”, and “4, 5, 6” because the partitions from the
two RDDs are concatenated.

Mingyu




On 4/29/14, 10:55 PM, Patrick Wendell pwend...@gmail.com wrote:

If you call map() on an RDD it will retain the ordering it had before,
but that is not necessarily a correct sort order for the new RDD.

var rdd = sc.parallelize([2, 1, 3]);
var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
var mapped = sorted.mapValues(x => 3 - x); // should be [2, 1, 0]

Note that mapped is no longer sorted.

When you union two RDD's together it will effectively concatenate the
two orderings, which is also not a valid sorted order on the new RDD:

rdd1 = [1,2,3]
rdd2 = [1,4,5]

rdd1.union(rdd2) = [1,2,3,1,4,5]
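
A runnable Scala rendering of the pseudo-code above, as one might type it into spark-shell (sortByKey stands in for the sort() shorthand; the result comments describe the current implementation, not a guarantee):

    val rdd = sc.parallelize(Seq(2, 1, 3))
    val sorted = rdd.map(x => (x, x)).sortByKey()   // keys come out as 1, 2, 3
    val mapped = sorted.mapValues(x => 3 - x)       // values become 2, 1, 0 and are no longer sorted

    val rdd1 = sc.parallelize(Seq(1, 2, 3))
    val rdd2 = sc.parallelize(Seq(1, 4, 5))
    rdd1.union(rdd2).collect()                      // Array(1, 2, 3, 1, 4, 5) today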

On Tue, Apr 29, 2014 at 10:44 PM, Mingyu Kim m...@palantir.com wrote:
 Thanks for the quick response!

 To better understand it, the reason sorted RDD has a well-defined
ordering
 is because sortedRDD.getPartitions() returns the partitions in the right
 order and each partition internally is properly sorted. So, if you have

 var rdd = sc.parallelize([2, 1, 3]);
 var sorted = rdd.map(x => (x, x)).sort(); // should be [1, 2, 3]
 var mapped = sorted.mapValues(x => x + 1); // should be [2, 3, 4]

 Since mapValues doesn’t change the order of partitions nor change the
 order of rows within the partitions, I think “mapped” should have the
 exact same order as “sorted”. Sure, if a transform involves shuffling,
the
 order will change. Am I mistaken? Is there an extra detail in sortedRDD
 that guarantees a well-defined ordering?

 If it’s true that the order of partitions returned by
RDD.getPartitions()
 and the row orders within the partitions determine the row order, I’m
not
 sure why union doesn’t respect the order because union operation simply
 concatenates the two lists of partitions from the two RDDs.

 Mingyu




 On 4/29/14, 10:25 PM, Patrick Wendell pwend...@gmail.com wrote:

You are right, once you sort() the RDD, then yes it has a well defined
ordering.

But that ordering is lost as soon as you transform the RDD, including
if you union it with another RDD.

On Tue, Apr 29, 2014 at 10:22 PM, Mingyu Kim m...@palantir.com wrote:
 Hi Patrick,

 I’m a little confused about your comment that RDDs are not ordered. As
far
 as I know, RDDs keep list of partitions that are ordered and this is
why I
 can call RDD.take() and get the same first k rows every time I call it
and
 RDD.take() returns the same entries as RDD.map(…).take() because map
 preserves the partition order. RDD order is also what allows me to get
the
 top k out of RDD by doing RDD.sort().take().

 Am I misunderstanding it? Or, is it just when RDD is written to disk
that
 the order is not well preserved? Thanks in advance!

 Mingyu




 On 1/22/14, 4:46 PM, Patrick Wendell pwend...@gmail.com wrote:

Ah somehow after all this time I've never seen that!

On Wed, Jan 22, 2014 at 4:45 PM, Aureliano Buendia
buendia...@gmail.com
wrote:



 On Thu, Jan 23, 2014 at 12:37 AM, Patrick Wendell
pwend...@gmail.com
 wrote:

 What is the ++ operator here? Is this something you defined?


 No, it's an alias for union defined in RDD.scala:

 def ++(other: RDD[T]): RDD[T] = this.union(other)



 Another issue is that RDD's are not ordered, so when you union two
 together it doesn't have a well defined ordering.

 If you do want to do this you could coalesce into one partition,
then
 call MapPartitions and return an iterator that first adds your
header
 and then the rest of the file, then call saveAsTextFile. Keep in
mind
 this will only work if you coalesce into a single partition.


 Thanks! I'll give this a try.



 myRdd.coalesce(1)
 .map(_.mkString(","))
 .mapPartitions(it => (Seq("col1,col2,col3") ++ it).iterator)
 .saveAsTextFile("out.csv")

 - Patrick

 On Wed, Jan 22, 2014 at 11:12 AM, Aureliano Buendia
 buendia...@gmail.com wrote:
  Hi,
 
  I'm trying to find a way to create a csv header when using
  saveAsTextFile,
  and I came up with this:
 
  (sc.makeRDD(Array("col1,col2,col3"), 1) ++
  myRdd.coalesce(1).map(_.mkString(",")))
.saveAsTextFile("out.csv")
 
  But it only saves the header part. Why is it that the union method does
not return both RDDs?






Re: NoSuchMethodError from Spark Java

2014-04-30 Thread wxhsdp
I fixed it.

I made my sbt project depend on
spark/trunk/assembly/target/scala-2.10/spark-assembly-1.0.0-SNAPSHOT-hadoop1.0.4.jar
and it works.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-from-Spark-Java-tp4937p5096.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Shuffle Spill Issue

2014-04-30 Thread Daniel Darabos
Whoops, you are right. Sorry for the misinformation. Indeed reduceByKey
just calls combineByKey:

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] =
{
  combineByKey[V]((v: V) => v, func, func, partitioner)
}

(I think I confused reduceByKey with groupByKey.)
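
For completeness, a small sketch of the equivalence being discussed, using the word-count pairs from the quoted thread below (both forms do map-side combine):

    // wordsPair: RDD[(String, Int)], e.g. words.map(x => (x, 1))
    val countsViaReduce  = wordsPair.reduceByKey(_ + _)
    val countsViaCombine = wordsPair.combineByKey(
      (v: Int) => v,                  // createCombiner
      (c: Int, v: Int) => c + v,      // mergeValue, applied map-side
      (c1: Int, c2: Int) => c1 + c2   // mergeCombiners, applied after the shuffle
    )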


On Wed, Apr 30, 2014 at 2:55 AM, Liu, Raymond raymond@intel.com wrote:

 Hi Daniel

 Thanks for your reply. I think reduceByKey will also do map-side
 combine, so the result is the same: for each partition, one entry per
 distinct word. In my case with JavaSerializer, the 240MB dataset yields
 around 70MB of shuffle data. Only the Shuffle Spill (Memory) is abnormal,
 and it sounds to me like it should not trigger at all. By the way, this
 behavior only occurs on the map output side; on the reduce / shuffle
 fetch side this strange behavior doesn't happen.

 Best Regards,
 Raymond Liu

 From: Daniel Darabos [mailto:daniel.dara...@lynxanalytics.com]

 I have no idea why shuffle spill is so large. But this might make it
 smaller:

 val addition = (a: Int, b: Int) => a + b
 val wordsCount = wordsPair.combineByKey(identity, addition, addition)

 This way only one entry per distinct word will end up in the shuffle for
 each partition, instead of one entry per word occurrence.

 On Tue, Apr 29, 2014 at 7:48 AM, Liu, Raymond raymond@intel.com
 wrote:
 Hi  Patrick

 I am just doing a simple word count; the data is generated by
 the Hadoop random text writer.

 This seems to me not really related to compression. If I turn off
 compression on the shuffle, the metrics look something like below for the
 smaller 240MB dataset.


 Executor ID | Address     | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
 10          | sr437:48527 | 35 s      | 8           | 0            | 8               | 0.0 B        | 2.5 MB        | 2.2 GB                 | 1291.2 KB
 12          | sr437:46077 | 34 s      | 8           | 0            | 8               | 0.0 B        | 2.5 MB        | 1822.6 MB              | 1073.3 KB
 13          | sr434:37896 | 31 s      | 8           | 0            | 8               | 0.0 B        | 2.4 MB        | 1099.2 MB              | 621.2 KB
 15          | sr438:52819 | 31 s      | 8           | 0            | 8               | 0.0 B        | 2.5 MB        | 1898.8 MB              | 1072.6 KB
 16          | sr434:37103 | 32 s      | 8           | 0            | 8               | 0.0 B        | 2.4 MB        | 1638.0 MB              | 1044.6 KB


 And the program pretty simple:

 val files = sc.textFile(args(1))
 val words = files.flatMap(_.split(" "))
 val wordsPair = words.map(x => (x, 1))

 val wordsCount = wordsPair.reduceByKey(_ + _)
 val count = wordsCount.count()

 println("Number of words = " + count)


 Best Regards,
 Raymond Liu

 From: Patrick Wendell [mailto:pwend...@gmail.com]

 Could you explain more what your job is doing and what data types you are
 using? These numbers alone don't necessarily indicate something is wrong.
 The relationship between the in-memory and on-disk shuffle amount is
 definitely a bit strange, the data gets compressed when written to disk,
 but unless you have a weird dataset (E.g. all zeros) I wouldn't expect it
 to compress _that_ much.

 On Mon, Apr 28, 2014 at 1:18 AM, Liu, Raymond raymond@intel.com
 wrote:
 Hi


 I am running a simple word count program on a Spark standalone
 cluster. The cluster is made up of 6 nodes, each running 4 workers, and each
 worker owns 10G memory and 16 cores, thus 96 cores and 240G memory in total.
 (It was also previously configured as 1 worker with 40G memory on each node.)

 I ran a very small data set (2.4GB in total on HDFS) to confirm
 the problem, as below:

 As you can see from the part of the task metrics below, I noticed
 that the shuffle spill part of the metrics indicates that something is
 wrong.

 Executor ID | Address     | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
 0           | sr437:42139 | 29 s      | 4           | 0            | 4               | 0.0 B        | 4.3 MB        | 23.6 GB                | 4.3 MB
 1           | sr433:46935 | 1.1 min   | 4           | 0            | 4               | 0.0 B        | 4.2 MB        | 19.0 GB                | 3.4 MB
 10          | sr436:53277 | 26 s      | 4           | 0            | 4               | 0.0 B        | 4.3 MB        | 25.6 GB                | 4.6 MB
 11          | sr437:58872 | 32 s      | 4           | 0            | 4               | 0.0 B        | 4.3 MB        | 25.0 GB                | 4.4 MB
 12          | sr435:48358 | 27 s      | 4           | 0            | 4               | 0.0 B        | 4.3 MB        | 25.1 GB                | 4.4 MB


 You can see that the Shuffle Spill (Memory) is pretty high, almost 5000x
 the actual shuffle data and Shuffle Spill (Disk), and it also seems to
 me that the spill should by no means trigger, since the memory is not
 used up at all.

 To verify that, I further reduced the data size to 240MB in total.

 And here is the result:


 Executor ID | Address     | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
 0           | sr437:50895 | 15 s      | 4           | 0            | 4               | 0.0 B        | 703.0 KB      | 80.0 MB                | 43.2 KB
 1           | sr433:50207 | 17 s      | 4           | 0            | 4               | 0.0 B        | 704.7 KB      | 389.5 MB               | 90.2 KB
 

the spark configuage

2014-04-30 Thread Sophia
Hi,
When I configure Spark and run the shell instruction ./spark-shell, it tells me:
WARN NativeCodeLoader: Unable to load native-hadoop library for your platform...
using builtin-java classes where applicable. When it connects to the
ResourceManager, it stops. What should I do?
Looking forward to your reply.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/the-spark-configuage-tp5098.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Joining not-pair RDDs in Spark

2014-04-30 Thread jsantos
That's the approach I finally used.
Thanks for your help :-)




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Joining-not-pair-RDDs-in-Spark-tp5034p5099.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: the spark configuage

2014-04-30 Thread Akhil Das
Hi

The reason you saw that warning is that the native Hadoop library
$HADOOP_HOME/lib/native/libhadoop.so.1.0.0 was actually compiled for 32-bit.

Anyway, it's just a warning, and won't impact Hadoop's functionality.

If you do want to eliminate this warning, download the Hadoop source
code, recompile libhadoop.so.1.0.0 on a 64-bit system, and then replace
the 32-bit one.

Steps on how to recompile source code are included here for Ubuntu:

http://www.ercoppa.org/Linux-Compile-Hadoop-220-fix-Unable-to-load-native-hadoop-library.htm
http://www.csrdu.org/nauman/2014/01/23/geting-started-with-hadoop-2-2-0-building/

Good luck.


On Wed, Apr 30, 2014 at 1:28 PM, Sophia sln-1...@163.com wrote:

 Hi,
 when I configue spark, run the shell instruction:
 ./spark-shellit told me like this:
 WARN:NativeCodeLoader:Uable to load native-hadoop livrary for your
 builtin-java classes where applicable,when it connect to ResourceManager,it
 stopped. What should I DO?
 Wish your reply



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/the-spark-configuage-tp5098.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: the spark configuage

2014-04-30 Thread Andras Nemeth
On 30 Apr 2014 10:35, Akhil Das ak...@sigmoidanalytics.com wrote:

 Hi

 The reason you saw that warning is the native Hadoop library
$HADOOP_HOME/lib/native/libhadoop.so.1.0.0 was actually compiled on 32 bit.

 Anyway, it's just a warning, and won't impact Hadoop's functionalities.

 Here is the way if you do want to eliminate this warning, download the
source code of Hadoop and recompile libhadoop.so.1.0.0 on 64bit system,
then replace the 32bit one.

 Steps on how to recompile source code are included here for Ubuntu:


http://www.ercoppa.org/Linux-Compile-Hadoop-220-fix-Unable-to-load-native-hadoop-library.htm

http://www.csrdu.org/nauman/2014/01/23/geting-started-with-hadoop-2-2-0-building/

 Good luck.


 On Wed, Apr 30, 2014 at 1:28 PM, Sophia sln-1...@163.com wrote:

 Hi,
 when I configue spark, run the shell instruction:
 ./spark-shellit told me like this:
 WARN:NativeCodeLoader:Uable to load native-hadoop livrary for your
 builtin-java classes where applicable,when it connect to
ResourceManager,it
 stopped. What should I DO?
 Wish your reply



 --
 View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/the-spark-configuage-tp5098.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.




new Washington DC Area Spark Meetup

2014-04-30 Thread Donna-M. Fernandez
Hi, all!  For those in the Washington DC area (DC/MD/VA), we just started a
new Spark Meetup.  We'd love for you to join!  -d

Here's the link: http://www.meetup.com/Washington-DC-Area-Spark-Interactive/


Description:

This is an interactive meetup for Washington DC, Virginia and Maryland
users, enthusiasts, and explorers of Apache Spark (www.spark-project.org).
Spark is the powerful open source Data processing framework that extends and
accelerates Hadoop; built around speed, ease of use, and sophisticated
analytics. This is a very interactive meetup where we will exchange ideas,
inspire and learn from each other, and bring the top minds and innovators
around big data and real-time solutions to the table. This meetup is meant
for both the business and technical community (both the curious and the
experts) interested in the how and the why of Spark's capabilities. The
meetup will include introductions to the various Spark features and
functions, case studies from current users, best practices for deployment,
and speakers from the top technology leaders and vendors who are helping
craft Spark's roadmap for the future. Let's build this exciting Spark
community together!


Thanks,
Donna-M. Fernandez | VP of Operations & Services | MetiStream, Inc. |
do...@metistream.com | 202.642.3220 






Re: the spark configuage

2014-04-30 Thread Diana Carroll
I'm guessing your shell stopping when it attempts to connect to the RM is
not related to that warning.  You'll get that message out of the box from
Spark if you don't have HADOOP_HOME set correctly.  I'm using CDH 5.0
installed in default locations, and got rid of the warning by setting
HADOOP_HOME to /usr/lib/hadoop.  The stopping issue might be something
unrelated.

Diana


On Wed, Apr 30, 2014 at 3:58 AM, Sophia sln-1...@163.com wrote:

 Hi,
 when I configue spark, run the shell instruction:
 ./spark-shellit told me like this:
 WARN:NativeCodeLoader:Uable to load native-hadoop livrary for your
 builtin-java classes where applicable,when it connect to ResourceManager,it
 stopped. What should I DO?
 Wish your reply



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/the-spark-configuage-tp5098.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



Re: Union of 2 RDD's only returns the first one

2014-04-30 Thread Mingyu Kim
Okay, that makes sense. It’d be great if this can be better documented at
some point, because the only way to find out about the resulting RDD row
order is by looking at the code.

Thanks for the discussion!

Mingyu




On 4/29/14, 11:59 PM, Patrick Wendell pwend...@gmail.com wrote:

I don't think we guarantee anywhere that union(A, B) will behave by
concatenating the partitions, it just happens to be an artifact of the
current implementation.

rdd1 = [1,2,3]
rdd2 = [1,4,5]

rdd1.union(rdd2) = [1,2,3,1,4,5] // how it is now
rdd1.union(rdd2) = [1,4,5,1,2,3] // some day it could be like this, it
wouldn't violate the contract of union

AFAIK the only guarantee is that the resulting RDD will contain all elements.

- Patrick

On Tue, Apr 29, 2014 at 11:26 PM, Mingyu Kim m...@palantir.com wrote:
 Yes, that’s what I meant. Sure, the numbers might not be actually
sorted,
 but the order of rows semantically are kept throughout non-shuffling
 transforms. I’m on board with you on union as well.

 Back to the original question, then, why is it important to coalesce to
a
 single partition? When you union two RDDs, for example, rdd1 = [“a, b,
 c”], rdd2 = [“1, 2, 3”, “4, 5, 6”], then
 rdd1.union(rdd2).saveAsTextFile(…) should’ve resulted in a file with
three
 lines “a, b, c”, “1, 2, 3”, and “4, 5, 6” because the partitions from
the
 two reds are concatenated.

 Mingyu




 On 4/29/14, 10:55 PM, Patrick Wendell pwend...@gmail.com wrote:

If you call map() on an RDD it will retain the ordering it had before,
but that is not necessarily a correct sort order for the new RDD.

var rdd = sc.parallelize([2, 1, 3]);
var sorted = rdd.map(x = (x, x)).sort(); // should be [1, 2, 3]
var mapped = sorted.mapValues(x = 3 - x); // should be [2, 1, 0]

Note that mapped is no longer sorted.

When you union two RDD's together it will effectively concatenate the
two orderings, which is also not a valid sorted order on the new RDD:

rdd1 = [1,2,3]
rdd2 = [1,4,5]

rdd1.union(rdd2) = [1,2,3,1,4,5]

On Tue, Apr 29, 2014 at 10:44 PM, Mingyu Kim m...@palantir.com wrote:
 Thanks for the quick response!

 To better understand it, the reason sorted RDD has a well-defined
ordering
 is because sortedRDD.getPartitions() returns the partitions in the
right
 order and each partition internally is properly sorted. So, if you
have

 var rdd = sc.parallelize([2, 1, 3]);
 var sorted = rdd.map(x = (x, x)).sort(); // should be [1, 2, 3]
 var mapped = sorted.mapValues(x = x + 1); // should be [2, 3, 4]

 Since mapValues doesn’t change the order of partitions not change the
 order of rows within the partitions, I think “mapped” should have the
 exact same order as “sorted”. Sure, if a transform involves shuffling,
the
 order will change. Am I mistaken? Is there an extra detail in
sortedRDD
 that guarantees a well-defined ordering?

 If it’s true that the order of partitions returned by
RDD.getPartitions()
 and the row orders within the partitions determine the row order, I’m
not
 sure why union doesn’t respect the order because union operation
simply
 concatenates the two lists of partitions from the two RDDs.

 Mingyu




 On 4/29/14, 10:25 PM, Patrick Wendell pwend...@gmail.com wrote:

You are right, once you sort() the RDD, then yes it has a well defined
ordering.

But that ordering is lost as soon as you transform the RDD, including
if you union it with another RDD.

On Tue, Apr 29, 2014 at 10:22 PM, Mingyu Kim m...@palantir.com
wrote:
 Hi Patrick,

 I¹m a little confused about your comment that RDDs are not ordered.
As
far
 as I know, RDDs keep list of partitions that are ordered and this is
why I
 can call RDD.take() and get the same first k rows every time I call
it
and
 RDD.take() returns the same entries as RDD.map(Š).take() because map
 preserves the partition order. RDD order is also what allows me to
get
the
 top k out of RDD by doing RDD.sort().take().

 Am I misunderstanding it? Or, is it just when RDD is written to disk
that
 the order is not well preserved? Thanks in advance!

 Mingyu




 On 1/22/14, 4:46 PM, Patrick Wendell pwend...@gmail.com wrote:

Ah somehow after all this time I've never seen that!

On Wed, Jan 22, 2014 at 4:45 PM, Aureliano Buendia
buendia...@gmail.com
wrote:



 On Thu, Jan 23, 2014 at 12:37 AM, Patrick Wendell
pwend...@gmail.com
 wrote:

 What is the ++ operator here? Is this something you defined?


 No, it's an alias for union defined in RDD.scala:

 def ++(other: RDD[T]): RDD[T] = this.union(other)



 Another issue is that RDD's are not ordered, so when you union
two
 together it doesn't have a well defined ordering.

 If you do want to do this you could coalesce into one partition,
then
 call MapPartitions and return an iterator that first adds your
header
 and then the rest of the file, then call saveAsTextFile. Keep in
mind
 this will only work if you coalesce into a single partition.


 Thanks! I'll give this a try.



 myRdd.coalesce(1)
 .map(_.mkString(,)))
 .mapPartitions(it = (Seq(col1,col2,col3) ++ 

Reading multiple S3 objects, transforming, writing back one

2014-04-30 Thread Peter
Hi

Playing around with Spark & S3, I'm opening multiple objects (CSV files) with:

    val hfile = sc.textFile("s3n://bucket/2014-04-28/")

so hfile is an RDD representing 10 objects that were underneath 2014-04-28.
After I've sorted and otherwise transformed the content, I'm trying to write it
back to a single object:

    sortedMap.values.map(_.mkString(",")).saveAsTextFile("s3n://bucket/concatted.csv")

unfortunately this results in a folder named concatted.csv with 10 objects 
underneath, part-0 .. part-00010, corresponding to the 10 original objects 
loaded. 

How can I achieve the desired behaviour of putting a single object named 
concatted.csv ?

I've tried 0.9.1 and 1.0.0-RC3. 

Thanks!
Peter

Re: Reading multiple S3 objects, transforming, writing back one

2014-04-30 Thread Peter
Ah, looks like RDD.coalesce(1) solves one part of the problem.
On Wednesday, April 30, 2014 11:15 AM, Peter thenephili...@yahoo.com wrote:
 
Hi

Playing around with Spark  S3, I'm opening multiple objects (CSV files) with:

    val hfile = sc.textFile(s3n://bucket/2014-04-28/)

so hfile is a RDD representing 10 objects that were underneath 2014-04-28. 
After I've sorted and otherwise transformed the content, I'm trying to write it 
back to a single object:

    
sortedMap.values.map(_.mkString(,)).saveAsTextFile(s3n://bucket/concatted.csv)

unfortunately this results in a folder named concatted.csv with 10 objects 
underneath, part-0 .. part-00010, corresponding to the 10 original objects 
loaded. 

How can I achieve the desired behaviour of putting a single object named 
concatted.csv ?

I've tried 0.9.1 and 1.0.0-RC3. 

Thanks!
Peter

Re: Union of 2 RDD's only returns the first one

2014-04-30 Thread Mingyu Kim
I agree with you in general that as an API user, I shouldn’t be relying on
code. However, without looking at the code, there is no way for me to find
out even whether map() keeps the row order. Without the knowledge at all,
I’d need to do “sort” every time I need certain things in a certain order.
(and, sort is really expensive.) On the other hand, if I can assume, say,
“filter” or “map” doesn’t shuffle the rows around, I can do the sort once
and assume that the order is retained throughout such operations saving a
lot of time from doing unnecessary sorts.

Mingyu

From:  Mark Hamstra m...@clearstorydata.com
Reply-To:  user@spark.apache.org user@spark.apache.org
Date:  Wednesday, April 30, 2014 at 11:36 AM
To:  user@spark.apache.org user@spark.apache.org
Subject:  Re: Union of 2 RDD's only returns the first one

Which is what you shouldn't be doing as an API user, since that
implementation code might change.  The documentation doesn't mention a row
ordering guarantee, so none should be assumed.

It is hard enough for us to correctly document all of the things that the
API does do.  We really shouldn't be forced into the expectation that we
will also fully document everything that the API doesn't do.


On Wed, Apr 30, 2014 at 11:13 AM, Mingyu Kim m...@palantir.com wrote:
 Okay, that makes sense. It’d be great if this can be better documented at
 some point, because the only way to find out about the resulting RDD row
 order is by looking at the code.
 
 Thanks for the discussion!
 
 Mingyu
 
 
 
 
 On 4/29/14, 11:59 PM, Patrick Wendell pwend...@gmail.com wrote:
 
 I don't think we guarantee anywhere that union(A, B) will behave by
 concatenating the partitions, it just happens to be an artifact of the
 current implementation.
 
 rdd1 = [1,2,3]
 rdd2 = [1,4,5]
 
 rdd1.union(rdd2) = [1,2,3,1,4,5] // how it is now
 rdd1.union(rdd2) = [1,4,5,1,2,3] // some day it could be like this, it
 wouldn't violate the contract of union
 
 AFIAK the only guarentee is the resulting RDD will contain all elements.
 
 - Patrick
 
 On Tue, Apr 29, 2014 at 11:26 PM, Mingyu Kim m...@palantir.com wrote:
  Yes, that’s what I meant. Sure, the numbers might not be actually
 sorted,
  but the order of rows semantically are kept throughout non-shuffling
  transforms. I’m on board with you on union as well.
 
  Back to the original question, then, why is it important to coalesce to
 a
  single partition? When you union two RDDs, for example, rdd1 = [“a, b,
  c”], rdd2 = [“1, 2, 3”, “4, 5, 6”], then
  rdd1.union(rdd2).saveAsTextFile(…) should’ve resulted in a file with
 three
  lines “a, b, c”, “1, 2, 3”, and “4, 5, 6” because the partitions from
 the
  two reds are concatenated.
 
  Mingyu
 
 
 
 
  On 4/29/14, 10:55 PM, Patrick Wendell pwend...@gmail.com wrote:
 
 If you call map() on an RDD it will retain the ordering it had before,
 but that is not necessarily a correct sort order for the new RDD.
 
 var rdd = sc.parallelize([2, 1, 3]);
 var sorted = rdd.map(x = (x, x)).sort(); // should be [1, 2, 3]
 var mapped = sorted.mapValues(x = 3 - x); // should be [2, 1, 0]
 
 Note that mapped is no longer sorted.
 
 When you union two RDD's together it will effectively concatenate the
 two orderings, which is also not a valid sorted order on the new RDD:
 
 rdd1 = [1,2,3]
 rdd2 = [1,4,5]
 
 rdd1.union(rdd2) = [1,2,3,1,4,5]
 
 On Tue, Apr 29, 2014 at 10:44 PM, Mingyu Kim m...@palantir.com wrote:
  Thanks for the quick response!
 
  To better understand it, the reason sorted RDD has a well-defined
 ordering
  is because sortedRDD.getPartitions() returns the partitions in the
 right
  order and each partition internally is properly sorted. So, if you
 have
 
  var rdd = sc.parallelize([2, 1, 3]);
  var sorted = rdd.map(x = (x, x)).sort(); // should be [1, 2, 3]
  var mapped = sorted.mapValues(x = x + 1); // should be [2, 3, 4]
 
  Since mapValues doesn’t change the order of partitions not change the
  order of rows within the partitions, I think “mapped” should have the
  exact same order as “sorted”. Sure, if a transform involves
 shuffling,
 the
  order will change. Am I mistaken? Is there an extra detail in
 sortedRDD
  that guarantees a well-defined ordering?
 
  If it’s true that the order of partitions returned by
 RDD.getPartitions()
  and the row orders within the partitions determine the row order, I’m
 not
  sure why union doesn’t respect the order because union operation
 simply
  concatenates the two lists of partitions from the two RDDs.
 
  Mingyu
 
 
 
 
  On 4/29/14, 10:25 PM, Patrick Wendell pwend...@gmail.com wrote:
 
 You are right, once you sort() the RDD, then yes it has a well
 defined
 ordering.
 
 But that ordering is lost as soon as you transform the RDD,
 including
 if you union it with another RDD.
 
 On Tue, Apr 29, 2014 at 10:22 PM, Mingyu Kim m...@palantir.com
 wrote:
  Hi Patrick,
 
  I¹m a little confused about your comment that RDDs are not
 ordered.
 As
 far
  as I know, RDDs keep list of partitions that are 

Re: Reading multiple S3 objects, transforming, writing back one

2014-04-30 Thread Nicholas Chammas
Yes, saveAsTextFile() will give you 1 part per RDD partition. When you
coalesce(1), you move everything in the RDD to a single partition, which
then gives you 1 output file.

It will still be called part-0 or something like that because that’s
defined by the Hadoop API that Spark uses for reading to/writing from S3. I
don’t know of a way to change that.


On Wed, Apr 30, 2014 at 2:47 PM, Peter thenephili...@yahoo.com wrote:

 Ah, looks like RDD.coalesce(1) solves one part of the problem.
   On Wednesday, April 30, 2014 11:15 AM, Peter thenephili...@yahoo.com
 wrote:
  Hi

 Playing around with Spark  S3, I'm opening multiple objects (CSV files)
 with:

 val hfile = sc.textFile(s3n://bucket/2014-04-28/)

 so hfile is a RDD representing 10 objects that were underneath
 2014-04-28. After I've sorted and otherwise transformed the content, I'm
 trying to write it back to a single object:


 sortedMap.values.map(_.mkString(,)).saveAsTextFile(s3n://bucket/concatted.csv)

 unfortunately this results in a folder named concatted.csv with 10
 objects underneath, part-0 .. part-00010, corresponding to the 10
 original objects loaded.

 How can I achieve the desired behaviour of putting a single object named
 concatted.csv ?

 I've tried 0.9.1 and 1.0.0-RC3.

 Thanks!
 Peter







Re: What is Seq[V] in updateStateByKey?

2014-04-30 Thread Sean Owen
S is the previous count, if any. Seq[V] are potentially many new
counts. All of them have to be added together to keep an accurate
total.  It's as if the count were 3, and I tell you I've just observed
2, 5, and 1 additional occurrences -- the new count is 3 + (2+5+1) not
1 + 1.


I butted in since I'd like to ask a different question about the same
line of code. Why:

  val currentCount = values.foldLeft(0)(_ + _)

instead of

  val currentCount = values.sum

This happens a few places in the code. sum seems equivalent and likely
quicker. Same with things like filter(_ == 200).size instead of
count(_ == 200)... pretty trivial but hey.
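
For context, a minimal sketch of the kind of update function being discussed; the names are illustrative and it is only meant to show the shape of the updateStateByKey API:

    // `newValues` holds the counts that arrived in this batch for one key;
    // `state` holds the previous running total for that key, if any.
    val updateFunc = (newValues: Seq[Int], state: Option[Int]) => {
      val currentCount = newValues.sum   // same result as newValues.foldLeft(0)(_ + _)
      Some(currentCount + state.getOrElse(0))
    }

    // e.g. wordDstream.updateStateByKey[Int](updateFunc)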


On Wed, Apr 30, 2014 at 9:23 PM, Adrian Mocanu
amoc...@verticalscope.com wrote:
 Hi TD,

 Why does the example keep recalculating the count via fold?

 Wouldn’t it make more sense to get the last count in values Seq and add 1 to
 it and save that as current count?



 From what Sean explained I understand that all values in Seq have the same
 key. Then when a new value for that key is found it is added to this Seq
 collection and the update function is called.



 Is my understanding correct?


Re: Reading multiple S3 objects, transforming, writing back one

2014-04-30 Thread Peter
Thanks Nicholas, this is a bit of a shame; not very practical for log roll-up,
for example, when every output needs to be in its own directory.
On Wednesday, April 30, 2014 12:15 PM, Nicholas Chammas 
nicholas.cham...@gmail.com wrote:
 
Yes, saveAsTextFile() will give you 1 part per RDD partition. When you 
coalesce(1), you move everything in the RDD to a single partition, which then 
gives you 1 output file. 
It will still be called part-0 or something like that because that’s 
defined by the Hadoop API that Spark uses for reading to/writing from S3. I 
don’t know of a way to change that.



On Wed, Apr 30, 2014 at 2:47 PM, Peter thenephili...@yahoo.com wrote:

Ah, looks like RDD.coalesce(1) solves one part of the problem.
On Wednesday, April 30, 2014 11:15 AM, Peter thenephili...@yahoo.com wrote:
 
Hi


Playing around with Spark  S3, I'm opening multiple objects (CSV files) with:


    val hfile = sc.textFile(s3n://bucket/2014-04-28/)


so hfile is a RDD representing 10 objects that were underneath 2014-04-28. 
After I've sorted and otherwise transformed the content, I'm trying to write 
it back to a single object:


    
sortedMap.values.map(_.mkString(,)).saveAsTextFile(s3n://bucket/concatted.csv)


unfortunately this results in a folder named concatted.csv with 10 objects 
underneath, part-0 .. part-00010, corresponding to the 10 original objects 
loaded. 


How can I achieve the desired behaviour of putting a single object named 
concatted.csv ?


I've tried 0.9.1 and 1.0.0-RC3. 


Thanks!
Peter







Re: What is Seq[V] in updateStateByKey?

2014-04-30 Thread Tathagata Das
Yeah, I remember changing fold to sum in a few places, probably in
testsuites, but missed this example I guess.



On Wed, Apr 30, 2014 at 1:29 PM, Sean Owen so...@cloudera.com wrote:

 S is the previous count, if any. Seq[V] are potentially many new
 counts. All of them have to be added together to keep an accurate
 total.  It's as if the count were 3, and I tell you I've just observed
 2, 5, and 1 additional occurrences -- the new count is 3 + (2+5+1) not
 1 + 1.


 I butted in since I'd like to ask a different question about the same
 line of code. Why:

   val currentCount = values.foldLeft(0)(_ + _)

 instead of

   val currentCount = values.sum

 This happens a few places in the code. sum seems equivalent and likely
 quicker. Same with things like filter(_ == 200).size instead of
 count(_ == 200)... pretty trivial but hey.


 On Wed, Apr 30, 2014 at 9:23 PM, Adrian Mocanu
 amoc...@verticalscope.com wrote:
  Hi TD,
 
  Why does the example keep recalculating the count via fold?
 
  Wouldn’t it make more sense to get the last count in values Seq and add
 1 to
  it and save that as current count?
 
 
 
  From what Sean explained I understand that all values in Seq have the
 same
  key. Then when a new value for that key is found it is added to this Seq
  collection and the update function is called.
 
 
 
  Is my understanding correct?



Re: NoSuchMethodError from Spark Java

2014-04-30 Thread Marcelo Vanzin
Hi,

One thing you can do is set the spark version your project depends on
to 1.0.0-SNAPSHOT (make sure it matches the version of Spark you're
building); then before building your project, run sbt publishLocal
on the Spark tree.

On Wed, Apr 30, 2014 at 12:11 AM, wxhsdp wxh...@gmail.com wrote:
 i fixed it.

 i make my sbt project depend on
 spark/trunk/assembly/target/scala-2.10/spark-assembly-1.0.0-SNAPSHOT-hadoop1.0.4.jar
 and it works



 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/NoSuchMethodError-from-Spark-Java-tp4937p5096.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.



-- 
Marcelo


[ANN]: Scala By The Bay Conference ( aka Silicon Valley Scala Symposium)

2014-04-30 Thread Chester Chen
Hi,
     This is not related to Spark, but I thought you might be interested: the
second SF Scala conference is coming this August. The SF Scala conference was
called the Silicon Valley Scala Symposium last year. From now on, it will be
known as Scala By The Bay.

http://www.scalabythebay.org

-- watch that space for announcements and the CFP!

Chester

My talk on Spark: The Next Top (Compute) Model

2014-04-30 Thread Dean Wampler
I meant to post this last week, but this is a talk I gave at the Philly ETE
conf. last week:

http://www.slideshare.net/deanwampler/spark-the-next-top-compute-model

Also here:

http://polyglotprogramming.com/papers/Spark-TheNextTopComputeModel.pdf

dean

-- 
Dean Wampler, Ph.D.
Typesafe
@deanwampler
http://typesafe.com
http://polyglotprogramming.com


Re: Any advice for using big spark.cleaner.delay value in Spark Streaming?

2014-04-30 Thread buremba
Thanks for your reply. Sorry for the late response, I wanted to do some tests
before writing back.

The counting part works similarly to your advice. I specify a minimum interval
like 1 minute, and for each hour, day etc. it sums all counters of the current
children intervals.

However, when I want to count unique visitors for the month, things get much
more complex. I need to merge 30 sets which contain visitor ids, and each
of them has more than a few hundred thousand elements. Merging sets may
still be the best option rather than keeping another Set for the last month,
though I'm not sure, because when there are many intersections it
may be inefficient.

BTW, I have one more question. The HLL example in the repository seems confusing
to me. How does Spark handle global variable usage in the mapPartitions method?
(https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala#L68)

I'm also a newbie, but I thought the map and mapPartitions methods were
similar to Hadoop's map methods, so when we run the example on a cluster, how
does a remote node reach a global variable that lives on a single node? Does Spark
replicate HyperLogLogMonoid instances across the cluster?

Thanks,
Burak Emre



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Any-advice-for-using-big-spark-cleaner-delay-value-in-Spark-Streaming-tp4895p5131.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


update of RDDs

2014-04-30 Thread narayanabhatla NarasimhaMurthy
In our application, we need distributed RDDs containing key-value maps. We
have operations that update RDDs by adding entries to the map, deleting
entries from the map, as well as updating the value part of the maps.
We also have map-reduce functions that operate on the RDDs. The questions are
the following.
1. Can RDDs be updated? If yes, what are the methods?
2. If we update RDDs, will it happen in place, or does it create new RDDs
with almost double the original RDD size (original + newly created RDD)?
Thank you very much.
N.N.Murthy



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/update-of-RDDs-tp5132.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Strange lookup behavior. Possible bug?

2014-04-30 Thread Yadid Ayzenberg

Dear Sparkers,

Has anyone got any insight on this ? I am really stuck.

Yadid


On 4/28/14, 11:28 AM, Yadid Ayzenberg wrote:

Thanks for your answer.
I tried running on a single machine - master and worker on one host. I 
get exactly the same results.
Very little CPU activity on the machine in question. The web UI shows 
a single task and its state is RUNNING. It will remain so indefinitely.

I have a single partition, and its size is 1626.2 MB

Currently the RDD has 200 elements, but I have tried it with 20 and 
the behavior is the same.

The key is of the form:  (0,52fb9aff3004f07d1a87c8ea)
Where the first number in the tuple is always 0, and the second one is 
some string that can appear more than once.


The RDD is created by using the newAPIHadoopRDD.

Any additional info I can provide?

Yadid




On 4/28/14 10:46 AM, Daniel Darabos wrote:
That is quite mysterious, and I do not think we have enough 
information to answer. JavaPairRDD<String, Tuple2>.lookup() works 
fine on a remote Spark cluster:


$ MASTER=spark://localhost:7077 bin/spark-shell
scala> val rdd = 
org.apache.spark.api.java.JavaPairRDD.fromRDD(sc.makeRDD(0 until 10, 
3).map(x => ((x%3).toString, (x, x%3))))

scala> rdd.lookup("1")
res0: java.util.List[(Int, Int)] = [(1,1), (4,1), (7,1)]

You suggest maybe the driver does not receive a message from an 
executor. I guess it is likely possible, though it has not happened 
to me. I would recommend running on a single machine in the 
standalone setup. Start the master and worker on the same machine, 
run the application there too. This should eliminate network 
configuration problems.


If you still see the issue, I'd check whether the task has really 
completed. What do you see on the web UI? Is the executor using CPU?


Good luck.




On Mon, Apr 28, 2014 at 2:35 AM, Yadid Ayzenberg ya...@media.mit.edu 
mailto:ya...@media.mit.edu wrote:


Can someone please suggest how I can move forward with this?
My spark version is 0.9.1.
The big challenge is that this issue is not recreated when
running in local mode. What could be the difference?

I would really appreciate any pointers, as currently the the job
just hangs.




On 4/25/14, 7:37 PM, Yadid Ayzenberg wrote:

Some additional information - maybe this rings a bell with
someone:

I suspect this happens when the lookup returns more than one
value.
For 0 and 1 values, the function behaves as you would expect.

Anyone ?



On 4/25/14, 1:55 PM, Yadid Ayzenberg wrote:

Hi All,

I'm running a lookup on a JavaPairRDD<String, Tuple2>.
When running on a local machine, the lookup is
successful. However, when running on a standalone cluster
with the exact same dataset, one of the tasks never ends
(constantly in RUNNING status).
When viewing the worker log, it seems that the task has
finished successfully:

14/04/25 13:40:38 INFO BlockManager: Found block rdd_2_0
locally
14/04/25 13:40:38 INFO Executor: Serialized size of
result for 2 is 10896794
14/04/25 13:40:38 INFO Executor: Sending result for 2
directly to driver
14/04/25 13:40:38 INFO Executor: Finished task ID 2

But it seems the driver is not aware of this, and hangs
indefinitely.

If I execute a count priot to the lookup - I get the
correct number which suggests that the cluster is
operating as expected.

The exact same scenario works with a different type of
key (Tuple2): JavaPairRDD<Tuple2, Tuple2>.

Any ideas on how to debug this problem ?

Thanks,

Yadid










CDH 5.0 and Spark 0.9.0

2014-04-30 Thread Paul Schooss
Hello,

So I was unable to run the following commands from the spark shell with CDH
5.0 and spark 0.9.0, see below.

Once I removed the property

<property>
  <name>io.compression.codec.lzo.class</name>
  <value>com.hadoop.compression.lzo.LzoCodec</value>
  <final>true</final>
</property>

from the core-site.xml on the node, the spark commands worked. Is there a
specific setup I am missing?

scala> var log = sc.textFile("hdfs://jobs-ab-hnn1//input/core-site.xml")
14/04/30 22:43:16 INFO MemoryStore: ensureFreeSpace(78800) called with
curMem=150115, maxMem=308713881
14/04/30 22:43:16 INFO MemoryStore: Block broadcast_1 stored as values to
memory (estimated size 77.0 KB, free 294.2 MB)
14/04/30 22:43:16 WARN Configuration: mapred-default.xml:an attempt to
override final parameter: mapreduce.tasktracker.cache.local.size; Ignoring.
14/04/30 22:43:16 WARN Configuration: yarn-site.xml:an attempt to override
final parameter: mapreduce.output.fileoutputformat.compress.type; Ignoring.
14/04/30 22:43:16 WARN Configuration: hdfs-site.xml:an attempt to override
final parameter: mapreduce.map.output.compress.codec; Ignoring.
log: org.apache.spark.rdd.RDD[String] = MappedRDD[3] at textFile at
<console>:12

scala> log.count()
14/04/30 22:43:03 WARN JobConf: The variable mapred.child.ulimit is no
longer used.
14/04/30 22:43:04 WARN Configuration: mapred-default.xml:an attempt to
override final parameter: mapreduce.tasktracker.cache.local.size; Ignoring.
14/04/30 22:43:04 WARN Configuration: yarn-site.xml:an attempt to override
final parameter: mapreduce.output.fileoutputformat.compress.type; Ignoring.
14/04/30 22:43:04 WARN Configuration: hdfs-site.xml:an attempt to override
final parameter: mapreduce.map.output.compress.codec; Ignoring.
java.lang.IllegalArgumentException: java.net.UnknownHostException:
jobs-a-hnn1
at
org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:377)
at
org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:237)
at
org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:141)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:576)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:521)
at
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:146)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2397)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at
org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:221)
at
org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:140)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:902)
at org.apache.spark.rdd.RDD.count(RDD.scala:720)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:15)
at $iwC$$iwC$$iwC.<init>(<console>:20)
at $iwC$$iwC.<init>(<console>:22)
at $iwC.<init>(<console>:24)
at <init>(<console>:26)
at .<init>(<console>:30)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:772)
at
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1040)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:609)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:640)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:604)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:795)
at
org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:840)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:752)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:600)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:607)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:610)
at
org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:935)
at

same partition id means same location?

2014-04-30 Thread wxhsdp
Hi,

  I'm just reviewing advanced Spark features; this is about the PageRank
example.

  It said any shuffle operation on two RDDs will take on the partitioner of
one of them, if one is set.

  So first we partition the Links by a HashPartitioner, then we join the Links
and Ranks0. Ranks0 will take
  the HashPartitioner according to the documentation. The following reduceByKey
operation also respects the
  HashPartitioner, so when we join Links and Ranks1, there is no shuffle at
all.

  Does that mean that partitions of different RDDs with the same id will go
to exactly the same location, even
  if the different RDDs were originally located on different nodes?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/same-partition-id-means-same-location-tp5136.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Reading multiple S3 objects, transforming, writing back one

2014-04-30 Thread Patrick Wendell
This is a consequence of the way the Hadoop files API works. However,
you can (fairly easily) add code to just rename the file because it
will always produce the same filename.

(heavy use of pseudo code)

dir = /some/dir
rdd.coalesce(1).saveAsTextFile(dir)
f = new File(dir + part-0)
f.moveTo(somewhere else)
dir.remove()

It might be cool to add a utility called `saveAsSingleFile` or
something that does this for you. In fact probably we should have
called saveAsTextFile saveAsTextFiles to make it more clear...
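
For reference, a hedged sketch of that rename step using the Hadoop FileSystem API (the bucket, paths and rdd are illustrative, and it assumes the usual part-00000 name produced by the Hadoop text output format):

    import java.net.URI
    import org.apache.hadoop.fs.{FileSystem, Path}

    val tmpDir = "s3n://bucket/tmp-output"
    rdd.coalesce(1).saveAsTextFile(tmpDir)

    // Rename the single part file to the name we actually want, then drop the directory.
    val fs = FileSystem.get(new URI(tmpDir), sc.hadoopConfiguration)
    fs.rename(new Path(tmpDir + "/part-00000"), new Path("s3n://bucket/concatted.csv"))
    fs.delete(new Path(tmpDir), true)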

On Wed, Apr 30, 2014 at 2:00 PM, Peter thenephili...@yahoo.com wrote:
 Thanks Nicholas, this is a bit of a shame, not very practical for log roll
 up for example when every output needs to be in it's own directory.
 On Wednesday, April 30, 2014 12:15 PM, Nicholas Chammas
 nicholas.cham...@gmail.com wrote:
 Yes, saveAsTextFile() will give you 1 part per RDD partition. When you
 coalesce(1), you move everything in the RDD to a single partition, which
 then gives you 1 output file.
 It will still be called part-0 or something like that because that's
 defined by the Hadoop API that Spark uses for reading to/writing from S3. I
 don't know of a way to change that.


 On Wed, Apr 30, 2014 at 2:47 PM, Peter thenephili...@yahoo.com wrote:

 Ah, looks like RDD.coalesce(1) solves one part of the problem.
 On Wednesday, April 30, 2014 11:15 AM, Peter thenephili...@yahoo.com
 wrote:
 Hi

 Playing around with Spark  S3, I'm opening multiple objects (CSV files)
 with:

 val hfile = sc.textFile(s3n://bucket/2014-04-28/)

 so hfile is a RDD representing 10 objects that were underneath 2014-04-28.
 After I've sorted and otherwise transformed the content, I'm trying to write
 it back to a single object:


 sortedMap.values.map(_.mkString(,)).saveAsTextFile(s3n://bucket/concatted.csv)

 unfortunately this results in a folder named concatted.csv with 10 objects
 underneath, part-0 .. part-00010, corresponding to the 10 original
 objects loaded.

 How can I achieve the desired behaviour of putting a single object named
 concatted.csv ?

 I've tried 0.9.1 and 1.0.0-RC3.

 Thanks!
 Peter









How to handle this situation: Huge File Shared by All maps and Each Computer Has one copy?

2014-04-30 Thread PengWeiPRC
Hi there,

I was wondering if somebody could give me some suggestions about how to
handle this situation:

  I have a Spark program which first reads a 6GB file (not an RDD)
locally, and then does the map/reduce tasks. This 6GB file contains
information that will be shared by all the map tasks. Previously, I handled
it using the broadcast function in Spark, roughly like this:
global_file = fileRead(filename)
global_file.broadcast()
rdd.map(ele => MapFunc(ele))

  However, when running the Spark program on a cluster of multiple
computers, I found that the remote nodes waited forever for the broadcast
of the global_file. I also think it would not be a good solution to have each
map task load the global file by itself, which would incur huge
overhead.

  Actually, we have this global file on each node of our cluster. The ideal
behavior I hope for is that each node reads this global file only
from its local disk (and keeps it in memory), and then all the map/reduce
tasks scheduled to that node share the data. Hence, the global file
is neither like a broadcast variable, which is shared by all map/reduce
tasks, nor a private variable seen by only one map task. It is shared
node-wide: read on each node only once and shared by all the
tasks mapped to that node.

Could anybody tell me how to program in Spark to handle it? Thanks so much.
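
One pattern that matches the behavior described above is a per-JVM lazy singleton: each executor loads its node-local copy of the file once, and every task running in that JVM reuses it. A hedged sketch, assuming the object is compiled into the application jar shipped to the executors (the file path, parsing and mapFunc are illustrative placeholders):

    // Loaded lazily, at most once per executor JVM, from the node-local copy.
    object NodeLocalData {
      lazy val table: Map[String, String] =
        scala.io.Source.fromFile("/local/path/global_file.txt")
          .getLines()
          .map { line =>
            val Array(k, v) = line.split("\t", 2)
            (k, v)
          }
          .toMap
    }

    val result = rdd.mapPartitions { iter =>
      // Only the reference to the singleton is captured in the closure;
      // the data itself is read from local disk on the node that runs the task.
      val table = NodeLocalData.table
      iter.map(ele => mapFunc(ele, table))   // mapFunc stands in for MapFunc above
    }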




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-handle-this-situation-Huge-File-Shared-by-All-maps-and-Each-Computer-Has-one-copy-tp5139.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.