Adaptive behavior of Spark at different network transfer rates?

2015-07-13 Thread Niklas Wilcke
Hello,

I'm seeing strange behavior in a larger data processing pipeline
consisting of multiple steps involving Spark core and GraphX.
When I increase the network transfer rate in the 5-node cluster from
100 Mbit/s to 1 Gbit/s, the runtime also increases, from around 15 minutes
to 19 minutes. This only holds for large input files; on small files the
faster transfer rate decreases the runtime by around one third.

I tested the network transfer rate by transmitting files from node to
node. On 100 Mbit/s I get 11.7 MByte/s and on 1 Gbit/s I get 67 MByte/s.
For that reason the network itself should not be the cause.

My question is: do Spark, and especially GraphX, adapt their behavior to
the available network transfer rate? Does anybody have an idea how a
faster network could decrease performance?
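
(For reference, a minimal sketch of the shuffle settings that most directly
control what goes over the network during the pipeline; the property names
assume Spark 1.4+, and the app name and values are only illustrative, not a
tuned configuration.)

  import org.apache.spark.SparkConf

  // Sketch: pin the shuffle/network knobs explicitly so runs on the
  // 100 Mbit/s and 1 Gbit/s links are compared under identical settings.
  val conf = new SparkConf()
    .setAppName("network-rate-comparison")        // hypothetical app name
    .set("spark.reducer.maxSizeInFlight", "48m")  // how much map output a reduce task fetches at once
    .set("spark.shuffle.compress", "true")        // compress map output files before they cross the wire
    .set("spark.io.compression.codec", "snappy")  // codec used for that compression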

Thank you very much!

Kind regards,
Niklas Wilcke






Re: Zipping RDDs of equal size not possible

2015-02-05 Thread Niklas Wilcke
Hi Xiangrui,

I'm sorry, I didn't notice your mail.
What I did is a workaround that only works for my special case.
It does not scale and only works for small data sets, but that is fine
for me so far.

Kind Regards,
Niklas

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

  def securlyZipRdds[A, B: ClassTag](rdd1: RDD[A], rdd2: RDD[B]): RDD[(A, B)] = {
    // Collapse both RDDs to a single partition so that zip sees chunks of
    // exactly the same size, then trim the longer side to match the shorter.
    val rdd1Repartitioned = rdd1.repartition(1)
    val rdd2Repartitioned = rdd2.repartition(1)
    val (rdd1Balanced, rdd2Balanced) =
      balanceRddSizes(rdd1Repartitioned, rdd2Repartitioned)
    rdd1Balanced.zip(rdd2Balanced)
  }

  def balanceRddSizes[A, B](rdd1: RDD[A], rdd2: RDD[B]): (RDD[A], RDD[B]) = {
    val rdd1count = rdd1.count()
    val rdd2count = rdd2.count()
    val difference = math.abs(rdd1count - rdd2count).toInt
    if (rdd1count > rdd2count) {
      (removeRandomElements(rdd1, difference), rdd2)
    } else if (rdd2count > rdd1count) {
      (rdd1, removeRandomElements(rdd2, difference))
    } else {
      (rdd1, rdd2)
    }
  }

  // Removes numberOfElements randomly sampled elements. Note that this assumes
  // the elements are distinct: duplicates of a sampled value are all filtered out.
  def removeRandomElements[A](rdd: RDD[A], numberOfElements: Int): RDD[A] = {
    val sample: Array[A] = rdd.takeSample(false, numberOfElements)
    val set: Set[A] = Set(sample: _*)
    rdd.filter(x => !set.contains(x))
  }
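
A usage sketch with hypothetical data, assuming a running SparkContext sc:

  // Two small RDDs of unequal length; balanceRddSizes drops one random
  // element from the longer side, then zip succeeds because both sides
  // end up in a single, equally sized partition.
  val left  = sc.parallelize(Seq("a", "b", "c", "d"))
  val right = sc.parallelize(Seq(1, 2, 3, 4, 5))
  val pairs: RDD[(String, Int)] = securlyZipRdds(left, right)
  pairs.collect().foreach(println)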

On 10.01.2015 06:56, Xiangrui Meng wrote:
> sample 2 * n tuples, split them into two parts, balance the sizes of
> these parts by filtering some tuples out
>
> How do you guarantee that the two RDDs have the same size?
>
> -Xiangrui




Zipping RDDs of equal size not possible

2015-01-09 Thread Niklas Wilcke
Hi Spark community,

I have a problem with zipping two RDDs of the same size and same number
of partitions.
The error message says that zipping is only allowed on RDDs which are
partitioned into chunks of exactly the same sizes.
How can I ensure this? My workaround at the moment is to repartition
both RDDs to only one partition, but that obviously does not scale.

This problem originates from my need to draw n random tuple pairs
(Tuple, Tuple) from an RDD[Tuple].
What I do is sample 2 * n tuples, split them into two parts, balance
the sizes of these parts by filtering some tuples out, and zip them together.

I would appreciate reading about better approaches to both problems.
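
For comparison, one possible alternative that avoids the repartition-to-one-partition
step (an untested sketch, using zipWithIndex and a pair-RDD join): give every element
a positional index and join the two RDDs on that index; the inner join also tolerates
unequal lengths by simply dropping the overhang.

  import scala.reflect.ClassTag
  import org.apache.spark.SparkContext._
  import org.apache.spark.rdd.RDD

  // Sketch: zip two RDDs by position via an index join instead of repartition(1).
  // zipWithIndex runs one extra job to compute the partition offsets.
  def zipByIndex[A: ClassTag, B: ClassTag](rdd1: RDD[A], rdd2: RDD[B]): RDD[(A, B)] = {
    val indexed1 = rdd1.zipWithIndex().map { case (v, i) => (i, v) }
    val indexed2 = rdd2.zipWithIndex().map { case (v, i) => (i, v) }
    indexed1.join(indexed2).values
  }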

Thanks in advance,
Niklas


Re: unable to make a custom class as a key in a pairrdd

2014-10-23 Thread Niklas Wilcke
Hi Jao,

I don't really know why this doesn't work, but I have two hints.
You don't need to override hashCode and equals; the case modifier
does that for you. Writing

case class PersonID(id: String)

should be enough to get the class you want, I think.
If I change the type of the id parameter to Int it works for me, but I don't
know why.

case class PersonID(id: Int)

Looks like strange behavior to me. Give it a try.
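
A minimal sketch of that variant (hypothetical data, assuming a running
SparkContext sc, e.g. in the spark-shell):

  // Plain case class: the compiler generates equals and hashCode from id,
  // so no manual overrides are needed.
  case class PersonID(id: Int)

  val p = sc.parallelize((1 until 10).map(x => (PersonID(1), x)))

  // With a correctly hashing key this collapses to a single group, roughly:
  // (PersonID(1),CompactBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9))
  p.groupByKey.collect.foreach(println)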

Good luck,
Niklas

On 23.10.2014 21:52, Jaonary Rabarisoa wrote:
> Hi all,
>
> I have the following case class that I want to use as a key in a
> key-value RDD. I defined the equals and hashCode methods but it's not
> working. What am I doing wrong?
>
> case class PersonID(id: String) {
>
>   override def hashCode = id.hashCode
>
>   override def equals(other: Any) = other match {
>     case that: PersonID => this.id == that.id && this.getClass == that.getClass
>     case _ => false
>   }
> }
>
> val p = sc.parallelize((1 until 10).map(x => (PersonID("1"), x)))
>
> p.groupByKey.collect foreach println
>
> (PersonID(1),CompactBuffer(5))
> (PersonID(1),CompactBuffer(6))
> (PersonID(1),CompactBuffer(7))
> (PersonID(1),CompactBuffer(8, 9))
> (PersonID(1),CompactBuffer(1))
> (PersonID(1),CompactBuffer(2))
> (PersonID(1),CompactBuffer(3))
> (PersonID(1),CompactBuffer(4))
>
> Best,
>
> Jao