How to monitor the throughput and latency of the combineByKey transformation in Spark 3?

2020-07-20 Thread Felipe Gutierrez
Hi community,

I built a simple count-and-sum Spark application which uses the
combineByKey transformation [1], and I would like to monitor the
throughput in/out of this transformation and the latency that
combineByKey spends pre-aggregating tuples. Ideally, for the latency I
would like the average over the last 30 seconds, using a histogram,
plus the 99th percentile.

I was thinking of adding a Dropwizard metric [2] to the combiner
function that I pass to combineByKey. But I am confused because there
are two more functions that I must pass to combineByKey.

How would you suggest I implement this monitoring strategy?

Thanks,
Felipe
[1] 
https://github.com/felipegutierrez/explore-spark/blob/master/src/main/scala/org/sense/spark/app/combiners/TaxiRideCountCombineByKey.scala#L40
[2] https://metrics.dropwizard.io/4.1.2/getting-started.html
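
One possible shape for this (a sketch only; the registry, metric names, and the rides/taxiId identifiers below are assumptions, not taken from [1]): wrap the three functions passed to combineByKey so each call marks a Dropwizard meter for throughput and records into a timer backed by a 30-second sliding window, which exposes both the recent mean and the 99th percentile.

import java.util.concurrent.TimeUnit
import com.codahale.metrics.{MetricRegistry, SlidingTimeWindowReservoir, Timer}

// One registry per executor; @transient lazy so it is rebuilt after serialization.
object CombinerMetrics extends Serializable {
  @transient lazy val registry = new MetricRegistry()
  @transient lazy val recordsIn = registry.meter("combineByKey.recordsIn")
  @transient lazy val combinersOut = registry.meter("combineByKey.combinersOut")
  // Timer over a 30-second sliding window: recent mean and 99th percentile.
  @transient lazy val latency: Timer = registry.register(
    "combineByKey.latency", new Timer(new SlidingTimeWindowReservoir(30, TimeUnit.SECONDS)))
}

def timed[T](body: => T): T = {
  val ctx = CombinerMetrics.latency.time()
  try body finally ctx.stop()
}

// rides: RDD[TaxiRide] and taxiId are hypothetical stand-ins for the RDD in [1].
val counts = rides
  .map(r => (r.taxiId, 1L))
  .combineByKey(
    (v: Long) => timed { CombinerMetrics.recordsIn.mark(); CombinerMetrics.combinersOut.mark(); v },
    (acc: Long, v: Long) => timed { CombinerMetrics.recordsIn.mark(); acc + v },
    (a: Long, b: Long) => timed { a + b })

// You would still need to expose the registry on each executor (e.g. via a
// Dropwizard reporter or a custom Spark metrics source) to actually read the values.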

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com




Re: combineByKey

2019-04-05 Thread Jason Nerothin
Take a look at this SOF:

https://stackoverflow.com/questions/24804619/how-does-spark-aggregate-function-aggregatebykey-work

On Fri, Apr 5, 2019 at 12:25 PM Madabhattula Rajesh Kumar <
mrajaf...@gmail.com> wrote:

> Hi,
>
> Thank you for the details. It is a typo error while composing the mail.
> Below is the actual flow.
>
> Any idea, why the combineByKey is not working. aggregateByKey is working.
>
> //Defining createCombiner, mergeValue and mergeCombiner functions
>
> def createCombiner = (Id: String, value: String) => (value :: Nil).toSet
>
> def mergeValue = (accumulator1: Set[String], accumulator2: (String,
> String)) => accumulator1 ++ Set(accumulator2._2)
>
> def mergeCombiner: (Set[String], Set[String]) => Set[String] =
> (accumulator1: Set[String], accumulator2: Set[String]) => accumulator1 ++
> accumulator2
>
> sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.Id, (x.Id,
> x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
>
> *Compile Error:-*
>  found   : (String, String) => scala.collection.immutable.Set[String]
>  required: ((String, String)) => ?
>  sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.Id, (x.Id,
> x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
>
> *aggregateByKey =>*
>
> val result = sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.Id,
> (x.Id, x.value))).aggregateByKey(Set[String]())(
> (aggr, value) => aggr ++ Set(value._2),
> (aggr1, aggr2) => aggr1 ++ aggr2).collect().toMap
>
>  print(result)
>
> Map(0-d1 -> Set(t1, t2, t3, t4), 0-d2 -> Set(t1, t5, t6, t2), 0-d3 ->
> Set(t1, t2))
>
> Regards,
> Rajesh
>
> On Fri, Apr 5, 2019 at 9:58 PM Jason Nerothin 
> wrote:
>
>> I broke some of your code down into the following lines:
>>
>> import spark.implicits._
>>
>> val a: RDD[Messages]= sc.parallelize(messages)
>> val b: Dataset[Messages] = a.toDF.as[Messages]
>> val c: Dataset[(String, (String, String))] = b.map{x => (x.timeStamp
>> + "-" + x.Id, (x.Id, x.value))}
>>
>> You didn't capitalize .Id and your mergeValue0 and mergeCombiner don't
>> have the types you think for the reduceByKey.
>>
>> I recommend breaking the code down like this to statement-by-statement
>> when you get into a dance with the Scala type system.
>>
>> The type-safety that you're after (that eventually makes life *easier*)
>> is best supported by Dataset (would have prevented the .id vs .Id error).
>> Although there are some performance tradeoffs vs RDD and DataFrame...
>>
>>
>>
>>
>>
>>
>> On Fri, Apr 5, 2019 at 2:11 AM Madabhattula Rajesh Kumar <
>> mrajaf...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Any issue in the below code.
>>>
>>> case class Messages(timeStamp: Int, Id: String, value: String)
>>>
>>> val messages = Array(
>>>   Messages(0, "d1", "t1"),
>>>   Messages(0, "d1", "t1"),
>>>   Messages(0, "d1", "t1"),
>>>   Messages(0, "d1", "t1"),
>>>   Messages(0, "d1", "t2"),
>>>   Messages(0, "d1", "t2"),
>>>   Messages(0, "d1", "t3"),
>>>   Messages(0, "d1", "t4"),
>>>   Messages(0, "d2", "t1"),
>>>   Messages(0, "d2", "t1"),
>>>   Messages(0, "d2", "t5"),
>>>   Messages(0, "d2", "t6"),
>>>   Messages(0, "d2", "t2"),
>>>   Messages(0, "d2", "t2"),
>>>   Messages(0, "d3", "t1"),
>>>   Messages(0, "d3", "t1"),
>>>   Messages(0, "d3", "t2")
>>> )
>>>
>>> //Defining createCombiner, mergeValue and mergeCombiner functions
>>> def createCombiner = (id: String, value: String) => Set(value)
>>>
>>> def mergeValue0 = (accumulator1: Set[String], accumulator2: (String,
>>> String)) => accumulator1 ++ Set(accumulator2._2)
>>>
>>> def mergeCombiner: (Set[String], Set[String]) => Set[String] =
>>> (accumulator1: Set[String], accumulator2: Set[String]) => accumulator1 ++
>>> accumulator2
>>>
>>> sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id,
>>> x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
>>>
>>> *Compile Error:-*
>>>  found   : (String, String) => scala.collection.immutable.Set[String]
>>>  required: ((String, String)) => ?
>>> sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id,
>>> x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
>>>
>>> Regards,
>>> Rajesh
>>>
>>>
>>
>> --
>> Thanks,
>> Jason
>>
>

-- 
Thanks,
Jason


Re: combineByKey

2019-04-05 Thread Madabhattula Rajesh Kumar
Hi,

Thank you for the details. It was a typo while composing the mail.
Below is the actual flow.

Any idea why combineByKey is not working? aggregateByKey is working.

//Defining createCombiner, mergeValue and mergeCombiner functions

def createCombiner = (Id: String, value: String) => (value :: Nil).toSet

def mergeValue = (accumulator1: Set[String], accumulator2: (String,
String)) => accumulator1 ++ Set(accumulator2._2)

def mergeCombiner: (Set[String], Set[String]) => Set[String] =
(accumulator1: Set[String], accumulator2: Set[String]) => accumulator1 ++
accumulator2

sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.Id, (x.Id,
x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)

*Compile Error:-*
 found   : (String, String) => scala.collection.immutable.Set[String]
 required: ((String, String)) => ?
 sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.Id, (x.Id,
x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)

*aggregateByKey =>*

val result = sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.Id,
(x.Id, x.value))).aggregateByKey(Set[String]())(
(aggr, value) => aggr ++ Set(value._2),
(aggr1, aggr2) => aggr1 ++ aggr2).collect().toMap

 print(result)

Map(0-d1 -> Set(t1, t2, t3, t4), 0-d2 -> Set(t1, t5, t6, t2), 0-d3 ->
Set(t1, t2))

Regards,
Rajesh
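
A note on the compile error above: combineByKey expects createCombiner to be a function of the single value type, V => C. Here V is the tuple (String, String), so a two-argument function (String, String) => Set[String] does not match, which is exactly what "found: (String, String) => ... required: ((String, String)) => ?" is reporting. A minimal sketch of definitions that should compile, based only on the code in this thread and untested against the original data:

// createCombiner: V => C, with V = (String, String) and C = Set[String]
def createCombiner = (v: (String, String)) => Set(v._2)

// mergeValue: (C, V) => C
def mergeValue = (acc: Set[String], v: (String, String)) => acc + v._2

// mergeCombiner: (C, C) => C
def mergeCombiner = (acc1: Set[String], acc2: Set[String]) => acc1 ++ acc2

sc.parallelize(messages)
  .map(x => (x.timeStamp + "-" + x.Id, (x.Id, x.value)))
  .combineByKey(createCombiner, mergeValue, mergeCombiner)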

On Fri, Apr 5, 2019 at 9:58 PM Jason Nerothin 
wrote:

> I broke some of your code down into the following lines:
>
> import spark.implicits._
>
> val a: RDD[Messages]= sc.parallelize(messages)
> val b: Dataset[Messages] = a.toDF.as[Messages]
> val c: Dataset[(String, (String, String))] = b.map{x => (x.timeStamp +
> "-" + x.Id, (x.Id, x.value))}
>
> You didn't capitalize .Id and your mergeValue0 and mergeCombiner don't
> have the types you think for the reduceByKey.
>
> I recommend breaking the code down like this to statement-by-statement
> when you get into a dance with the Scala type system.
>
> The type-safety that you're after (that eventually makes life *easier*) is
> best supported by Dataset (would have prevented the .id vs .Id error).
> Although there are some performance tradeoffs vs RDD and DataFrame...
>
>
>
>
>
>
> On Fri, Apr 5, 2019 at 2:11 AM Madabhattula Rajesh Kumar <
> mrajaf...@gmail.com> wrote:
>
>> Hi,
>>
>> Any issue in the below code.
>>
>> case class Messages(timeStamp: Int, Id: String, value: String)
>>
>> val messages = Array(
>>   Messages(0, "d1", "t1"),
>>   Messages(0, "d1", "t1"),
>>   Messages(0, "d1", "t1"),
>>   Messages(0, "d1", "t1"),
>>   Messages(0, "d1", "t2"),
>>   Messages(0, "d1", "t2"),
>>   Messages(0, "d1", "t3"),
>>   Messages(0, "d1", "t4"),
>>   Messages(0, "d2", "t1"),
>>   Messages(0, "d2", "t1"),
>>   Messages(0, "d2", "t5"),
>>   Messages(0, "d2", "t6"),
>>   Messages(0, "d2", "t2"),
>>   Messages(0, "d2", "t2"),
>>   Messages(0, "d3", "t1"),
>>   Messages(0, "d3", "t1"),
>>   Messages(0, "d3", "t2")
>> )
>>
>> //Defining createCombiner, mergeValue and mergeCombiner functions
>> def createCombiner = (id: String, value: String) => Set(value)
>>
>> def mergeValue0 = (accumulator1: Set[String], accumulator2: (String,
>> String)) => accumulator1 ++ Set(accumulator2._2)
>>
>> def mergeCombiner: (Set[String], Set[String]) => Set[String] =
>> (accumulator1: Set[String], accumulator2: Set[String]) => accumulator1 ++
>> accumulator2
>>
>> sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id,
>> x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
>>
>> *Compile Error:-*
>>  found   : (String, String) => scala.collection.immutable.Set[String]
>>  required: ((String, String)) => ?
>> sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id,
>> x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
>>
>> Regards,
>> Rajesh
>>
>>
>
> --
> Thanks,
> Jason
>


Re: combineByKey

2019-04-05 Thread Jason Nerothin
I broke some of your code down into the following lines:

import spark.implicits._

val a: RDD[Messages]= sc.parallelize(messages)
val b: Dataset[Messages] = a.toDF.as[Messages]
val c: Dataset[(String, (String, String))] = b.map{x => (x.timeStamp +
"-" + x.Id, (x.Id, x.value))}

You didn't capitalize .Id, and your mergeValue0 and mergeCombiner don't have
the types you think they do for the combineByKey call.

I recommend breaking the code down statement-by-statement like this when
you get into a dance with the Scala type system.

The type-safety that you're after (which eventually makes life *easier*) is
best supported by Dataset (it would have prevented the .id vs .Id error),
although there are some performance tradeoffs vs RDD and DataFrame...
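
A sketch of that Dataset route, assuming Spark 2.x with a SparkSession in scope and reusing the Messages case class and the messages collection from the original post (untested; shown only to illustrate the compile-time checking):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("combine-sketch").master("local[*]").getOrCreate()
import spark.implicits._

val ds = spark.createDataset(messages.toSeq)      // Dataset[Messages], fields checked at compile time
val grouped = ds
  .groupByKey(m => s"${m.timeStamp}-${m.Id}")     // writing m.id here would not compile
  .mapGroups((key, rows) => (key, rows.map(_.value).toSeq.distinct))

grouped.show(false)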






On Fri, Apr 5, 2019 at 2:11 AM Madabhattula Rajesh Kumar <
mrajaf...@gmail.com> wrote:

> Hi,
>
> Any issue in the below code.
>
> case class Messages(timeStamp: Int, Id: String, value: String)
>
> val messages = Array(
>   Messages(0, "d1", "t1"),
>   Messages(0, "d1", "t1"),
>   Messages(0, "d1", "t1"),
>   Messages(0, "d1", "t1"),
>   Messages(0, "d1", "t2"),
>   Messages(0, "d1", "t2"),
>   Messages(0, "d1", "t3"),
>   Messages(0, "d1", "t4"),
>   Messages(0, "d2", "t1"),
>   Messages(0, "d2", "t1"),
>   Messages(0, "d2", "t5"),
>   Messages(0, "d2", "t6"),
>   Messages(0, "d2", "t2"),
>   Messages(0, "d2", "t2"),
>   Messages(0, "d3", "t1"),
>   Messages(0, "d3", "t1"),
>   Messages(0, "d3", "t2")
> )
>
> //Defining createCombiner, mergeValue and mergeCombiner functions
> def createCombiner = (id: String, value: String) => Set(value)
>
> def mergeValue0 = (accumulator1: Set[String], accumulator2: (String,
> String)) => accumulator1 ++ Set(accumulator2._2)
>
> def mergeCombiner: (Set[String], Set[String]) => Set[String] =
> (accumulator1: Set[String], accumulator2: Set[String]) => accumulator1 ++
> accumulator2
>
> sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id,
> x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
>
> *Compile Error:-*
>  found   : (String, String) => scala.collection.immutable.Set[String]
>  required: ((String, String)) => ?
> sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id,
> x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)
>
> Regards,
> Rajesh
>
>

-- 
Thanks,
Jason


combineByKey

2019-04-05 Thread Madabhattula Rajesh Kumar
Hi,

Is there any issue in the below code?

case class Messages(timeStamp: Int, Id: String, value: String)

val messages = Array(
  Messages(0, "d1", "t1"),
  Messages(0, "d1", "t1"),
  Messages(0, "d1", "t1"),
  Messages(0, "d1", "t1"),
  Messages(0, "d1", "t2"),
  Messages(0, "d1", "t2"),
  Messages(0, "d1", "t3"),
  Messages(0, "d1", "t4"),
  Messages(0, "d2", "t1"),
  Messages(0, "d2", "t1"),
  Messages(0, "d2", "t5"),
  Messages(0, "d2", "t6"),
  Messages(0, "d2", "t2"),
  Messages(0, "d2", "t2"),
  Messages(0, "d3", "t1"),
  Messages(0, "d3", "t1"),
  Messages(0, "d3", "t2")
)

//Defining createCombiner, mergeValue and mergeCombiner functions
def createCombiner = (id: String, value: String) => Set(value)

def mergeValue0 = (accumulator1: Set[String], accumulator2: (String,
String)) => accumulator1 ++ Set(accumulator2._2)

def mergeCombiner: (Set[String], Set[String]) => Set[String] =
(accumulator1: Set[String], accumulator2: Set[String]) => accumulator1 ++
accumulator2

sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id,
x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)

*Compile Error:-*
 found   : (String, String) => scala.collection.immutable.Set[String]
 required: ((String, String)) => ?
sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id,
x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)

Regards,
Rajesh


Re: Inconsistent results with combineByKey API

2017-09-05 Thread Swapnil Shinde
Ping... Can someone please confirm whether this is an issue or not?

-
Swapnil

On Thu, Aug 31, 2017 at 12:27 PM, Swapnil Shinde 
wrote:

> Hello All
>
> I am observing some strange results with aggregateByKey API which is
> implemented with combineByKey. Not sure if this is by design or bug -
>
> I created this toy example but same problem can be observed on large
> datasets as well -
>
> *case class ABC(key: Int, c1: Int, c2: Int)*
> *case class ABCoutput(key: Int, desc: String, c1Sum: Int, c2Sum: Int)*
>
> // Create RDD and making sure if has 1 or 2 partitions for this example.
> // With 2 partitions there are high chances that same key could be in same
> partition.
> *val a = sc.makeRDD[ABC](List(ABC(1, 10, 20), ABC(1, 10, 20), ABC(2, 20,
> 40), ABC(2, 20, 40))).coalece(2)*
>
> Now, I am running aggregateByKey where I am grouping by Key to sum c1 and
> c2 but return ABCoutput with new 'desc' property.
>
> *val b = a.keyBy(x => x.key).aggregateByKey(ABCoutput(0,"initial",0,0))
> ((x1: ABCoutput, x2: ABC) => ABCoutput(x1.key, "intermediate",
> x1.c1Sum+x2.c1, x1.c2Sum+x2.c2), (m1: ABCoutput, m2:ABCoutput) =>
> ABCoutput(m1.key, "final", m1.c1Sum+m2.c1Sum, m1.c2Sum+m2.c2Sum))*
>
> Above query may return results like this -
> [image: Inline image 1]
>
> It means for one of the keys which has all values in same partition didn't
> invoke mergeCombiner function which returns ABCoutput with desc=final.
>
> I am expecting mergeCombiner function to be invoked all the time which is
> not happening. Correct me if wrong, but is this expected behavior?
>
> Further debugging shows that it works fine if I create input RDD with more
> partitions( which increases chances of having rows with same key in
> different partitions)
>
> *val b = a.repartition(20).keyBy(x =>
> x.key).aggregateByKey(ABCoutput(0,"initial",0,0)) ((x1: ABCoutput, x2: ABC)
> => ABCoutput(x1.key, "intermediate", x1.c1Sum+x2.c1, x1.c2Sum+x2.c2), (m1:
> ABCoutput, m2:ABCoutput) => ABCoutput(m1.key, "final", m1.c1Sum+m2.c1Sum,
> m1.c2Sum+m2.c2Sum))*
> [image: Inline image 2]
>
> One more thing to mention - If I make sure my input RDD is partitioned
> then it simply runs aggregation with mapPartitions (here
> <https://github.com/apache/spark/blob/v2.0.1/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L93>).
> Now, this makes sense in terms of aggregations as all values for given key
> are in same partition. However, I have something in my mergeCombiner
> function that I would like to run which wont get invoked.
> Traditional map reduce allows to have different combiner and reduce
> function and it is guaranteed that reduce is always invoked. I can see that
> running aggregations with no shuffle has performance gains but API seems to
> be confusing/misleading. User might hope that mergeCombiner gets invoked
> but in reality it isn't. It will be great if this API designers can shed
> some light on this.
>
> *import org.apache.spark.HashPartitioner*
> *val b = a.keyBy(x => x.key).partitionBy(new
> HashPartitioner(20)).aggregateByKey(ABCoutput(0,"initial",0,0)) ((x1:
> ABCoutput, x2: ABC) => ABCoutput(x1.key, "intermediate", x1.c1Sum+x2.c1,
> x1.c2Sum+x2.c2), (m1: ABCoutput, m2:ABCoutput) => ABCoutput(m1.key,
> "final", m1.c1Sum+m2.c1Sum, m1.c2Sum+m2.c2Sum))*
>
> [image: Inline image 3]
>
> Above examples shows this behavior with AggregateByKey but same thing can
> be observed with CombineByKey as well.
> *val b = a.keyBy(x => x.key).combineByKey( (x: ABC) => ABCoutput(x.key,
> "initial", x.c1, x.c2), *
> *  (x1: ABCoutput, x2: ABC) => ABCoutput(x1.key, "intermediate", x1.c1Sum
> + x2.c1, x1.c2Sum+x2.c2),*
> *  (x1: ABCoutput, x2: ABCoutput) => ABCoutput(x1.key, "final", x1.c1Sum +
> x2.c1Sum, x1.c2Sum+x2.c2Sum))*
>
> *[image: Inline image 4]*
>
>
> Please let me know if you need any further information and correct me if
> my understanding of API is wrong.
>
> Thanks
> Swapnil
>


Inconsistent results with combineByKey API

2017-08-31 Thread Swapnil Shinde
Hello All

I am observing some strange results with the aggregateByKey API, which is
implemented with combineByKey. I am not sure if this is by design or a bug.

I created this toy example, but the same problem can be observed on large
datasets as well:

*case class ABC(key: Int, c1: Int, c2: Int)*
*case class ABCoutput(key: Int, desc: String, c1Sum: Int, c2Sum: Int)*

// Create an RDD and make sure it has 1 or 2 partitions for this example.
// With 2 partitions there is a high chance that all values for a key end up
// in the same partition.
*val a = sc.makeRDD[ABC](List(ABC(1, 10, 20), ABC(1, 10, 20), ABC(2, 20,
40), ABC(2, 20, 40))).coalesce(2)*

Now I am running aggregateByKey, grouping by key to sum c1 and c2 but
returning ABCoutput with a new 'desc' property.

*val b = a.keyBy(x => x.key).aggregateByKey(ABCoutput(0,"initial",0,0))
((x1: ABCoutput, x2: ABC) => ABCoutput(x1.key, "intermediate",
x1.c1Sum+x2.c1, x1.c2Sum+x2.c2), (m1: ABCoutput, m2:ABCoutput) =>
ABCoutput(m1.key, "final", m1.c1Sum+m2.c1Sum, m1.c2Sum+m2.c2Sum))*

The above query may return results like this:
[image: Inline image 1]

It means that for one of the keys, where all values are in the same partition,
the mergeCombiner function (which returns ABCoutput with desc=final) was never
invoked.

I expected the mergeCombiner function to be invoked every time, which is not
happening. Correct me if I'm wrong, but is this expected behavior?

Further debugging shows that it works fine if I create the input RDD with more
partitions (which increases the chance of having rows with the same key in
different partitions):

*val b = a.repartition(20).keyBy(x =>
x.key).aggregateByKey(ABCoutput(0,"initial",0,0)) ((x1: ABCoutput, x2: ABC)
=> ABCoutput(x1.key, "intermediate", x1.c1Sum+x2.c1, x1.c2Sum+x2.c2), (m1:
ABCoutput, m2:ABCoutput) => ABCoutput(m1.key, "final", m1.c1Sum+m2.c1Sum,
m1.c2Sum+m2.c2Sum))*
[image: Inline image 2]

One more thing to mention: if I make sure my input RDD is already partitioned,
then it simply runs the aggregation with mapPartitions (here
<https://github.com/apache/spark/blob/v2.0.1/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L93>).
This makes sense for aggregations, as all values for a given key are in the
same partition. However, I have something in my mergeCombiner function that I
would like to run, and it won't get invoked.
Traditional MapReduce allows different combiner and reduce functions, and it
is guaranteed that reduce is always invoked. I can see that running
aggregations with no shuffle has performance gains, but the API seems
confusing/misleading: a user might expect mergeCombiner to be invoked, but in
reality it isn't. It would be great if the API designers could shed some light
on this.

*import org.apache.spark.HashPartitioner*
*val b = a.keyBy(x => x.key).partitionBy(new
HashPartitioner(20)).aggregateByKey(ABCoutput(0,"initial",0,0)) ((x1:
ABCoutput, x2: ABC) => ABCoutput(x1.key, "intermediate", x1.c1Sum+x2.c1,
x1.c2Sum+x2.c2), (m1: ABCoutput, m2:ABCoutput) => ABCoutput(m1.key,
"final", m1.c1Sum+m2.c1Sum, m1.c2Sum+m2.c2Sum))*

[image: Inline image 3]

The above examples show this behavior with aggregateByKey, but the same thing
can be observed with combineByKey as well.
*val b = a.keyBy(x => x.key).combineByKey( (x: ABC) => ABCoutput(x.key,
"initial", x.c1, x.c2), *
*  (x1: ABCoutput, x2: ABC) => ABCoutput(x1.key, "intermediate", x1.c1Sum +
x2.c1, x1.c2Sum+x2.c2),*
*  (x1: ABCoutput, x2: ABCoutput) => ABCoutput(x1.key, "final", x1.c1Sum +
x2.c1Sum, x1.c2Sum+x2.c2Sum))*

*[image: Inline image 4]*


Please let me know if you need any further information, and correct me if my
understanding of the API is wrong.

Thanks
Swapnil
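
If the only requirement is that the records coming out of the aggregation carry desc = "final", one workaround is to stamp it afterwards instead of relying on mergeCombiner being called for every key. A minimal sketch against the case classes above (untested):

val b = a.keyBy(_.key)
  .aggregateByKey(ABCoutput(0, "initial", 0, 0))(
    (acc, x) => ABCoutput(x.key, "intermediate", acc.c1Sum + x.c1, acc.c2Sum + x.c2),
    (m1, m2) => ABCoutput(m1.key, "final", m1.c1Sum + m2.c1Sum, m1.c2Sum + m2.c2Sum))
  .mapValues(out => out.copy(desc = "final"))  // runs for every key, whether or not a cross-partition merge happened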


Partitioning Data to optimize combineByKey

2016-06-02 Thread Nathan Case
Hello,

I am trying to process a dataset that is approximately 2 TB using a cluster
with 4.5 TB of RAM.  The data is in Parquet format and is initially loaded
into a dataframe.  A subset of the data is then queried for and converted
to an RDD for more complicated processing.  The first stage of that processing
is a mapToPair that uses each row's id as the key in a tuple.  Then the data
goes through a combineByKey operation to group all values with the same
key.  This operation always exceeds the maximum cluster memory and the job
eventually fails.  While it is shuffling, there are a lot of "spilling
in-memory map to disk" messages.  I am wondering: if I were to have the data
initially partitioned such that all the rows with the same id resided
within the same partition, would it need to do less shuffling and perform
correctly?

To do the initial load I am using:

sqlContext.read().parquet(inputPathArray).repartition(1, new
Column("id"));

I am not sure if this is the correct way to partition a dataframe, so my
first question is: is the above correct?

My next question is that when I go from the dataframe to rdd using:

JavaRDD locationsForSpecificKey = sqlc.sql("SELECT * FROM
standardlocationrecords WHERE customerID = " + customerID + " AND
partnerAppID = " + partnerAppID)
.toJavaRDD().map(new LocationRecordFromRow()::apply);

is the partition scheme from the dataframe preserved or do I need to
repartition after doing a mapToPair using:

rdd.partitionBy and passing in a custom HashPartitioner that uses the hash
of the ID field.

My goal is to reduce the shuffling when doing the final combineByKey to
prevent the job from running out of memory and failing.  Any help would be
greatly appreciated.

Thanks,
Nathan
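
One way to approach this on the RDD side (a sketch in Scala rather than Java, with placeholder class and field names): partition the pair RDD explicitly and pass the same partitioner to combineByKey, so the combine step itself does not trigger a second shuffle. Note that the combiners still hold all values for a key in memory, so this removes shuffle cost but not the per-key memory pressure.

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

case class LocationRecord(id: String, payload: String)   // placeholder schema

def groupById(records: RDD[LocationRecord], numPartitions: Int): RDD[(String, List[LocationRecord])] = {
  val partitioner = new HashPartitioner(numPartitions)
  val keyed = records.map(r => (r.id, r)).partitionBy(partitioner)   // the one shuffle
  keyed.combineByKey(
    (v: LocationRecord) => List(v),
    (acc: List[LocationRecord], v: LocationRecord) => v :: acc,
    (a: List[LocationRecord], b: List[LocationRecord]) => a ::: b,
    partitioner)   // same partitioner as the input => no extra shuffle
}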


Re: Datasets combineByKey

2016-04-10 Thread Koert Kuipers
yes it is
On Apr 10, 2016 3:17 PM, "Amit Sela"  wrote:

> I think *org.apache.spark.sql.expressions.Aggregator* is what I'm looking
> for, makes sense ?
>
> On Sun, Apr 10, 2016 at 4:08 PM Amit Sela  wrote:
>
>> I'm mapping RDD API to Datasets API and I was wondering if I was missing
>> something or is this functionality is missing.
>>
>>
>> On Sun, Apr 10, 2016 at 3:00 PM Ted Yu  wrote:
>>
>>> Haven't found any JIRA w.r.t. combineByKey for Dataset.
>>>
>>> What's your use case ?
>>>
>>> Thanks
>>>
>>> On Sat, Apr 9, 2016 at 7:38 PM, Amit Sela  wrote:
>>>
>>>> Is there (planned ?) a combineByKey support for Dataset ?
>>>> Is / Will there be a support for combiner lifting ?
>>>>
>>>> Thanks,
>>>> Amit
>>>>
>>>
>>>


Re: Datasets combineByKey

2016-04-10 Thread Amit Sela
I think *org.apache.spark.sql.expressions.Aggregator* is what I'm looking
for, makes sense ?
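
For reference, a minimal sketch of that Aggregator API against Spark 2.x (the Sale case class and names are illustrative only, not from this thread):

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

case class Sale(shop: String, amount: Long)

// zero/reduce/merge play the roles of createCombiner/mergeValue/mergeCombiners.
object SumAmount extends Aggregator[Sale, Long, Long] {
  def zero: Long = 0L
  def reduce(buf: Long, s: Sale): Long = buf + s.amount
  def merge(b1: Long, b2: Long): Long = b1 + b2
  def finish(buf: Long): Long = buf
  def bufferEncoder: Encoder[Long] = Encoders.scalaLong
  def outputEncoder: Encoder[Long] = Encoders.scalaLong
}

val spark = SparkSession.builder().master("local[*]").appName("agg-sketch").getOrCreate()
import spark.implicits._

val ds = Seq(Sale("a", 1L), Sale("a", 2L), Sale("b", 5L)).toDS()
ds.groupByKey(_.shop).agg(SumAmount.toColumn).show()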

On Sun, Apr 10, 2016 at 4:08 PM Amit Sela  wrote:

> I'm mapping RDD API to Datasets API and I was wondering if I was missing
> something or is this functionality is missing.
>
>
> On Sun, Apr 10, 2016 at 3:00 PM Ted Yu  wrote:
>
>> Haven't found any JIRA w.r.t. combineByKey for Dataset.
>>
>> What's your use case ?
>>
>> Thanks
>>
>> On Sat, Apr 9, 2016 at 7:38 PM, Amit Sela  wrote:
>>
>>> Is there (planned ?) a combineByKey support for Dataset ?
>>> Is / Will there be a support for combiner lifting ?
>>>
>>> Thanks,
>>> Amit
>>>
>>
>>


Re: Datasets combineByKey

2016-04-10 Thread Amit Sela
I'm mapping the RDD API to the Datasets API, and I was wondering if I was
missing something or if this functionality is missing.

On Sun, Apr 10, 2016 at 3:00 PM Ted Yu  wrote:

> Haven't found any JIRA w.r.t. combineByKey for Dataset.
>
> What's your use case ?
>
> Thanks
>
> On Sat, Apr 9, 2016 at 7:38 PM, Amit Sela  wrote:
>
>> Is there (planned ?) a combineByKey support for Dataset ?
>> Is / Will there be a support for combiner lifting ?
>>
>> Thanks,
>> Amit
>>
>
>


Re: Datasets combineByKey

2016-04-10 Thread Ted Yu
Haven't found any JIRA w.r.t. combineByKey for Dataset.

What's your use case ?

Thanks

On Sat, Apr 9, 2016 at 7:38 PM, Amit Sela  wrote:

> Is there (planned ?) a combineByKey support for Dataset ?
> Is / Will there be a support for combiner lifting ?
>
> Thanks,
> Amit
>


Datasets combineByKey

2016-04-09 Thread Amit Sela
Is there (or is there planned) combineByKey support for Dataset?
Is there / will there be support for combiner lifting?

Thanks,
Amit


RE: aggregateByKey vs combineByKey

2016-01-05 Thread LINChen
Hi Marco,

In your case, since you don't need to perform an aggregation (such as a sum or
average) over each key, using groupByKey may perform better. groupByKey
internally uses a CompactBuffer, which is much more efficient than an
ArrayBuffer.

Thanks,
LIN Chen
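
For completeness, a minimal sketch of that groupByKey variant, assuming the same kv RDD from the message below:

val res = kv.groupByKey().mapValues(_.toList)
// roughly: Array((1,List(i, a)), (2,List(Hi, am)), (4,List(test)), (6,List(string)))
// (element order within each list is not guaranteed)
res.collect()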

Date: Tue, 5 Jan 2016 21:13:40 +
Subject: aggregateByKey vs combineByKey
From: mmistr...@gmail.com
To: user@spark.apache.org

Hi all
 i have the following dataSet
 kv = [(2,Hi), (1,i), (2,am), (1,a), (4,test), (6,string)]

It's a simple list of tuples containing (word_length, word)

What I wanted to do was to group the result by key in order to have a result in
the form

[(word_length_1, [word1, word2, word3]), (word_length_2, [word4, word5, word6])]

so I browsed the Spark API and was able to get the result I wanted using two
different functions:

scala> kv.combineByKey(List(_), (x: List[String], y: String) => y :: x, (x: List[String], y: List[String]) => x ::: y).collect()
res86: Array[(Int, List[String])] = Array((1,List(i, a)), (2,List(Hi, am)), (4,List(test)), (6,List(string)))

and

scala> kv.aggregateByKey(List[String]())((acc, item) => item :: acc,
     |   (acc1, acc2) => acc1 ::: acc2).collect()
res87: Array[(Int, List[String])] = Array((1,List(i, a)), (2,List(Hi, am)), (4,List(test)), (6,List(string)))

Now, the question is: are there any advantages of using one instead of the other?
Am I somehow misusing the API for what I want to do?

kind regards
 marco







  

Re: aggregateByKey vs combineByKey

2016-01-05 Thread Ted Yu
Looking at PairRDDFunctions.scala :

  def aggregateByKey[U: ClassTag](zeroValue: U, partitioner:
Partitioner)(seqOp: (U, V) => U,
  combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
...
combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
  cleanedSeqOp, combOp, partitioner)

I think the two operations should have similar performance.

Cheers

On Tue, Jan 5, 2016 at 1:13 PM, Marco Mistroni  wrote:

> Hi all
>  i have the following dataSet
> kv = [(2,Hi), (1,i), (2,am), (1,a), (4,test), (6,s tring)]
>
> It's a simple list of tuples containing (word_length, word)
>
> What i wanted to do was to group the result by key in order to have a
> result in the form
>
> [(word_length_1, [word1, word2, word3], word_length_2, [word4, word5,
> word6])
>
> so i browsed spark API and was able to get the result i wanted using two
> different
> functions
> .
>
> scala> kv.combineByKey(List(_), (x:List[String], y:String) => y :: x,
> (x:List[St
>
> ring], y:List[String]) => x ::: y).collect()
>
> res86: Array[(Int, List[String])] = Array((1,List(i, a)), (2,List(Hi,
> am)), (4,L
> ist(test)), (6,List(string)))
>
> and
>
> scala>
>
> scala> kv.aggregateByKey(List[String]())((acc, item) => item :: acc,
>
>  |(acc1, acc2) => acc1 ::: acc2).collect()
>
>
>
>
>
>
>
> res87: Array[(Int, List[String])] = Array((1,List(i, a)), (2,List(Hi,
> am)), (4,L
> ist(test)), (6,List(string)))
>
> Now, question is: any advantages of using one instead of the others?
> Am i somehow misusing the API for what i want to do?
>
> kind regards
>  marco
>
>
>
>
>
>
>
>


aggregateByKey vs combineByKey

2016-01-05 Thread Marco Mistroni
Hi all
 i have the following dataSet
kv = [(2,Hi), (1,i), (2,am), (1,a), (4,test), (6,string)]

It's a simple list of tuples containing (word_length, word)

What I wanted to do was to group the result by key in order to have a
result in the form

[(word_length_1, [word1, word2, word3]), (word_length_2, [word4, word5,
word6])]

so I browsed the Spark API and was able to get the result I wanted using two
different functions:

scala> kv.combineByKey(List(_), (x: List[String], y: String) => y :: x,
     |   (x: List[String], y: List[String]) => x ::: y).collect()

res86: Array[(Int, List[String])] = Array((1,List(i, a)), (2,List(Hi, am)),
(4,List(test)), (6,List(string)))

and

scala> kv.aggregateByKey(List[String]())((acc, item) => item :: acc,
     |   (acc1, acc2) => acc1 ::: acc2).collect()

res87: Array[(Int, List[String])] = Array((1,List(i, a)), (2,List(Hi, am)),
(4,List(test)), (6,List(string)))

Now, the question is: are there any advantages of using one instead of the other?
Am I somehow misusing the API for what I want to do?

kind regards
 marco


Re: OOM in SizeEstimator while using combineByKey

2015-04-15 Thread Aniket Bhatnagar
I am setting spark.executor.memory as 1024m on a 3 node cluster with each
node having 4 cores and 7 GB RAM. The combiner functions are taking scala
case classes as input and are generating mutable.ListBuffer of scala case
classes. Therefore, I am guessing hashCode and equals should be taken care
of.

Thanks,
Aniket
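
For what it's worth, a hedged sketch (assuming Spark 1.x, the version current in this thread) of the settings usually tuned first when ExternalAppendOnlyMap runs out of heap during combineByKey; the exact values are illustrative, not a recommendation:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("combineByKey-job")
  .set("spark.executor.memory", "4g")            // was 1024m in the report above
  .set("spark.shuffle.memoryFraction", "0.4")    // Spark 1.x: heap fraction for shuffle/aggregation buffers (default 0.2)
val sc = new SparkContext(conf)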

On Wed, Apr 15, 2015 at 1:00 PM Xianjin YE  wrote:

> what is your JVM heap size settings?  The OOM in SIzeEstimator is caused
> by a lot of entry in IdentifyHashMap.
> A quick guess is that the object in your dataset is a custom class and you
> didn't implement the hashCode and equals method correctly.
>
>
>
> On Wednesday, April 15, 2015 at 3:10 PM, Aniket Bhatnagar wrote:
>
> > I am aggregating a dataset using combineByKey method and for a certain
> input size, the job fails with the following error. I have enabled head
> dumps to better analyze the issue and will report back if I have any
> findings. Meanwhile, if you guys have any idea of what could possibly
> result in this error or how to better debug this, please let me know.
> >
> > java.lang.OutOfMemoryError: Java heap space
> > at java.util.IdentityHashMap.resize(IdentityHashMap.java:469)
> > at java.util.IdentityHashMap.put(IdentityHashMap.java:445)
> > at
> org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:132)
> > at
> org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:178)
> > at
> org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:177)
> > at scala.collection.immutable.List.foreach(List.scala:381)
> > at
> org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:177)
> > at
> org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:161)
> > at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:155)
> > at
> org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
> > at
> org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
> > at
> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:33)
> > at
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
> > at
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insert(ExternalAppendOnlyMap.scala:105)
> > at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:93)
> > at
> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44)
> > at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> > at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> > at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> > at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> > at
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> > at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> > at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> > at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>
>
>
>


Re: OOM in SizeEstimator while using combineByKey

2015-04-15 Thread Xianjin YE
What are your JVM heap size settings?  The OOM in SizeEstimator is caused by a
lot of entries in an IdentityHashMap.
A quick guess is that the objects in your dataset are of a custom class and you
didn't implement the hashCode and equals methods correctly.



On Wednesday, April 15, 2015 at 3:10 PM, Aniket Bhatnagar wrote:

> I am aggregating a dataset using combineByKey method and for a certain input 
> size, the job fails with the following error. I have enabled head dumps to 
> better analyze the issue and will report back if I have any findings. 
> Meanwhile, if you guys have any idea of what could possibly result in this 
> error or how to better debug this, please let me know.
> 
> java.lang.OutOfMemoryError: Java heap space
> at java.util.IdentityHashMap.resize(IdentityHashMap.java:469)
> at java.util.IdentityHashMap.put(IdentityHashMap.java:445)
> at 
> org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:132)
> at 
> org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:178)
> at 
> org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:177)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at 
> org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:177)
> at 
> org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:161)
> at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:155)
> at 
> org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
> at 
> org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
> at 
> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:33)
> at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
> at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insert(ExternalAppendOnlyMap.scala:105)
> at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:93)
> at 
> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44)
> at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)







OOM in SizeEstimator while using combineByKey

2015-04-15 Thread Aniket Bhatnagar
I am aggregating a dataset using the combineByKey method, and for a certain
input size the job fails with the following error. I have enabled heap
dumps to better analyze the issue and will report back if I have any
findings. Meanwhile, if you have any idea of what could possibly result in
this error or how to better debug it, please let me know.

java.lang.OutOfMemoryError: Java heap space
at java.util.IdentityHashMap.resize(IdentityHashMap.java:469)
at java.util.IdentityHashMap.put(IdentityHashMap.java:445)
at
org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:132)
at
org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:178)
at
org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:177)
at scala.collection.immutable.List.foreach(List.scala:381)
at
org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:177)
at
org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:161)
at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:155)
at
org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
at
org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
at
org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:33)
at
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
at
org.apache.spark.util.collection.ExternalAppendOnlyMap.insert(ExternalAppendOnlyMap.scala:105)
at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:93)
at
org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)


CombineByKey - Please explain its working

2015-03-24 Thread ashish.usoni
I am reading about combineByKey and going through the below example from a
blog post, but I can't understand how it works step by step. Can someone
please explain?


case class Fruit(kind: String, weight: Int) {
  def makeJuice: Juice = Juice(weight * 100)
}
case class Juice(volumn: Int) {
  def add(j: Juice): Juice = Juice(volumn + j.volumn)
}
val apple1 = Fruit("Apple", 5)
val apple2 = Fruit("Apple", 8)
val orange1 = Fruit("orange", 10)

val fruit = sc.parallelize(List(("Apple", apple1), ("orange", orange1), ("Apple", apple2)))

val juice = fruit.combineByKey(
  (f: Fruit) => f.makeJuice,                  // createCombiner: first value seen for a key becomes the combiner
  (j: Juice, f: Fruit) => j.add(f.makeJuice), // mergeValue: fold further values of the key into the combiner
  (j1: Juice, j2: Juice) => j1.add(j2)        // mergeCombiners: merge per-partition combiners
)






Re: retry in combineByKey at BinaryClassificationMetrics.scala

2014-12-23 Thread Sean Owen
Yes, my change is slightly downstream of this point in the processing
though. The code is still creating a counter for each distinct score
value, and then binning. I don't think that would cause a failure -
just might be slow. At the extremes, you might see 'fetch failure' as
a symptom of things running too slowly.

Yes you can sacrifice some fidelity by more aggressively binning
upstream, on your scores. That would drastically reduce the input
size, at the cost of accuracy of course.
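
A minimal sketch of that upstream binning, assuming predictionAndLabel: RDD[(Double, Double)] as in the code quoted below: round the scores before handing them to BinaryClassificationMetrics so far fewer distinct thresholds reach the combineByKey.

import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

val rounded = predictionAndLabel.map { case (score, label) =>
  (math.rint(score * 1e4) / 1e4, label)   // e.g. 0.5643215 -> 0.5643
}
val metrics = new BinaryClassificationMetrics(rounded)
val auROC = metrics.areaUnderROC()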

On Tue, Dec 23, 2014 at 7:35 PM, Xiangrui Meng  wrote:
> Sean's PR may be relevant to this issue
> (https://github.com/apache/spark/pull/3702). As a workaround, you can
> try to truncate the raw scores to 4 digits (e.g., 0.5643215 -> 0.5643)
> before sending it to BinaryClassificationMetrics. This may not work
> well if he score distribution is very skewed. See discussion on
> https://issues.apache.org/jira/browse/SPARK-4547 -Xiangrui
>
> On Tue, Dec 23, 2014 at 9:00 AM, Thomas Kwan  wrote:
>> Hi there,
>>
>> We are using mllib 1.1.1, and doing Logistics Regression with a dataset of
>> about 150M rows.
>> The training part usually goes pretty smoothly without any retries. But
>> during the prediction stage and BinaryClassificationMetrics stage, I am
>> seeing retries with error of "fetch failure".
>>
>> The prediction part is just as follows:
>>
>> val predictionAndLabel = testRDD.map { point =>
>> val prediction = model.predict(point.features)
>> (prediction, point.label)
>> }
>> ...
>> val metrics = new BinaryClassificationMetrics(predictionAndLabel)
>>
>> The fetch failure happened with the following stack trace:
>>
>> org.apache.spark.rdd.PairRDDFunctions.combineByKey(PairRDDFunctions.scala:515)
>>
>> org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.x$3$lzycompute(BinaryClassificationMetrics.scala:101)
>>
>> org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.x$3(BinaryClassificationMetrics.scala:96)
>>
>> org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.confusions$lzycompute(BinaryClassificationMetrics.scala:98)
>>
>> org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.confusions(BinaryClassificationMetrics.scala:98)
>>
>> org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.createCurve(BinaryClassificationMetrics.scala:142)
>>
>> org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.roc(BinaryClassificationMetrics.scala:50)
>>
>> org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.areaUnderROC(BinaryClassificationMetrics.scala:60)
>>
>> com.manage.ml.evaluation.BinaryClassificationMetrics.areaUnderROC(BinaryClassificationMetrics.scala:14)
>>
>> ...
>>
>>
>> We are doing this in the yarn-client mode. 32 executors, 16G executor
>> memory, and 12 cores as the spark-submit settings.
>>
>> I wonder if anyone has suggestion on how to debug this.
>>
>> thanks in advance
>> thomas
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>




Re: retry in combineByKey at BinaryClassificationMetrics.scala

2014-12-23 Thread Xiangrui Meng
Sean's PR may be relevant to this issue
(https://github.com/apache/spark/pull/3702). As a workaround, you can
try to truncate the raw scores to 4 digits (e.g., 0.5643215 -> 0.5643)
before sending them to BinaryClassificationMetrics. This may not work
well if the score distribution is very skewed. See the discussion on
https://issues.apache.org/jira/browse/SPARK-4547 -Xiangrui

On Tue, Dec 23, 2014 at 9:00 AM, Thomas Kwan  wrote:
> Hi there,
>
> We are using mllib 1.1.1, and doing Logistics Regression with a dataset of
> about 150M rows.
> The training part usually goes pretty smoothly without any retries. But
> during the prediction stage and BinaryClassificationMetrics stage, I am
> seeing retries with error of "fetch failure".
>
> The prediction part is just as follows:
>
> val predictionAndLabel = testRDD.map { point =>
> val prediction = model.predict(point.features)
> (prediction, point.label)
> }
> ...
> val metrics = new BinaryClassificationMetrics(predictionAndLabel)
>
> The fetch failure happened with the following stack trace:
>
> org.apache.spark.rdd.PairRDDFunctions.combineByKey(PairRDDFunctions.scala:515)
>
> org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.x$3$lzycompute(BinaryClassificationMetrics.scala:101)
>
> org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.x$3(BinaryClassificationMetrics.scala:96)
>
> org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.confusions$lzycompute(BinaryClassificationMetrics.scala:98)
>
> org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.confusions(BinaryClassificationMetrics.scala:98)
>
> org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.createCurve(BinaryClassificationMetrics.scala:142)
>
> org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.roc(BinaryClassificationMetrics.scala:50)
>
> org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.areaUnderROC(BinaryClassificationMetrics.scala:60)
>
> com.manage.ml.evaluation.BinaryClassificationMetrics.areaUnderROC(BinaryClassificationMetrics.scala:14)
>
> ...
>
>
> We are doing this in the yarn-client mode. 32 executors, 16G executor
> memory, and 12 cores as the spark-submit settings.
>
> I wonder if anyone has suggestion on how to debug this.
>
> thanks in advance
> thomas




retry in combineByKey at BinaryClassificationMetrics.scala

2014-12-23 Thread Thomas Kwan
Hi there,

We are using MLlib 1.1.1 and doing Logistic Regression with a dataset of
about 150M rows.
The training part usually goes pretty smoothly without any retries. But
during the prediction stage and the BinaryClassificationMetrics stage, I am
seeing retries with "fetch failure" errors.

The prediction part is just as follows:

val predictionAndLabel = testRDD.map { point =>
val prediction = model.predict(point.features)
(prediction, point.label)
}
...
val metrics = new BinaryClassificationMetrics(predictionAndLabel)

The fetch failure happened with the following stack trace:

org.apache.spark.rdd.PairRDDFunctions.combineByKey(PairRDDFunctions.scala:515)

org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.x$3$lzycompute(BinaryClassificationMetrics.scala:101)

org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.x$3(BinaryClassificationMetrics.scala:96)

org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.confusions$lzycompute(BinaryClassificationMetrics.scala:98)

org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.confusions(BinaryClassificationMetrics.scala:98)

org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.createCurve(BinaryClassificationMetrics.scala:142)

org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.roc(BinaryClassificationMetrics.scala:50)

org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.areaUnderROC(BinaryClassificationMetrics.scala:60)

com.manage.ml.evaluation.BinaryClassificationMetrics.areaUnderROC(BinaryClassificationMetrics.scala:14)

...

We are doing this in the yarn-client mode. 32 executors, 16G executor
memory, and 12 cores as the spark-submit settings.

I wonder if anyone has suggestions on how to debug this.

thanks in advance
thomas


Re: Help with using combineByKey

2014-10-10 Thread HARIPRIYA AYYALASOMAYAJULA
Thank you guys!

It was very helpful and now I understand it better.




On Fri, Oct 10, 2014 at 1:38 AM, Davies Liu  wrote:

> Maybe this version is easier to use:
>
> plist.mapValues((v) => (if (v >0) 1 else 0, 1)).reduceByKey((x, y) =>
> (x._1 + y._1, x._2 + y._2))
>
> It has similar behavior with combineByKey(), will by faster than
> groupByKey() version.
>
> On Thu, Oct 9, 2014 at 9:28 PM, HARIPRIYA AYYALASOMAYAJULA
>  wrote:
> > Sean,
> >
> > Thank you. It works. But I am still confused about the function. Can you
> > kindly throw some light on it?
> > I was going through the example mentioned in
> >
> https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html
> >
> > Is there any better source through which I can learn more about these
> > functions? It would be helpful if I can get a chance to look at more
> > examples.
> > Also, I assume using combineByKey helps us solve it parallel than using
> > simple functions provided by scala as mentioned by Yana. Am I correct?
> >
> > On Thu, Oct 9, 2014 at 12:30 PM, Sean Owen  wrote:
> >>
> >> Oh duh, sorry. The initialization should of course be (v) => (if (v >
> >> 0) 1 else 0, 1)
> >> This gives the answer you are looking for. I don't see what Part2 is
> >> supposed to do differently.
> >>
> >> On Thu, Oct 9, 2014 at 6:14 PM, HARIPRIYA AYYALASOMAYAJULA
> >>  wrote:
> >> > Hello Sean,
> >> >
> >> > Thank you, but changing from v to 1 doesn't help me either.
> >> >
> >> > I am trying to count the number of non-zero values using the first
> >> > accumulator.
> >> > val newlist = List (("LAX",6), ("LAX",0), ("LAX",7), ("SFO",0),
> >> > ("SFO",0),
> >> > ("SFO",9))
> >> >
> >> > val plist = sc.parallelize(newlist)
> >> >
> >> >  val part1 = plist.combineByKey(
> >> >(v) => (1, 1),
> >> >(acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1, acc._2
> +
> >> > 1),
> >> >(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1,
> acc1._2 +
> >> > acc2._2)
> >> >)
> >> >
> >> >val Part2 = part1.map{ case (key, value) => (key,
> >> > (value._1,value._2)) }
> >> >
> >> > This should give me the result
> >> > (LAX,(2,3))
> >> > (SFO,(1,3))
> >> >
> >> >
> >> >
> >> > On Thu, Oct 9, 2014 at 11:48 AM, Sean Owen 
> wrote:
> >> >>
> >> >> You have a typo in your code at "var acc:", and the map from opPart1
> >> >> to opPart2 looks like a no-op, but those aren't the problem I think.
> >> >> It sounds like you intend the first element of each pair to be a
> count
> >> >> of nonzero values, but you initialize the first element of the pair
> to
> >> >> v, not 1, in v => (v,1). Try v => (1,1)
> >> >>
> >> >>
> >> >> On Thu, Oct 9, 2014 at 4:47 PM, HARIPRIYA AYYALASOMAYAJULA
> >> >>  wrote:
> >> >> >
> >> >> > I am a beginner to Spark and finding it difficult to implement a
> very
> >> >> > simple
> >> >> > reduce operation. I read that is ideal to use combineByKey for
> >> >> > complex
> >> >> > reduce operations.
> >> >> >
> >> >> > My input:
> >> >> >
> >> >> > val input = sc.parallelize(List(("LAX",6), ("LAX",8), ("LAX",7),
> >> >> > ("SFO",0),
> >> >> > ("SFO",1),
> >> >> > ("SFO",9),("PHX",65),("PHX",88),("KX",7),("KX",6),("KX",1),
> >> >> > ("KX",9),
> >> >> >
> >> >> >
> >> >> >
> ("HOU",56),("HOU",5),("HOU",59),("HOU",0),("MA",563),("MA",545),("MA",5),("MA",0),("MA",0)))
> >> >> >
> >> >> >
> >> >> >  val opPart1 = input.combineByKey(
> >> >> >(v) => (v, 1),
> >> >> >(var acc: (Int, Int), v) => ( if(v > 0) acc._1 

Re: Help with using combineByKey

2014-10-09 Thread Davies Liu
Maybe this version is easier to use:

plist.mapValues((v) => (if (v >0) 1 else 0, 1)).reduceByKey((x, y) =>
(x._1 + y._1, x._2 + y._2))

It has similar behavior to combineByKey(), and will be faster than the
groupByKey() version.

On Thu, Oct 9, 2014 at 9:28 PM, HARIPRIYA AYYALASOMAYAJULA
 wrote:
> Sean,
>
> Thank you. It works. But I am still confused about the function. Can you
> kindly throw some light on it?
> I was going through the example mentioned in
> https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html
>
> Is there any better source through which I can learn more about these
> functions? It would be helpful if I can get a chance to look at more
> examples.
> Also, I assume using combineByKey helps us solve it parallel than using
> simple functions provided by scala as mentioned by Yana. Am I correct?
>
> On Thu, Oct 9, 2014 at 12:30 PM, Sean Owen  wrote:
>>
>> Oh duh, sorry. The initialization should of course be (v) => (if (v >
>> 0) 1 else 0, 1)
>> This gives the answer you are looking for. I don't see what Part2 is
>> supposed to do differently.
>>
>> On Thu, Oct 9, 2014 at 6:14 PM, HARIPRIYA AYYALASOMAYAJULA
>>  wrote:
>> > Hello Sean,
>> >
>> > Thank you, but changing from v to 1 doesn't help me either.
>> >
>> > I am trying to count the number of non-zero values using the first
>> > accumulator.
>> > val newlist = List (("LAX",6), ("LAX",0), ("LAX",7), ("SFO",0),
>> > ("SFO",0),
>> > ("SFO",9))
>> >
>> > val plist = sc.parallelize(newlist)
>> >
>> >  val part1 = plist.combineByKey(
>> >(v) => (1, 1),
>> >(acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1, acc._2 +
>> > 1),
>> >(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 +
>> > acc2._2)
>> >)
>> >
>> >val Part2 = part1.map{ case (key, value) => (key,
>> > (value._1,value._2)) }
>> >
>> > This should give me the result
>> > (LAX,(2,3))
>> > (SFO,(1,3))
>> >
>> >
>> >
>> > On Thu, Oct 9, 2014 at 11:48 AM, Sean Owen  wrote:
>> >>
>> >> You have a typo in your code at "var acc:", and the map from opPart1
>> >> to opPart2 looks like a no-op, but those aren't the problem I think.
>> >> It sounds like you intend the first element of each pair to be a count
>> >> of nonzero values, but you initialize the first element of the pair to
>> >> v, not 1, in v => (v,1). Try v => (1,1)
>> >>
>> >>
>> >> On Thu, Oct 9, 2014 at 4:47 PM, HARIPRIYA AYYALASOMAYAJULA
>> >>  wrote:
>> >> >
>> >> > I am a beginner to Spark and finding it difficult to implement a very
>> >> > simple
>> >> > reduce operation. I read that is ideal to use combineByKey for
>> >> > complex
>> >> > reduce operations.
>> >> >
>> >> > My input:
>> >> >
>> >> > val input = sc.parallelize(List(("LAX",6), ("LAX",8), ("LAX",7),
>> >> > ("SFO",0),
>> >> > ("SFO",1),
>> >> > ("SFO",9),("PHX",65),("PHX",88),("KX",7),("KX",6),("KX",1),
>> >> > ("KX",9),
>> >> >
>> >> >
>> >> > ("HOU",56),("HOU",5),("HOU",59),("HOU",0),("MA",563),("MA",545),("MA",5),("MA",0),("MA",0)))
>> >> >
>> >> >
>> >> >  val opPart1 = input.combineByKey(
>> >> >(v) => (v, 1),
>> >> >(var acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1,
>> >> > acc._2 +
>> >> > 1),
>> >> >(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1,
>> >> > acc1._2 +
>> >> > acc2._2)
>> >> >)
>> >> >
>> >> >val opPart2 = opPart1.map{ case (key, value) => (key,
>> >> > (value._1,value._2)) }
>> >> >
>> >> >  opPart2.collectAsMap().map(println(_))
>> >> >
>> >> > If the value is greater than 0, the first accumulator should be
>> >> > incremented
>> >> > by 1, else it remains the sa

Re: Help with using combineByKey

2014-10-09 Thread Sean Owen
It's the exact same reason you wrote:

(acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1, acc._2 + 1),

right? The first function establishes an initial value for the count.
The value is either (0,1) or (1,1) depending on whether the value is 0
or not.

You're otherwise using the method just fine. You can write this
function a lot of ways; this is a bit verbose but probably efficient.

Yana's version is distributed. It's just that it uses simple Scala
functions within map(). This also works although the groupByKey() can
be a problem as it requires putting all values for a key in memory,
whereas your combineByKey does not.
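
Putting the pieces of this thread together, the corrected version (count of non-zero values and total count per key) would look roughly like this, with plist as defined earlier:

val part1 = plist.combineByKey(
  (v: Int) => (if (v > 0) 1 else 0, 1),                                        // createCombiner
  (acc: (Int, Int), v: Int) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1), // mergeValue
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2))                 // mergeCombiners

// part1.collect(): Array((LAX,(2,3)), (SFO,(1,3)))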

On Fri, Oct 10, 2014 at 5:28 AM, HARIPRIYA AYYALASOMAYAJULA
 wrote:
> Sean,
>
> Thank you. It works. But I am still confused about the function. Can you
> kindly throw some light on it?
> I was going through the example mentioned in
> https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html
>
> Is there any better source through which I can learn more about these
> functions? It would be helpful if I can get a chance to look at more
> examples.
> Also, I assume using combineByKey helps us solve it parallel than using
> simple functions provided by scala as mentioned by Yana. Am I correct?
>
> On Thu, Oct 9, 2014 at 12:30 PM, Sean Owen  wrote:
>>
>> Oh duh, sorry. The initialization should of course be (v) => (if (v >
>> 0) 1 else 0, 1)
>> This gives the answer you are looking for. I don't see what Part2 is
>> supposed to do differently.
>>
>> On Thu, Oct 9, 2014 at 6:14 PM, HARIPRIYA AYYALASOMAYAJULA
>>  wrote:
>> > Hello Sean,
>> >
>> > Thank you, but changing from v to 1 doesn't help me either.
>> >
>> > I am trying to count the number of non-zero values using the first
>> > accumulator.
>> > val newlist = List (("LAX",6), ("LAX",0), ("LAX",7), ("SFO",0),
>> > ("SFO",0),
>> > ("SFO",9))
>> >
>> > val plist = sc.parallelize(newlist)
>> >
>> >  val part1 = plist.combineByKey(
>> >(v) => (1, 1),
>> >(acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1, acc._2 +
>> > 1),
>> >(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 +
>> > acc2._2)
>> >)
>> >
>> >val Part2 = part1.map{ case (key, value) => (key,
>> > (value._1,value._2)) }
>> >
>> > This should give me the result
>> > (LAX,(2,3))
>> > (SFO,(1,3))
>> >
>> >
>> >
>> > On Thu, Oct 9, 2014 at 11:48 AM, Sean Owen  wrote:
>> >>
>> >> You have a typo in your code at "var acc:", and the map from opPart1
>> >> to opPart2 looks like a no-op, but those aren't the problem I think.
>> >> It sounds like you intend the first element of each pair to be a count
>> >> of nonzero values, but you initialize the first element of the pair to
>> >> v, not 1, in v => (v,1). Try v => (1,1)
>> >>
>> >>
>> >> On Thu, Oct 9, 2014 at 4:47 PM, HARIPRIYA AYYALASOMAYAJULA
>> >>  wrote:
>> >> >
>> >> > I am a beginner to Spark and finding it difficult to implement a very
>> >> > simple
>> >> > reduce operation. I read that is ideal to use combineByKey for
>> >> > complex
>> >> > reduce operations.
>> >> >
>> >> > My input:
>> >> >
>> >> > val input = sc.parallelize(List(("LAX",6), ("LAX",8), ("LAX",7),
>> >> > ("SFO",0),
>> >> > ("SFO",1),
>> >> > ("SFO",9),("PHX",65),("PHX",88),("KX",7),("KX",6),("KX",1),
>> >> > ("KX",9),
>> >> >
>> >> >
>> >> > ("HOU",56),("HOU",5),("HOU",59),("HOU",0),("MA",563),("MA",545),("MA",5),("MA",0),("MA",0)))
>> >> >
>> >> >
>> >> >  val opPart1 = input.combineByKey(
>> >> >(v) => (v, 1),
>> >> >(var acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1,
>> >> > acc._2 +
>> >> > 1),
>> >> >(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1,
>> >> > acc1._2 +
>> >> > acc2._2)
>> >> >)
>

Re: Help with using combineByKey

2014-10-09 Thread HARIPRIYA AYYALASOMAYAJULA
Sean,

Thank you. It works. But I am still confused about the function. Can you
kindly throw some light on it?
I was going through the example mentioned in
https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html

Is there any better source through which I can learn more about these
functions? It would be helpful if I could look at more examples.
Also, I assume using combineByKey lets us solve this in parallel, rather
than using the simple Scala functions Yana mentioned. Am I correct?

On Thu, Oct 9, 2014 at 12:30 PM, Sean Owen  wrote:

> Oh duh, sorry. The initialization should of course be (v) => (if (v >
> 0) 1 else 0, 1)
> This gives the answer you are looking for. I don't see what Part2 is
> supposed to do differently.
>
> On Thu, Oct 9, 2014 at 6:14 PM, HARIPRIYA AYYALASOMAYAJULA
>  wrote:
> > Hello Sean,
> >
> > Thank you, but changing from v to 1 doesn't help me either.
> >
> > I am trying to count the number of non-zero values using the first
> > accumulator.
> > val newlist = List (("LAX",6), ("LAX",0), ("LAX",7), ("SFO",0),
> ("SFO",0),
> > ("SFO",9))
> >
> > val plist = sc.parallelize(newlist)
> >
> >  val part1 = plist.combineByKey(
> >(v) => (1, 1),
> >(acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1, acc._2 +
> 1),
> >(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 +
> > acc2._2)
> >)
> >
> >val Part2 = part1.map{ case (key, value) => (key,
> (value._1,value._2)) }
> >
> > This should give me the result
> > (LAX,(2,3))
> > (SFO,(1,3))
> >
> >
> >
> > On Thu, Oct 9, 2014 at 11:48 AM, Sean Owen  wrote:
> >>
> >> You have a typo in your code at "var acc:", and the map from opPart1
> >> to opPart2 looks like a no-op, but those aren't the problem I think.
> >> It sounds like you intend the first element of each pair to be a count
> >> of nonzero values, but you initialize the first element of the pair to
> >> v, not 1, in v => (v,1). Try v => (1,1)
> >>
> >>
> >> On Thu, Oct 9, 2014 at 4:47 PM, HARIPRIYA AYYALASOMAYAJULA
> >>  wrote:
> >> >
> >> > I am a beginner to Spark and finding it difficult to implement a very
> >> > simple
> >> > reduce operation. I read that is ideal to use combineByKey for complex
> >> > reduce operations.
> >> >
> >> > My input:
> >> >
> >> > val input = sc.parallelize(List(("LAX",6), ("LAX",8), ("LAX",7),
> >> > ("SFO",0),
> >> > ("SFO",1), ("SFO",9),("PHX",65),("PHX",88),("KX",7),("KX",6),("KX",1),
> >> > ("KX",9),
> >> >
> >> >
> ("HOU",56),("HOU",5),("HOU",59),("HOU",0),("MA",563),("MA",545),("MA",5),("MA",0),("MA",0)))
> >> >
> >> >
> >> >  val opPart1 = input.combineByKey(
> >> >(v) => (v, 1),
> >> >(var acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1,
> >> > acc._2 +
> >> > 1),
> >> >(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1,
> acc1._2 +
> >> > acc2._2)
> >> >)
> >> >
> >> >val opPart2 = opPart1.map{ case (key, value) => (key,
> >> > (value._1,value._2)) }
> >> >
> >> >  opPart2.collectAsMap().map(println(_))
> >> >
> >> > If the value is greater than 0, the first accumulator should be
> >> > incremented
> >> > by 1, else it remains the same. The second accumulator is a simple
> >> > counter
> >> > for each value.  I am getting an incorrect output (garbage values )for
> >> > the
> >> > first accumulator. Please help.
> >> >
> >> >  The equivalent reduce operation in Hadoop MapReduce is :
> >> >
> >> > public static class PercentageCalcReducer extends
> >> > Reducer
> >> >
> >> > {
> >> >
> >> > private FloatWritable pdelay = new FloatWritable();
> >> >
> >> >
> >> > public void reduce(Text key, Iterable values,Context
> >> > context)throws IOException,InterruptedException
> >> >
> >> > {
> >> >
> >> > int acc2=0;
> >> >
> >> > float frac_delay, percentage_delay;
> >> >
> >> > int acc1=0;
> >> >
> >> > for(IntWritable val : values)
> >> >
> >> > {
> >> >
> >> > if(val.get() > 0)
> >> >
> >> > {
> >> >
> >> > acc1++;
> >> >
> >> > }
> >> >
> >> > acc2++;
> >> >
> >> > }
> >> >
> >> >
> >> >
> >> > frac_delay = (float)acc1/acc2;
> >> >
> >> > percentage_delay = frac_delay * 100 ;
> >> >
> >> > pdelay.set(percentage_delay);
> >> >
> >> > context.write(key,pdelay);
> >> >
> >> > }
> >> >
> >> > }
> >> >
> >> >
> >> > Please help. Thank you for your time.
> >> >
> >> > --
> >> >
> >> > Regards,
> >> >
> >> > Haripriya Ayyalasomayajula
> >> > contact : 650-796-7112
> >
> >
> >
> >
> > --
> > Regards,
> > Haripriya Ayyalasomayajula
> > contact : 650-796-7112
>



-- 
Regards,
Haripriya Ayyalasomayajula
contact : 650-796-7112


Re: Help with using combineByKey

2014-10-09 Thread Sean Owen
Oh duh, sorry. The initialization should of course be (v) => (if (v >
0) 1 else 0, 1)
This gives the answer you are looking for. I don't see what Part2 is
supposed to do differently.
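
With that initializer, the percentage computed by the Hadoop reducer below
falls out of a mapValues over the (nonzeroCount, totalCount) pairs; a small
sketch, assuming part1 is rebuilt with the corrected initializer:

// a sketch only: percentage of nonzero values per key
val percentages = part1.mapValues { case (nonzero, total) => 100f * nonzero / total }
percentages.collect().foreach(println)   // e.g. (LAX, ~66.7) and (SFO, ~33.3)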

On Thu, Oct 9, 2014 at 6:14 PM, HARIPRIYA AYYALASOMAYAJULA
 wrote:
> Hello Sean,
>
> Thank you, but changing from v to 1 doesn't help me either.
>
> I am trying to count the number of non-zero values using the first
> accumulator.
> val newlist = List (("LAX",6), ("LAX",0), ("LAX",7), ("SFO",0), ("SFO",0),
> ("SFO",9))
>
> val plist = sc.parallelize(newlist)
>
>  val part1 = plist.combineByKey(
>(v) => (1, 1),
>(acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
>(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 +
> acc2._2)
>)
>
>val Part2 = part1.map{ case (key, value) => (key, (value._1,value._2)) }
>
> This should give me the result
> (LAX,(2,3))
> (SFO,(1,3))
>
>
>
> On Thu, Oct 9, 2014 at 11:48 AM, Sean Owen  wrote:
>>
>> You have a typo in your code at "var acc:", and the map from opPart1
>> to opPart2 looks like a no-op, but those aren't the problem I think.
>> It sounds like you intend the first element of each pair to be a count
>> of nonzero values, but you initialize the first element of the pair to
>> v, not 1, in v => (v,1). Try v => (1,1)
>>
>>
>> On Thu, Oct 9, 2014 at 4:47 PM, HARIPRIYA AYYALASOMAYAJULA
>>  wrote:
>> >
>> > I am a beginner to Spark and finding it difficult to implement a very
>> > simple
>> > reduce operation. I read that is ideal to use combineByKey for complex
>> > reduce operations.
>> >
>> > My input:
>> >
>> > val input = sc.parallelize(List(("LAX",6), ("LAX",8), ("LAX",7),
>> > ("SFO",0),
>> > ("SFO",1), ("SFO",9),("PHX",65),("PHX",88),("KX",7),("KX",6),("KX",1),
>> > ("KX",9),
>> >
>> > ("HOU",56),("HOU",5),("HOU",59),("HOU",0),("MA",563),("MA",545),("MA",5),("MA",0),("MA",0)))
>> >
>> >
>> >  val opPart1 = input.combineByKey(
>> >(v) => (v, 1),
>> >(var acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1,
>> > acc._2 +
>> > 1),
>> >(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 +
>> > acc2._2)
>> >)
>> >
>> >val opPart2 = opPart1.map{ case (key, value) => (key,
>> > (value._1,value._2)) }
>> >
>> >  opPart2.collectAsMap().map(println(_))
>> >
>> > If the value is greater than 0, the first accumulator should be
>> > incremented
>> > by 1, else it remains the same. The second accumulator is a simple
>> > counter
>> > for each value.  I am getting an incorrect output (garbage values )for
>> > the
>> > first accumulator. Please help.
>> >
>> >  The equivalent reduce operation in Hadoop MapReduce is :
>> >
>> > public static class PercentageCalcReducer extends
>> > Reducer
>> >
>> > {
>> >
>> > private FloatWritable pdelay = new FloatWritable();
>> >
>> >
>> > public void reduce(Text key, Iterable values,Context
>> > context)throws IOException,InterruptedException
>> >
>> > {
>> >
>> > int acc2=0;
>> >
>> > float frac_delay, percentage_delay;
>> >
>> > int acc1=0;
>> >
>> > for(IntWritable val : values)
>> >
>> > {
>> >
>> > if(val.get() > 0)
>> >
>> > {
>> >
>> > acc1++;
>> >
>> > }
>> >
>> > acc2++;
>> >
>> > }
>> >
>> >
>> >
>> > frac_delay = (float)acc1/acc2;
>> >
>> > percentage_delay = frac_delay * 100 ;
>> >
>> > pdelay.set(percentage_delay);
>> >
>> > context.write(key,pdelay);
>> >
>> > }
>> >
>> > }
>> >
>> >
>> > Please help. Thank you for your time.
>> >
>> > --
>> >
>> > Regards,
>> >
>> > Haripriya Ayyalasomayajula
>> > contact : 650-796-7112
>
>
>
>
> --
> Regards,
> Haripriya Ayyalasomayajula
> contact : 650-796-7112

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Help with using combineByKey

2014-10-09 Thread HARIPRIYA AYYALASOMAYAJULA
Hello Yana,

Thank you. Yes, it works. However, can you please suggest any examples (or
links) on the usage of combineByKey?

On Thu, Oct 9, 2014 at 12:03 PM, Yana Kadiyska 
wrote:

> If you just want the ratio of positive to all values per key (if I'm
> reading right) this works
>
> val reduced= input.groupByKey().map(grp=>
> grp._2.filter(v=>v>0).size.toFloat/grp._2.size)
> reduced.foreach(println)
>
> I don't think you need reduceByKey or combineByKey as you're not doing
> anything where the values depend on each other -- you're just counting...
>
> On Thu, Oct 9, 2014 at 11:47 AM, HARIPRIYA AYYALASOMAYAJULA <
> aharipriy...@gmail.com> wrote:
>
>>
>> I am a beginner to Spark and finding it difficult to implement a very
>> simple reduce operation. I read that is ideal to use combineByKey for
>> complex reduce operations.
>>
>> My input:
>>
>> val input = sc.parallelize(List(("LAX",6), ("LAX",8), ("LAX",7),
>> ("SFO",0), ("SFO",1),
>> ("SFO",9),("PHX",65),("PHX",88),("KX",7),("KX",6),("KX",1), ("KX",9),
>> ("HOU",56),("HOU",5),("HOU",59),("HOU",0),("MA",563),("MA",545),("MA",5),("MA",0),("MA",0)))
>>
>>
>>  val opPart1 = input.combineByKey(
>>(v) => (v, 1),
>>(var acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1, acc._2
>> + 1),
>>(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 +
>> acc2._2)
>>)
>>
>>val opPart2 = opPart1.map{ case (key, value) => (key,
>> (value._1,value._2)) }
>>
>>  opPart2.collectAsMap().map(println(_))
>>
>> If the value is greater than 0, the first accumulator should be
>> incremented by 1, else it remains the same. The second accumulator is a
>> simple counter for each value.  I am getting an incorrect output (garbage
>> values )for the first accumulator. Please help.
>>
>>  The equivalent reduce operation in Hadoop MapReduce is :
>>
>>
>> public static class PercentageCalcReducer extends 
>> Reducer
>>
>> {
>>
>> private FloatWritable pdelay = new FloatWritable();
>>
>>
>> public void reduce(Text key, Iterable values,Context
>> context)throws IOException,InterruptedException
>>
>> {
>>
>> int acc2=0;
>>
>> float frac_delay, percentage_delay;
>>
>> int acc1=0;
>>
>> for(IntWritable val : values)
>>
>> {
>>
>> if(val.get() > 0)
>>
>> {
>>
>> acc1++;
>>
>> }
>>
>> acc2++;
>>
>> }
>>
>>
>>
>> frac_delay = (float)acc1/acc2;
>>
>> percentage_delay = frac_delay * 100 ;
>>
>> pdelay.set(percentage_delay);
>>
>> context.write(key,pdelay);
>>
>> }
>>
>> }
>>
>>
>> Please help. Thank you for your time.
>>
>> --
>>
>> Regards,
>> Haripriya Ayyalasomayajula
>> contact : 650-796-7112
>>
>
>


-- 
Regards,
Haripriya Ayyalasomayajula
contact : 650-796-7112


Re: Help with using combineByKey

2014-10-09 Thread HARIPRIYA AYYALASOMAYAJULA
Hello Sean,

Thank you, but changing from v to 1 doesn't help me either.

I am trying to count the number of non-zero values using the first
accumulator.
val newlist = List (("LAX",6), ("LAX",0), ("LAX",7), ("SFO",0), ("SFO",0),
("SFO",9))

val plist = sc.parallelize(newlist)

 val part1 = plist.combineByKey(
   (v) => (1, 1),
   (acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
   (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 +
acc2._2)
   )

   val Part2 = part1.map{ case (key, value) => (key, (value._1,value._2)) }

This should give me the result
(LAX,(2,3))
(SFO,(1,3))



On Thu, Oct 9, 2014 at 11:48 AM, Sean Owen  wrote:

> You have a typo in your code at "var acc:", and the map from opPart1
> to opPart2 looks like a no-op, but those aren't the problem I think.
> It sounds like you intend the first element of each pair to be a count
> of nonzero values, but you initialize the first element of the pair to
> v, not 1, in v => (v,1). Try v => (1,1)
>
>
> On Thu, Oct 9, 2014 at 4:47 PM, HARIPRIYA AYYALASOMAYAJULA
>  wrote:
> >
> > I am a beginner to Spark and finding it difficult to implement a very
> simple
> > reduce operation. I read that is ideal to use combineByKey for complex
> > reduce operations.
> >
> > My input:
> >
> > val input = sc.parallelize(List(("LAX",6), ("LAX",8), ("LAX",7),
> ("SFO",0),
> > ("SFO",1), ("SFO",9),("PHX",65),("PHX",88),("KX",7),("KX",6),("KX",1),
> > ("KX",9),
> >
> ("HOU",56),("HOU",5),("HOU",59),("HOU",0),("MA",563),("MA",545),("MA",5),("MA",0),("MA",0)))
> >
> >
> >  val opPart1 = input.combineByKey(
> >(v) => (v, 1),
> >(var acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1,
> acc._2 +
> > 1),
> >(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 +
> > acc2._2)
> >)
> >
> >val opPart2 = opPart1.map{ case (key, value) => (key,
> > (value._1,value._2)) }
> >
> >  opPart2.collectAsMap().map(println(_))
> >
> > If the value is greater than 0, the first accumulator should be
> incremented
> > by 1, else it remains the same. The second accumulator is a simple
> counter
> > for each value.  I am getting an incorrect output (garbage values )for
> the
> > first accumulator. Please help.
> >
> >  The equivalent reduce operation in Hadoop MapReduce is :
> >
> > public static class PercentageCalcReducer extends
> > Reducer
> >
> > {
> >
> > private FloatWritable pdelay = new FloatWritable();
> >
> >
> > public void reduce(Text key, Iterable values,Context
> > context)throws IOException,InterruptedException
> >
> > {
> >
> > int acc2=0;
> >
> > float frac_delay, percentage_delay;
> >
> > int acc1=0;
> >
> > for(IntWritable val : values)
> >
> > {
> >
> > if(val.get() > 0)
> >
> > {
> >
> > acc1++;
> >
> > }
> >
> > acc2++;
> >
> > }
> >
> >
> >
> > frac_delay = (float)acc1/acc2;
> >
> > percentage_delay = frac_delay * 100 ;
> >
> > pdelay.set(percentage_delay);
> >
> > context.write(key,pdelay);
> >
> > }
> >
> > }
> >
> >
> > Please help. Thank you for your time.
> >
> > --
> >
> > Regards,
> >
> > Haripriya Ayyalasomayajula
> > contact : 650-796-7112
>



-- 
Regards,
Haripriya Ayyalasomayajula
contact : 650-796-7112


Re: Help with using combineByKey

2014-10-09 Thread Yana Kadiyska
If you just want the ratio of positive to all values per key (if I'm
reading right) this works

val reduced= input.groupByKey().map(grp=>
grp._2.filter(v=>v>0).size.toFloat/grp._2.size)
reduced.foreach(println)

I don't think you need reduceByKey or combineByKey as you're not doing
anything where the values depend on each other -- you're just counting...
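
One detail worth noting: the map above drops the key. A variant that keeps
the (key, ratio) pairs, as a sketch:

val ratiosByKey = input.groupByKey().mapValues(vs => vs.count(_ > 0).toFloat / vs.size)
ratiosByKey.collect().foreach(println)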

On Thu, Oct 9, 2014 at 11:47 AM, HARIPRIYA AYYALASOMAYAJULA <
aharipriy...@gmail.com> wrote:

>
> I am a beginner to Spark and finding it difficult to implement a very
> simple reduce operation. I read that is ideal to use combineByKey for
> complex reduce operations.
>
> My input:
>
> val input = sc.parallelize(List(("LAX",6), ("LAX",8), ("LAX",7),
> ("SFO",0), ("SFO",1),
> ("SFO",9),("PHX",65),("PHX",88),("KX",7),("KX",6),("KX",1), ("KX",9),
> ("HOU",56),("HOU",5),("HOU",59),("HOU",0),("MA",563),("MA",545),("MA",5),("MA",0),("MA",0)))
>
>
>  val opPart1 = input.combineByKey(
>(v) => (v, 1),
>(var acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1, acc._2
> + 1),
>(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 +
> acc2._2)
>)
>
>val opPart2 = opPart1.map{ case (key, value) => (key,
> (value._1,value._2)) }
>
>  opPart2.collectAsMap().map(println(_))
>
> If the value is greater than 0, the first accumulator should be
> incremented by 1, else it remains the same. The second accumulator is a
> simple counter for each value.  I am getting an incorrect output (garbage
> values )for the first accumulator. Please help.
>
>  The equivalent reduce operation in Hadoop MapReduce is :
>
>
> public static class PercentageCalcReducer extends 
> Reducer
>
> {
>
> private FloatWritable pdelay = new FloatWritable();
>
>
> public void reduce(Text key, Iterable values,Context
> context)throws IOException,InterruptedException
>
> {
>
> int acc2=0;
>
> float frac_delay, percentage_delay;
>
> int acc1=0;
>
> for(IntWritable val : values)
>
> {
>
> if(val.get() > 0)
>
> {
>
> acc1++;
>
> }
>
> acc2++;
>
> }
>
>
>
> frac_delay = (float)acc1/acc2;
>
> percentage_delay = frac_delay * 100 ;
>
> pdelay.set(percentage_delay);
>
> context.write(key,pdelay);
>
> }
>
> }
>
>
> Please help. Thank you for your time.
>
> --
>
> Regards,
> Haripriya Ayyalasomayajula
> contact : 650-796-7112
>


Re: Help with using combineByKey

2014-10-09 Thread Sean Owen
You have a typo in your code at "var acc:", and the map from opPart1
to opPart2 looks like a no-op, but those aren't the problem I think.
It sounds like you intend the first element of each pair to be a count
of nonzero values, but you initialize the first element of the pair to
v, not 1, in v => (v,1). Try v => (1,1)
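
For reference, "var" is not allowed in the parameter list of an anonymous
function, so with the typo removed the second argument is simply:

(acc: (Int, Int), v: Int) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1)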


On Thu, Oct 9, 2014 at 4:47 PM, HARIPRIYA AYYALASOMAYAJULA
 wrote:
>
> I am a beginner to Spark and finding it difficult to implement a very simple
> reduce operation. I read that is ideal to use combineByKey for complex
> reduce operations.
>
> My input:
>
> val input = sc.parallelize(List(("LAX",6), ("LAX",8), ("LAX",7), ("SFO",0),
> ("SFO",1), ("SFO",9),("PHX",65),("PHX",88),("KX",7),("KX",6),("KX",1),
> ("KX",9),
> ("HOU",56),("HOU",5),("HOU",59),("HOU",0),("MA",563),("MA",545),("MA",5),("MA",0),("MA",0)))
>
>
>  val opPart1 = input.combineByKey(
>(v) => (v, 1),
>(var acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1, acc._2 +
> 1),
>(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 +
> acc2._2)
>)
>
>val opPart2 = opPart1.map{ case (key, value) => (key,
> (value._1,value._2)) }
>
>  opPart2.collectAsMap().map(println(_))
>
> If the value is greater than 0, the first accumulator should be incremented
> by 1, else it remains the same. The second accumulator is a simple counter
> for each value.  I am getting an incorrect output (garbage values )for the
> first accumulator. Please help.
>
>  The equivalent reduce operation in Hadoop MapReduce is :
>
> public static class PercentageCalcReducer extends
> Reducer
>
> {
>
> private FloatWritable pdelay = new FloatWritable();
>
>
> public void reduce(Text key, Iterable values,Context
> context)throws IOException,InterruptedException
>
> {
>
> int acc2=0;
>
> float frac_delay, percentage_delay;
>
> int acc1=0;
>
> for(IntWritable val : values)
>
> {
>
> if(val.get() > 0)
>
> {
>
> acc1++;
>
> }
>
> acc2++;
>
> }
>
>
>
> frac_delay = (float)acc1/acc2;
>
> percentage_delay = frac_delay * 100 ;
>
> pdelay.set(percentage_delay);
>
> context.write(key,pdelay);
>
> }
>
> }
>
>
> Please help. Thank you for your time.
>
> --
>
> Regards,
>
> Haripriya Ayyalasomayajula
> contact : 650-796-7112

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Help with using combineByKey

2014-10-09 Thread HARIPRIYA AYYALASOMAYAJULA
I am a beginner to Spark and am finding it difficult to implement a very
simple reduce operation. I read that it is ideal to use combineByKey for
complex reduce operations.

My input:

val input = sc.parallelize(List(("LAX",6), ("LAX",8), ("LAX",7), ("SFO",0),
("SFO",1), ("SFO",9),("PHX",65),("PHX",88),("KX",7),("KX",6),("KX",1),
("KX",9),
("HOU",56),("HOU",5),("HOU",59),("HOU",0),("MA",563),("MA",545),("MA",5),("MA",0),("MA",0)))


 val opPart1 = input.combineByKey(
   (v) => (v, 1),
   (var acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1, acc._2 +
1),
   (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 +
acc2._2)
   )

   val opPart2 = opPart1.map{ case (key, value) => (key,
(value._1,value._2)) }

 opPart2.collectAsMap().map(println(_))

If the value is greater than 0, the first accumulator should be incremented
by 1, else it remains the same. The second accumulator is a simple counter
for each value. I am getting an incorrect output (garbage values) for the
first accumulator. Please help.

 The equivalent reduce operation in Hadoop MapReduce is :

public static class PercentageCalcReducer extends
Reducer<Text, IntWritable, Text, FloatWritable>

{

private FloatWritable pdelay = new FloatWritable();


public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException

{

int acc2=0;

float frac_delay, percentage_delay;

int acc1=0;

for(IntWritable val : values)

{

if(val.get() > 0)

{

acc1++;

}

acc2++;

}



frac_delay = (float)acc1/acc2;

percentage_delay = frac_delay * 100 ;

pdelay.set(percentage_delay);

context.write(key,pdelay);

}

}


Please help. Thank you for your time.

-- 

Regards,
Haripriya Ayyalasomayajula
contact : 650-796-7112


Re: aggregateByKey vs combineByKey

2014-09-29 Thread David Rowe
Thanks Liquan, that was really helpful.

On Mon, Sep 29, 2014 at 5:54 PM, Liquan Pei  wrote:

> Hi Dave,
>
> You can replace groupByKey with reduceByKey to improve performance in some
> cases. reduceByKey performs map side combine which can reduce Network IO
> and shuffle size where as groupByKey will not perform map side combine.
>
> combineByKey is more general then aggregateByKey. Actually, the
> implementation of aggregateByKey, reduceByKey and groupByKey is achieved by
> combineByKey. aggregateByKey is similar to reduceByKey but you can provide
> initial values when performing aggregation.
>
> As the name suggests, aggregateByKey is suitable for compute aggregations
> for keys, example aggregations such as sum, avg, etc. The rule here is that
> the extra computation spent for map side combine can reduce the size sent
> out to other nodes and driver. If your func has satisfies this rule, you
> probably should use aggregateByKey.
>
> combineByKey is more general and you have the flexibility to specify
> whether you'd like to perform map side combine. However, it is more complex
> to use. At minimum, you need to implement three functions: createCombiner,
> mergeValue, mergeCombiners.
>
> Hope this helps!
> Liquan
>
> On Sun, Sep 28, 2014 at 11:59 PM, David Rowe  wrote:
>
>> Hi All,
>>
>> After some hair pulling, I've reached the realisation that an operation I
>> am currently doing via:
>>
>> myRDD.groupByKey.mapValues(func)
>>
>> should be done more efficiently using aggregateByKey or combineByKey.
>> Both of these methods would do, and they seem very similar to me in terms
>> of their function.
>>
>> My question is, what are the differences between these two methods (other
>> than the slight differences in their type signatures)? Under what
>> circumstances should I use one or the other?
>>
>> Thanks
>>
>> Dave
>>
>>
>>
>
>
> --
> Liquan Pei
> Department of Physics
> University of Massachusetts Amherst
>


Re: aggregateByKey vs combineByKey

2014-09-29 Thread Liquan Pei
Hi Dave,

You can replace groupByKey with reduceByKey to improve performance in some
cases. reduceByKey performs a map-side combine, which can reduce network IO
and shuffle size, whereas groupByKey performs no map-side combine.

combineByKey is more general than aggregateByKey. In fact, aggregateByKey,
reduceByKey and groupByKey are all implemented in terms of combineByKey.
aggregateByKey is similar to reduceByKey, but lets you provide an initial
value for the aggregation.

As the name suggests, aggregateByKey is suitable for computing aggregations
by key, such as sum, avg, etc. The rule of thumb is that the extra
computation spent on the map-side combine should reduce the size of the
data sent to other nodes and to the driver. If your function satisfies this
rule, you should probably use aggregateByKey.

combineByKey is more general, and you have the flexibility to specify
whether you'd like to perform a map-side combine. However, it is more
complex to use: at a minimum, you need to implement three functions:
createCombiner, mergeValue and mergeCombiners.
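
As an illustration, a sketch of a per-key average written with
aggregateByKey, assuming a hypothetical RDD[(String, Int)] named pairs:

val sumCounts = pairs.aggregateByKey((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),        // seqOp: fold a value into the per-partition accumulator
  (a, b) => (a._1 + b._1, a._2 + b._2))        // combOp: merge accumulators across partitions
val avgByKey = sumCounts.mapValues { case (sum, count) => sum.toDouble / count }

The same computation with combineByKey would replace the (0, 0) zero value
with a createCombiner such as (v: Int) => (v, 1).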

Hope this helps!
Liquan

On Sun, Sep 28, 2014 at 11:59 PM, David Rowe  wrote:

> Hi All,
>
> After some hair pulling, I've reached the realisation that an operation I
> am currently doing via:
>
> myRDD.groupByKey.mapValues(func)
>
> should be done more efficiently using aggregateByKey or combineByKey. Both
> of these methods would do, and they seem very similar to me in terms of
> their function.
>
> My question is, what are the differences between these two methods (other
> than the slight differences in their type signatures)? Under what
> circumstances should I use one or the other?
>
> Thanks
>
> Dave
>
>
>


-- 
Liquan Pei
Department of Physics
University of Massachusetts Amherst


aggregateByKey vs combineByKey

2014-09-29 Thread David Rowe
Hi All,

After some hair pulling, I've reached the realisation that an operation I
am currently doing via:

myRDD.groupByKey.mapValues(func)

should be done more efficiently using aggregateByKey or combineByKey. Both
of these methods would do, and they seem very similar to me in terms of
their function.

My question is, what are the differences between these two methods (other
than the slight differences in their type signatures)? Under what
circumstances should I use one or the other?

Thanks

Dave


Re: combineByKey throws ClassCastException

2014-09-16 Thread Tao Xiao
This problem was caused by the fact that I used a package jar with a Spark
version (0.9.1) different from that of the cluster (0.9.0). When I used the
correct package jar
(spark-assembly_2.10-0.9.0-cdh5.0.1-hadoop2.3.0-cdh5.0.1.jar) instead, the
application ran as expected.



2014-09-15 14:57 GMT+08:00 x :

> How about this.
>
> scala> val rdd2 = rdd.combineByKey(
>  | (v: Int) => v.toLong,
>  | (c: Long, v: Int) => c + v,
>  | (c1: Long, c2: Long) => c1 + c2)
> rdd2: org.apache.spark.rdd.RDD[(String, Long)] = MapPartitionsRDD[9] at
> combineByKey at <console>:14
>
> xj @ Tokyo
>
> On Mon, Sep 15, 2014 at 3:06 PM, Tao Xiao 
> wrote:
>
>> I followd an example presented in the tutorial Learning Spark
>> <http://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html>
>> to compute the per-key average as follows:
>>
>>
>> val Array(appName) = args
>> val sparkConf = new SparkConf()
>> .setAppName(appName)
>> val sc = new SparkContext(sparkConf)
>> /*
>>  * compute the per-key average of values
>>  * results should be:
>>  *A : 5.8
>>  *B : 14
>>  *C : 60.6
>>  */
>> val rdd = sc.parallelize(List(
>> ("A", 3), ("A", 9), ("A", 12), ("A", 0), ("A", 5),
>> ("B", 4), ("B", 10), ("B", 11), ("B", 20), ("B", 25),
>> ("C", 32), ("C", 91), ("C", 122), ("C", 3), ("C", 55)), 2)
>> val avg = rdd.combineByKey(
>> (x:Int) => (x, 1),  // java.lang.ClassCastException: scala.Tuple2$mcII$sp
>> cannot be cast to java.lang.Integer
>> (acc:(Int, Int), x) => (acc._1 + x, acc._2 + 1),
>> (acc1:(Int, Int), acc2:(Int, Int)) => (acc1._1 + acc2._1, acc1._2 +
>> acc2._2))
>> .map{case (s, t) => (s, t._1/t._2.toFloat)}
>>  avg.collect.foreach(t => println(t._1 + " ->" + t._2))
>>
>>
>>
>> When I submitted the application, an exception of 
>> "*java.lang.ClassCastException:
>> scala.Tuple2$mcII$sp cannot be cast to java.lang.Integer*" was thrown
>> out. The tutorial said that the first function of *combineByKey*, *(x:Int)
>> => (x, 1)*, should take a single element in the source RDD and return an
>> element of the desired type in the resulting RDD. In my application, we
>> take a single element of type *Int *from the source RDD and return a
>> tuple of type (*Int*, *Int*), which meets the requirements quite well.
>> But why would such an exception be thrown?
>>
>> I'm using CDH 5.0 and Spark 0.9
>>
>> Thanks.
>>
>>
>>
>


Re: combineByKey throws ClassCastException

2014-09-14 Thread x
How about this.

scala> val rdd2 = rdd.combineByKey(
 | (v: Int) => v.toLong,
 | (c: Long, v: Int) => c + v,
 | (c1: Long, c2: Long) => c1 + c2)
rdd2: org.apache.spark.rdd.RDD[(String, Long)] = MapPartitionsRDD[9] at
combineByKey at <console>:14

xj @ Tokyo

On Mon, Sep 15, 2014 at 3:06 PM, Tao Xiao  wrote:

> I followd an example presented in the tutorial Learning Spark
> <http://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html>
> to compute the per-key average as follows:
>
>
> val Array(appName) = args
> val sparkConf = new SparkConf()
> .setAppName(appName)
> val sc = new SparkContext(sparkConf)
> /*
>  * compute the per-key average of values
>  * results should be:
>  *A : 5.8
>  *B : 14
>  *C : 60.6
>  */
> val rdd = sc.parallelize(List(
> ("A", 3), ("A", 9), ("A", 12), ("A", 0), ("A", 5),
> ("B", 4), ("B", 10), ("B", 11), ("B", 20), ("B", 25),
> ("C", 32), ("C", 91), ("C", 122), ("C", 3), ("C", 55)), 2)
> val avg = rdd.combineByKey(
> (x:Int) => (x, 1),  // java.lang.ClassCastException: scala.Tuple2$mcII$sp
> cannot be cast to java.lang.Integer
> (acc:(Int, Int), x) => (acc._1 + x, acc._2 + 1),
> (acc1:(Int, Int), acc2:(Int, Int)) => (acc1._1 + acc2._1, acc1._2 +
> acc2._2))
> .map{case (s, t) => (s, t._1/t._2.toFloat)}
>  avg.collect.foreach(t => println(t._1 + " ->" + t._2))
>
>
>
> When I submitted the application, an exception of 
> "*java.lang.ClassCastException:
> scala.Tuple2$mcII$sp cannot be cast to java.lang.Integer*" was thrown
> out. The tutorial said that the first function of *combineByKey*, *(x:Int)
> => (x, 1)*, should take a single element in the source RDD and return an
> element of the desired type in the resulting RDD. In my application, we
> take a single element of type *Int *from the source RDD and return a
> tuple of type (*Int*, *Int*), which meets the requirements quite well.
> But why would such an exception be thrown?
>
> I'm using CDH 5.0 and Spark 0.9
>
> Thanks.
>
>
>


combineByKey throws ClassCastException

2014-09-14 Thread Tao Xiao
I followed an example presented in the tutorial Learning Spark
<http://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html>
to compute the per-key average as follows:


val Array(appName) = args
val sparkConf = new SparkConf()
.setAppName(appName)
val sc = new SparkContext(sparkConf)
/*
 * compute the per-key average of values
 * results should be:
 *A : 5.8
 *B : 14
 *C : 60.6
 */
val rdd = sc.parallelize(List(
("A", 3), ("A", 9), ("A", 12), ("A", 0), ("A", 5),
("B", 4), ("B", 10), ("B", 11), ("B", 20), ("B", 25),
("C", 32), ("C", 91), ("C", 122), ("C", 3), ("C", 55)), 2)
val avg = rdd.combineByKey(
(x:Int) => (x, 1),  // java.lang.ClassCastException: scala.Tuple2$mcII$sp
cannot be cast to java.lang.Integer
(acc:(Int, Int), x) => (acc._1 + x, acc._2 + 1),
(acc1:(Int, Int), acc2:(Int, Int)) => (acc1._1 + acc2._1, acc1._2 +
acc2._2))
.map{case (s, t) => (s, t._1/t._2.toFloat)}
 avg.collect.foreach(t => println(t._1 + " ->" + t._2))



When I submitted the application, an exception of
"*java.lang.ClassCastException:
scala.Tuple2$mcII$sp cannot be cast to java.lang.Integer*" was thrown out.
The tutorial said that the first function of *combineByKey*, *(x:Int) =>
(x, 1)*, should take a single element in the source RDD and return an
element of the desired type in the resulting RDD. In my application, we
take a single element of type *Int *from the source RDD and return a tuple
of type (*Int*, *Int*), which meets the requirements quite well. But why
would such an exception be thrown?

I'm using CDH 5.0 and Spark 0.9

Thanks.


Re: combineByKey at ShuffledDStream.scala

2014-07-23 Thread Bill Jay
The streaming program contains the following main stages:

1. receive data from Kafka
2. preprocessing of the data. These are all map and filtering stages.
3. Group by a field
4. Process the groupBy results using map. Inside this processing, I use
collect, count.

Thanks!

Bill


On Tue, Jul 22, 2014 at 10:05 PM, Tathagata Das  wrote:

> Can you give an idea of the streaming program? Rest of the transformation
> you are doing on the input streams?
>
>
> On Tue, Jul 22, 2014 at 11:05 AM, Bill Jay 
> wrote:
>
>> Hi all,
>>
>> I am currently running a Spark Streaming program, which consumes data
>> from Kakfa and does the group by operation on the data. I try to optimize
>> the running time of the program because it looks slow to me. It seems the
>> stage named:
>>
>> * combineByKey at ShuffledDStream.scala:42 *
>>
>> always takes the longest running time. And If I open this stage, I only
>> see two executors on this stage. Does anyone has an idea what this stage
>> does and how to increase the speed for this stage? Thanks!
>>
>> Bill
>>
>
>


Re: combineByKey at ShuffledDStream.scala

2014-07-22 Thread Tathagata Das
Can you give an idea of the streaming program? What are the rest of the
transformations you are doing on the input streams?


On Tue, Jul 22, 2014 at 11:05 AM, Bill Jay 
wrote:

> Hi all,
>
> I am currently running a Spark Streaming program, which consumes data from
> Kakfa and does the group by operation on the data. I try to optimize the
> running time of the program because it looks slow to me. It seems the stage
> named:
>
> * combineByKey at ShuffledDStream.scala:42 *
>
> always takes the longest running time. And If I open this stage, I only
> see two executors on this stage. Does anyone has an idea what this stage
> does and how to increase the speed for this stage? Thanks!
>
> Bill
>


combineByKey at ShuffledDStream.scala

2014-07-22 Thread Bill Jay
Hi all,

I am currently running a Spark Streaming program which consumes data from
Kafka and does a group-by operation on the data. I am trying to optimize
the running time of the program because it looks slow to me. It seems the
stage named:

* combineByKey at ShuffledDStream.scala:42 *

always takes the longest running time. And if I open this stage, I only see
two executors on it. Does anyone have an idea what this stage does and how
to increase its speed? Thanks!

Bill


Re: When to use CombineByKey vs reduceByKey?

2014-06-12 Thread Diana Hu
Matei,

Thanks for the answer; this clarifies things very much. Based on my usage I
would use combineByKey, since the output is another custom data structure.

I found that my issues with combineByKey were relieved after more tuning of
the level of parallelism. I've found that it really depends on the size of
my dataset, since I ran tests for 1000, 10K, 100K and 1M data points. For
now the GC issue is under control once I modified my data structures to be
mutable; the key part I was missing was that all classes within them need
to be serializable.

Thanks!

- Diana


On Wed, Jun 11, 2014 at 6:06 PM, Matei Zaharia 
wrote:

> combineByKey is designed for when your return type from the aggregation is
> different from the values being aggregated (e.g. you group together
> objects), and it should allow you to modify the leftmost argument of each
> function (mergeCombiners, mergeValue, etc) and return that instead of
> allocating a new object. So it should work with mutable objects — please
> post what problems you had with that. reduceByKey actually also allows this
> if your types are the same.
>
> Matei
>
>
> On Jun 11, 2014, at 3:21 PM, Diana Hu  wrote:
>
> Hello all,
>
> I've seen some performance improvements using combineByKey as opposed to
> reduceByKey or a groupByKey+map function. I have a couple questions. it'd
> be great if any one can provide some light into this.
>
> 1) When should I use combineByKey vs reduceByKey?
>
> 2) Do the containers need to be immutable for combineByKey? I've created
> custom data structures for the containers, one mutable and one immutable.
> The tests with the mutable containers, spark crashed with an error on
> missing references. However the downside of immutable containers (which
> works on my tests), is that for large datasets the garbage collector gets
> called many more times, and it tends to run out of heap space as the GC
> can't catch up. I tried some of the tips here
> http://spark.apache.org/docs/latest/tuning.html#memory-tuning and tuning
> the JVM params, but this seems to be too much tuning?
>
> Thanks in advance,
> - Diana
>
>
>


Re: When to use CombineByKey vs reduceByKey?

2014-06-11 Thread Matei Zaharia
combineByKey is designed for when your return type from the aggregation is 
different from the values being aggregated (e.g. you group together objects), 
and it should allow you to modify the leftmost argument of each function 
(mergeCombiners, mergeValue, etc) and return that instead of allocating a new 
object. So it should work with mutable objects — please post what problems you 
had with that. reduceByKey actually also allows this if your types are the same.
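
As a sketch of that pattern, assuming a hypothetical RDD[(String, Int)]
named pairs, mutating and returning the leftmost argument avoids allocating
a new buffer per element:

import scala.collection.mutable.ArrayBuffer

val grouped = pairs.combineByKey(
  (v: Int) => ArrayBuffer(v),                                         // createCombiner: one buffer per key per partition
  (buf: ArrayBuffer[Int], v: Int) => { buf += v; buf },               // mergeValue: mutate and return the same buffer
  (b1: ArrayBuffer[Int], b2: ArrayBuffer[Int]) => { b1 ++= b2; b1 })  // mergeCombiners: fold b2 into b1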

Matei

On Jun 11, 2014, at 3:21 PM, Diana Hu  wrote:

> Hello all,
> 
> I've seen some performance improvements using combineByKey as opposed to 
> reduceByKey or a groupByKey+map function. I have a couple questions. it'd be 
> great if any one can provide some light into this.
> 
> 1) When should I use combineByKey vs reduceByKey?
> 
> 2) Do the containers need to be immutable for combineByKey? I've created 
> custom data structures for the containers, one mutable and one immutable. The 
> tests with the mutable containers, spark crashed with an error on missing 
> references. However the downside of immutable containers (which works on my 
> tests), is that for large datasets the garbage collector gets called many 
> more times, and it tends to run out of heap space as the GC can't catch up. I 
> tried some of the tips here 
> http://spark.apache.org/docs/latest/tuning.html#memory-tuning and tuning the 
> JVM params, but this seems to be too much tuning?
> 
> Thanks in advance,
> - Diana



When to use CombineByKey vs reduceByKey?

2014-06-11 Thread Diana Hu
Hello all,

I've seen some performance improvements using combineByKey as opposed to
reduceByKey or a groupByKey+map function. I have a couple of questions;
it'd be great if anyone could shed some light on this.

1) When should I use combineByKey vs reduceByKey?

2) Do the containers need to be immutable for combineByKey? I've created
custom data structures for the containers, one mutable and one immutable.
In the tests with the mutable containers, Spark crashed with an error about
missing references. However, the downside of the immutable containers
(which work in my tests) is that for large datasets the garbage collector
gets called many more times, and the job tends to run out of heap space as
the GC can't catch up. I tried some of the tips here
http://spark.apache.org/docs/latest/tuning.html#memory-tuning and tuned the
JVM params, but this seems to be too much tuning?

Thanks in advance,
- Diana


Re: combinebykey throw classcastexception

2014-05-20 Thread xiemeilong
This issue turned out to be caused by a version mismatch between the driver
(0.9.1) and the server (0.9.0-cdh5.0.1). Before fixing that, other
functions worked fine but combineByKey did not.

Thank you very much for your reply.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/combinebykey-throw-classcastexception-tp6060p6087.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: combinebykey throw classcastexception

2014-05-20 Thread Sean Owen
You asked off-list, and provided a more detailed example there:

val random = new Random()
val testdata = (1 to 1).map(_=>(random.nextInt(),random.nextInt()))
sc.parallelize(testdata).combineByKey[ArrayBuffer[Int]](
  (instant:Int)=>{new ArrayBuffer[Int]()},
  (bucket:ArrayBuffer[Int],instant:Int)=>{bucket+=instant},
  (bucket1:ArrayBuffer[Int],bucket2:ArrayBuffer[Int])=>{bucket1++=bucket2}
).collect()

https://www.quora.com/Why-is-my-combinebykey-throw-classcastexception

I can't reproduce this with Spark 0.9.0  / CDH5 or Spark 1.0.0 RC9.
Your definition looks fine too. (Except that you are dropping the
first value, but that's a different problem.)
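
A sketch of the same call with a createCombiner that keeps that first value
(reusing testdata from the snippet above, and assuming
import scala.collection.mutable.ArrayBuffer):

sc.parallelize(testdata).combineByKey[ArrayBuffer[Int]](
  (instant: Int) => ArrayBuffer(instant),                              // keep the first value
  (bucket: ArrayBuffer[Int], instant: Int) => { bucket += instant },
  (bucket1: ArrayBuffer[Int], bucket2: ArrayBuffer[Int]) => { bucket1 ++= bucket2 }
).collect()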

On Tue, May 20, 2014 at 2:05 AM, xiemeilong  wrote:
> I am using CDH5 on a three machines cluster. map data from hbase as (string,
> V) pair , then call combineByKey like this:
>
> .combineByKey[C](
>   (v:V)=>new C(v),   //this line throw java.lang.ClassCastException: C
> cannot be cast to V
>   (v:C,v:V)=>C,
>   (c1:C,c2:C)=>C)
>
>
> I am very confused of this, there isn't C to V casting at all.  What's
> wrong?
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/combinebykey-throw-classcastexception-tp6059.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


combinebykey throw classcastexception

2014-05-19 Thread xiemeilong
I am using CDH5 on a three-machine cluster. I map data from HBase as
(String, V) pairs, then call combineByKey like this:

.combineByKey[C]( 
  (v:V)=>new C(v),   //this line throw java.lang.ClassCastException: C
cannot be cast to V 
  (v:C,v:V)=>C, 
  (c1:C,c2:C)=>C) 


I am very confused by this; there isn't any C to V casting at all. What's
wrong?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/combinebykey-throw-classcastexception-tp6059.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Use combineByKey and StatCount

2014-04-14 Thread Cheng Lian
I'm not very sure about the meaning of “mean of RDD by key”; is this what
you want?

val meansByKey = rdd
  .map { case (k, v) =>
    k -> (v, 1)
  }
  .reduceByKey { (lhs, rhs) =>
    (lhs._1 + rhs._1, lhs._2 + rhs._2)
  }
  .map { case (k, (sum, count)) =>
    k -> sum.toDouble / count
  }
  .collectAsMap()

With this, you need to be careful about overflow though.
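
Since the question mentions StatCount (presumably
org.apache.spark.util.StatCounter), here is a sketch that computes per-key
statistics with combineByKey and StatCounter, assuming the values are
Double (apply mapValues(_.toDouble) first otherwise):

import org.apache.spark.util.StatCounter

val statsByKey = rdd.combineByKey(
  (v: Double) => StatCounter(v),                       // createCombiner: start a StatCounter from the first value
  (s: StatCounter, v: Double) => s.merge(v),           // mergeValue: fold in another value
  (s1: StatCounter, s2: StatCounter) => s1.merge(s2))  // mergeCombiners: merge partial statistics
val meansFromStats = statsByKey.mapValues(_.mean).collectAsMap()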


On Tue, Apr 1, 2014 at 10:55 PM, Jaonary Rabarisoa wrote:

> Hi all;
>
> Can someone give me some tips to compute mean of RDD by key , maybe with
> combineByKey and StatCount.
>
> Cheers,
>
> Jaonary
>


Re: Use combineByKey and StatCount

2014-04-14 Thread dachuan
It seems you can imitate RDD.top()'s implementation: for each partition,
you get the number of records and the total sum for the key, and in the
final result handler you add all the sums together and all the record
counts together; then you can get the mean (the arithmetic mean, I mean).


On Tue, Apr 1, 2014 at 10:55 AM, Jaonary Rabarisoa wrote:

> Hi all;
>
> Can someone give me some tips to compute mean of RDD by key , maybe with
> combineByKey and StatCount.
>
> Cheers,
>
> Jaonary
>



-- 
Dachuan Huang
Cellphone: 614-390-7234
2015 Neil Avenue
Ohio State University
Columbus, Ohio
U.S.A.
43210


Use combineByKey and StatCount

2014-04-01 Thread Jaonary Rabarisoa
Hi all;

Can someone give me some tips to compute the mean of an RDD by key, maybe
with combineByKey and StatCount?

Cheers,

Jaonary