How to monitor the throughput and latency of the combineByKey transformation in Spark 3?
Hi community,

I built a simple count-and-sum Spark application that uses the combineByKey transformation [1], and I would like to monitor the throughput in and out of this transformation as well as the latency combineByKey spends pre-aggregating tuples. Ideally, I would like the latency as an average over the last 30 seconds, using a histogram, together with the 99th percentile. I was planning to add Dropwizard metrics [2] inside the combiner function that I pass to combineByKey, but I am confused because there are two more functions that I must also pass to combineByKey. How would you suggest I implement this monitoring strategy?

Thanks,
Felipe

[1] https://github.com/felipegutierrez/explore-spark/blob/master/src/main/scala/org/sense/spark/app/combiners/TaxiRideCountCombineByKey.scala#L40
[2] https://metrics.dropwizard.io/4.1.2/getting-started.html

--
Felipe Gutierrez
skype: felipe.o.gutierrez
https://felipeogutierrez.blogspot.com
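A minimal sketch of one way to wire this up (not from the thread; the object name, metric names, and types are assumptions for illustration): keep the Dropwizard registry and instruments in a lazily initialized singleton so each executor JVM builds its own copies instead of trying to serialize them, and wrap all three functions passed to combineByKey. A reporter (JMX, console, etc.) would still have to be attached per executor to actually read the values.

import java.util.concurrent.TimeUnit
import com.codahale.metrics.{Histogram, MetricRegistry, SlidingTimeWindowReservoir}

// Hypothetical holder: one registry per executor JVM, created lazily, never serialized.
object CombineMetrics {
  lazy val registry = new MetricRegistry
  lazy val tuplesIn  = registry.meter("combineByKey.in")   // tuples entering the pre-aggregation
  lazy val tuplesOut = registry.meter("combineByKey.out")  // records leaving the pre-aggregation
  lazy val latencyNs: Histogram = registry.register("combineByKey.latency",
    new Histogram(new SlidingTimeWindowReservoir(30, TimeUnit.SECONDS)))  // last 30 s window

  def timed[T](f: => T): T = {
    val start  = System.nanoTime()
    val result = f
    latencyNs.update(System.nanoTime() - start)
    result
  }
}

// Count-and-sum combiner with all three functions instrumented (types are illustrative).
val pairs = sc.parallelize(Seq(("a", 1L), ("a", 2L), ("b", 3L)))
val combined = pairs.combineByKey(
  (v: Long) => CombineMetrics.timed { CombineMetrics.tuplesIn.mark(); (1L, v) },
  (acc: (Long, Long), v: Long) =>
    CombineMetrics.timed { CombineMetrics.tuplesIn.mark(); (acc._1 + 1, acc._2 + v) },
  (a: (Long, Long), b: (Long, Long)) =>
    CombineMetrics.timed { (a._1 + b._1, a._2 + b._2) }
).map { kv => CombineMetrics.tuplesOut.mark(); kv }

// 99th percentile of the last 30 seconds:
// CombineMetrics.latencyNs.getSnapshot.get99thPercentile()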
Re: combineByKey
Take a look at this SOF: https://stackoverflow.com/questions/24804619/how-does-spark-aggregate-function-aggregatebykey-work

On Fri, Apr 5, 2019 at 12:25 PM Madabhattula Rajesh Kumar <mrajaf...@gmail.com> wrote:
> [...]

--
Thanks,
Jason
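For reference, the compile error quoted in this thread comes from createCombiner's signature: combineByKey expects a function from a single value V to the combiner C, so with values of type (String, String) it must take the tuple as one argument, not two. A minimal corrected sketch (the Messages case class and the messages array are the ones from the original post further down in this thread):

// createCombiner must be V => C, i.e. take the whole (Id, value) tuple as one argument
def createCombiner = (v: (String, String)) => Set(v._2)

def mergeValue = (acc: Set[String], v: (String, String)) => acc + v._2

def mergeCombiners = (acc1: Set[String], acc2: Set[String]) => acc1 ++ acc2

val result = sc.parallelize(messages)
  .map(x => (x.timeStamp + "-" + x.Id, (x.Id, x.value)))
  .combineByKey(createCombiner, mergeValue, mergeCombiners)
  .collect()
  .toMap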
Re: combineByKey
Hi,

Thank you for the details. That was a typo while composing the mail. Below is the actual flow.

Any idea why combineByKey is not working? aggregateByKey works.

// Defining createCombiner, mergeValue and mergeCombiner functions
def createCombiner = (Id: String, value: String) => (value :: Nil).toSet

def mergeValue = (accumulator1: Set[String], accumulator2: (String, String)) => accumulator1 ++ Set(accumulator2._2)

def mergeCombiner: (Set[String], Set[String]) => Set[String] = (accumulator1: Set[String], accumulator2: Set[String]) => accumulator1 ++ accumulator2

sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.Id, (x.Id, x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)

*Compile Error:-*
found   : (String, String) => scala.collection.immutable.Set[String]
required: ((String, String)) => ?
sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.Id, (x.Id, x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)

*aggregateByKey =>*

val result = sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.Id, (x.Id, x.value))).aggregateByKey(Set[String]())(
  (aggr, value) => aggr ++ Set(value._2),
  (aggr1, aggr2) => aggr1 ++ aggr2).collect().toMap

print(result)

Map(0-d1 -> Set(t1, t2, t3, t4), 0-d2 -> Set(t1, t5, t6, t2), 0-d3 -> Set(t1, t2))

Regards,
Rajesh

On Fri, Apr 5, 2019 at 9:58 PM Jason Nerothin wrote:
> [...]
Re: combineByKey
I broke some of your code down into the following lines:

import spark.implicits._

val a: RDD[Messages] = sc.parallelize(messages)
val b: Dataset[Messages] = a.toDF.as[Messages]
val c: Dataset[(String, (String, String))] = b.map{ x => (x.timeStamp + "-" + x.Id, (x.Id, x.value)) }

You didn't capitalize .Id, and your mergeValue0 and mergeCombiner don't have the types you think for the combineByKey.

I recommend breaking the code down statement-by-statement like this when you get into a dance with the Scala type system.

The type-safety that you're after (that eventually makes life *easier*) is best supported by Dataset (it would have prevented the .id vs .Id error), although there are some performance tradeoffs vs RDD and DataFrame...

On Fri, Apr 5, 2019 at 2:11 AM Madabhattula Rajesh Kumar <mrajaf...@gmail.com> wrote:
> [...]

--
Thanks,
Jason
combineByKey
Hi,

Any issue in the below code?

case class Messages(timeStamp: Int, Id: String, value: String)

val messages = Array(
  Messages(0, "d1", "t1"),
  Messages(0, "d1", "t1"),
  Messages(0, "d1", "t1"),
  Messages(0, "d1", "t1"),
  Messages(0, "d1", "t2"),
  Messages(0, "d1", "t2"),
  Messages(0, "d1", "t3"),
  Messages(0, "d1", "t4"),
  Messages(0, "d2", "t1"),
  Messages(0, "d2", "t1"),
  Messages(0, "d2", "t5"),
  Messages(0, "d2", "t6"),
  Messages(0, "d2", "t2"),
  Messages(0, "d2", "t2"),
  Messages(0, "d3", "t1"),
  Messages(0, "d3", "t1"),
  Messages(0, "d3", "t2")
)

// Defining createCombiner, mergeValue and mergeCombiner functions
def createCombiner = (id: String, value: String) => Set(value)

def mergeValue0 = (accumulator1: Set[String], accumulator2: (String, String)) => accumulator1 ++ Set(accumulator2._2)

def mergeCombiner: (Set[String], Set[String]) => Set[String] = (accumulator1: Set[String], accumulator2: Set[String]) => accumulator1 ++ accumulator2

sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id, x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)

*Compile Error:-*
found   : (String, String) => scala.collection.immutable.Set[String]
required: ((String, String)) => ?
sc.parallelize(messages).map(x => (x.timeStamp+"-"+x.id, (x.id, x.value))).combineByKey(createCombiner, mergeValue, mergeCombiner)

Regards,
Rajesh
Re: Inconsistent results with combineByKey API
Ping.. Can someone please tell me whether this is an issue or not?

- Swapnil

On Thu, Aug 31, 2017 at 12:27 PM, Swapnil Shinde wrote:
> [...]
Inconsistent results with combineByKey API
Hello All

I am observing some strange results with the aggregateByKey API, which is implemented with combineByKey. Not sure if this is by design or a bug.

I created this toy example, but the same problem can be observed on large datasets as well:

*case class ABC(key: Int, c1: Int, c2: Int)*
*case class ABCoutput(key: Int, desc: String, c1Sum: Int, c2Sum: Int)*

// Create the RDD, making sure it has 1 or 2 partitions for this example.
// With 2 partitions there is a high chance that rows with the same key end up in the same partition.
*val a = sc.makeRDD[ABC](List(ABC(1, 10, 20), ABC(1, 10, 20), ABC(2, 20, 40), ABC(2, 20, 40))).coalesce(2)*

Now, I am running aggregateByKey, grouping by key to sum c1 and c2 but returning ABCoutput with a new 'desc' property.

*val b = a.keyBy(x => x.key).aggregateByKey(ABCoutput(0,"initial",0,0)) ((x1: ABCoutput, x2: ABC) => ABCoutput(x1.key, "intermediate", x1.c1Sum+x2.c1, x1.c2Sum+x2.c2), (m1: ABCoutput, m2:ABCoutput) => ABCoutput(m1.key, "final", m1.c1Sum+m2.c1Sum, m1.c2Sum+m2.c2Sum))*

The above query may return results like this:
[image: Inline image 1]

This means that for a key whose values are all in the same partition, the mergeCombiner function (which returns ABCoutput with desc=final) is not invoked.

I was expecting the mergeCombiner function to be invoked every time, which is not happening. Correct me if I am wrong, but is this expected behavior?

Further debugging shows that it works fine if I create the input RDD with more partitions (which increases the chance of having rows with the same key in different partitions):

*val b = a.repartition(20).keyBy(x => x.key).aggregateByKey(ABCoutput(0,"initial",0,0)) ((x1: ABCoutput, x2: ABC) => ABCoutput(x1.key, "intermediate", x1.c1Sum+x2.c1, x1.c2Sum+x2.c2), (m1: ABCoutput, m2:ABCoutput) => ABCoutput(m1.key, "final", m1.c1Sum+m2.c1Sum, m1.c2Sum+m2.c2Sum))*
[image: Inline image 2]

One more thing to mention: if I make sure my input RDD is partitioned, it simply runs the aggregation with mapPartitions (here <https://github.com/apache/spark/blob/v2.0.1/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L93>). This makes sense in terms of aggregation, as all values for a given key are in the same partition. However, I have something in my mergeCombiner function that I would like to run, and it won't get invoked. Traditional MapReduce allows different combiner and reduce functions, and it is guaranteed that reduce is always invoked. I can see that running aggregations with no shuffle has performance gains, but the API seems confusing/misleading: a user might expect mergeCombiner to be invoked when in reality it isn't. It would be great if the API designers could shed some light on this.

*import org.apache.spark.HashPartitioner*
*val b = a.keyBy(x => x.key).partitionBy(new HashPartitioner(20)).aggregateByKey(ABCoutput(0,"initial",0,0)) ((x1: ABCoutput, x2: ABC) => ABCoutput(x1.key, "intermediate", x1.c1Sum+x2.c1, x1.c2Sum+x2.c2), (m1: ABCoutput, m2:ABCoutput) => ABCoutput(m1.key, "final", m1.c1Sum+m2.c1Sum, m1.c2Sum+m2.c2Sum))*

[image: Inline image 3]

The above examples show this behavior with aggregateByKey, but the same thing can be observed with combineByKey as well.

*val b = a.keyBy(x => x.key).combineByKey( (x: ABC) => ABCoutput(x.key, "initial", x.c1, x.c2),*
* (x1: ABCoutput, x2: ABC) => ABCoutput(x1.key, "intermediate", x1.c1Sum + x2.c1, x1.c2Sum+x2.c2),*
* (x1: ABCoutput, x2: ABCoutput) => ABCoutput(x1.key, "final", x1.c1Sum + x2.c1Sum, x1.c2Sum+x2.c2Sum))*

*[image: Inline image 4]*

Please let me know if you need any further information, and correct me if my understanding of the API is wrong.

Thanks
Swapnil
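As a side note (not part of the original thread): if the pipeline must not depend on whether mergeCombiners runs, one option is to keep the combiner functions purely about aggregation and apply the "finalization" step in a separate transformation afterwards. A minimal sketch under that assumption, reusing the ABC/ABCoutput classes and the RDD `a` from the message above:

// Aggregate with combineByKey, then mark every record as "final" in a follow-up mapValues,
// so the desc field no longer depends on mergeCombiners being invoked.
val combined = a.keyBy(_.key).combineByKey(
  (x: ABC) => ABCoutput(x.key, "partial", x.c1, x.c2),
  (acc: ABCoutput, x: ABC) => ABCoutput(acc.key, "partial", acc.c1Sum + x.c1, acc.c2Sum + x.c2),
  (m1: ABCoutput, m2: ABCoutput) => ABCoutput(m1.key, "partial", m1.c1Sum + m2.c1Sum, m1.c2Sum + m2.c2Sum)
).mapValues(out => out.copy(desc = "final"))   // runs for every key, shuffled or not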
Partitioning Data to optimize combineByKey
Hello,

I am trying to process a dataset that is approximately 2 TB using a cluster with 4.5 TB of RAM. The data is in Parquet format and is initially loaded into a DataFrame. A subset of the data is then queried for and converted to an RDD for more complicated processing. The first stage of that processing is a mapToPair that uses each row's id as the key in a tuple. Then the data goes through a combineByKey operation to group all values with the same key. This operation always exceeds the maximum cluster memory, and the job eventually fails. While it is shuffling, there are a lot of "spilling in-memory map to disk" messages. I am wondering if I were to have the data initially partitioned such that all the rows with the same id resided within the same partition, whether it would need to do less shuffling and would perform correctly.

To do the initial load I am using:

sqlContext.read().parquet(inputPathArray).repartition(1, new Column("id"));

I am not sure if this is the correct way to partition a DataFrame, so my first question is: is the above correct?

My next question is: when I go from the DataFrame to an RDD using

JavaRDD locationsForSpecificKey = sqlc.sql("SELECT * FROM standardlocationrecords WHERE customerID = " + customerID + " AND partnerAppID = " + partnerAppID)
    .toJavaRDD().map(new LocationRecordFromRow()::apply);

is the partition scheme from the DataFrame preserved, or do I need to repartition after doing a mapToPair using rdd.partitionBy and passing in a custom HashPartitioner that uses the hash of the ID field?

My goal is to reduce the shuffling when doing the final combineByKey to prevent the job from running out of memory and failing.

Any help would be greatly appreciated.

Thanks,
Nathan
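One possible shape for this, sketched in Scala for brevity (an illustration, not from the thread; the Record class, key type, and partition count are assumptions): pre-partition the pair RDD with a HashPartitioner on the id before combineByKey, so that the combine step sees an RDD already partitioned by the same partitioner and can aggregate with mapPartitions instead of shuffling again.

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

// Hypothetical record type standing in for a row of the location dataset.
case class Record(id: String, payload: String)

val records: RDD[Record] = sc.parallelize(Seq(Record("a", "x"), Record("a", "y"), Record("b", "z")))
val pairs = records.map(r => (r.id, r))

// Co-locate all rows with the same id up front; the subsequent combineByKey reuses
// the same partitioner and does not need another shuffle.
val partitioned = pairs.partitionBy(new HashPartitioner(200))

val grouped = partitioned.combineByKey(
  (r: Record) => List(r),                        // createCombiner
  (acc: List[Record], r: Record) => r :: acc,    // mergeValue, runs within a partition
  (a: List[Record], b: List[Record]) => a ::: b  // mergeCombiners, no extra shuffle here
)
// Note: all values of one key still have to fit in memory as a single List.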
Re: Datasets combineByKey
Yes it is.

On Apr 10, 2016 3:17 PM, "Amit Sela" wrote:
> I think *org.apache.spark.sql.expressions.Aggregator* is what I'm looking for, makes sense ?
> [...]
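For readers landing on this thread: a minimal sketch (not from the original messages) of what an Aggregator-based equivalent of an RDD combineByKey can look like on a typed Dataset, written against the Spark 2.x-style Aggregator API; the case class and names are assumptions for illustration.

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

case class Sale(shop: String, amount: Long)

// Plays the role of combineByKey's three functions on a typed Dataset.
object TotalAmount extends Aggregator[Sale, Long, Long] {
  def zero: Long = 0L                                    // initial buffer (like aggregateByKey's zero value)
  def reduce(buf: Long, s: Sale): Long = buf + s.amount  // like mergeValue
  def merge(b1: Long, b2: Long): Long = b1 + b2          // like mergeCombiners
  def finish(reduction: Long): Long = reduction
  def bufferEncoder: Encoder[Long] = Encoders.scalaLong
  def outputEncoder: Encoder[Long] = Encoders.scalaLong
}

val spark = SparkSession.builder.appName("aggregator-sketch").master("local[*]").getOrCreate()
import spark.implicits._

val sales = Seq(Sale("a", 5L), Sale("a", 8L), Sale("b", 10L)).toDS()
val totals = sales.groupByKey(_.shop).agg(TotalAmount.toColumn).collect()
// e.g. Array((a,13), (b,10))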
Re: Datasets combineByKey
I think *org.apache.spark.sql.expressions.Aggregator* is what I'm looking for, makes sense?

On Sun, Apr 10, 2016 at 4:08 PM Amit Sela wrote:
> [...]
Re: Datasets combineByKey
I'm mapping the RDD API to the Datasets API and I was wondering if I was missing something or if this functionality is missing.

On Sun, Apr 10, 2016 at 3:00 PM Ted Yu wrote:
> [...]
Re: Datasets combineByKey
Haven't found any JIRA w.r.t. combineByKey for Dataset.

What's your use case?

Thanks

On Sat, Apr 9, 2016 at 7:38 PM, Amit Sela wrote:
> [...]
Datasets combineByKey
Is there (or is there planned) combineByKey support for Dataset?
Is there / will there be support for combiner lifting?

Thanks,
Amit
RE: aggregateByKey vs combineByKey
Hi Marco,

In your case, since you don't need to perform an aggregation (such as a sum or average) over each key, using groupByKey may perform better. groupByKey internally uses a CompactBuffer, which is much more efficient than an ArrayBuffer.

Thanks,
LIN Chen

Date: Tue, 5 Jan 2016 21:13:40 +
Subject: aggregateByKey vs combineByKey
From: mmistr...@gmail.com
To: user@spark.apache.org
> [...]
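A minimal sketch of the groupByKey variant being suggested (illustrative, not from the original mail), using the same kv dataset from Marco's message below:

// Same result shape as the combineByKey/aggregateByKey versions in this thread:
// each word length mapped to the list of words with that length.
val kv = sc.parallelize(List((2, "Hi"), (1, "i"), (2, "am"), (1, "a"), (4, "test"), (6, "string")))
val grouped = kv.groupByKey().mapValues(_.toList).collect()
// e.g. Array((1,List(i, a)), (2,List(Hi, am)), (4,List(test)), (6,List(string)))
// (order within each list may vary)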
Re: aggregateByKey vs combineByKey
Looking at PairRDDFunctions.scala:

def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U,
    combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
  ...
  combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
    cleanedSeqOp, combOp, partitioner)

aggregateByKey is implemented on top of combineByKey, so I think the two operations should have similar performance.

Cheers

On Tue, Jan 5, 2016 at 1:13 PM, Marco Mistroni wrote:
> [...]
aggregateByKey vs combineByKey
Hi all,

I have the following dataset:

kv = [(2,Hi), (1,i), (2,am), (1,a), (4,test), (6,string)]

It's a simple list of tuples containing (word_length, word).

What I wanted to do was to group the result by key in order to have a result in the form

[(word_length_1, [word1, word2, word3]), (word_length_2, [word4, word5, word6])]

so I browsed the Spark API and was able to get the result I wanted using two different functions.

scala> kv.combineByKey(List(_), (x:List[String], y:String) => y :: x, (x:List[String], y:List[String]) => x ::: y).collect()
res86: Array[(Int, List[String])] = Array((1,List(i, a)), (2,List(Hi, am)), (4,List(test)), (6,List(string)))

and

scala> kv.aggregateByKey(List[String]())((acc, item) => item :: acc,
     |   (acc1, acc2) => acc1 ::: acc2).collect()
res87: Array[(Int, List[String])] = Array((1,List(i, a)), (2,List(Hi, am)), (4,List(test)), (6,List(string)))

Now, the question is: are there any advantages of using one instead of the other? Am I somehow misusing the API for what I want to do?

kind regards
marco
Re: OOM in SizeEstimator while using combineByKey
I am setting spark.executor.memory to 1024m on a 3-node cluster, with each node having 4 cores and 7 GB RAM. The combiner functions take Scala case classes as input and generate a mutable.ListBuffer of Scala case classes, so I am guessing hashCode and equals should be taken care of.

Thanks,
Aniket

On Wed, Apr 15, 2015 at 1:00 PM Xianjin YE wrote:
> [...]
Re: OOM in SizeEstimator while using combineByKey
What are your JVM heap size settings? The OOM in SizeEstimator is caused by a lot of entries in an IdentityHashMap. A quick guess is that the objects in your dataset are instances of a custom class and you didn't implement the hashCode and equals methods correctly.

On Wednesday, April 15, 2015 at 3:10 PM, Aniket Bhatnagar wrote:
> [...]
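As an illustration of the advice above (a sketch with hypothetical class names): Scala case classes get structural equals and hashCode generated automatically, whereas a plain class defaults to reference identity and needs them written by hand if it is used as a key.

// Case class: equals/hashCode derived from the fields, suitable for use as a key.
case class EventKey(userId: String, day: Int)

// Plain class: defaults to reference equality; override both methods if used as a key.
class LegacyKey(val userId: String, val day: Int) {
  override def equals(other: Any): Boolean = other match {
    case k: LegacyKey => k.userId == userId && k.day == day
    case _            => false
  }
  override def hashCode: Int = (userId, day).hashCode
}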
OOM in SizeEstimator while using combineByKey
I am aggregating a dataset using the combineByKey method, and for a certain input size the job fails with the following error. I have enabled heap dumps to better analyze the issue and will report back if I have any findings. Meanwhile, if you have any idea of what could possibly result in this error or how to better debug this, please let me know.

java.lang.OutOfMemoryError: Java heap space
  at java.util.IdentityHashMap.resize(IdentityHashMap.java:469)
  at java.util.IdentityHashMap.put(IdentityHashMap.java:445)
  at org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:132)
  at org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:178)
  at org.apache.spark.util.SizeEstimator$$anonfun$visitSingleObject$1.apply(SizeEstimator.scala:177)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:177)
  at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:161)
  at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:155)
  at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
  at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
  at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:33)
  at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
  at org.apache.spark.util.collection.ExternalAppendOnlyMap.insert(ExternalAppendOnlyMap.scala:105)
  at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:93)
  at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:44)
  at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
  at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
CombineByKey - Please explain its working
I am reading about combineByKey and going through the below example from a blog post, but I can't understand how it works step by step. Can someone please explain?

case class Fruit(kind: String, weight: Int) {
  def makeJuice: Juice = Juice(weight * 100)
}

case class Juice(volumn: Int) {
  def add(j: Juice): Juice = Juice(volumn + j.volumn)
}

val apple1 = Fruit("apple", 5)
val apple2 = Fruit("apple", 8)
val orange1 = Fruit("orange", 10)

val fruit = sc.parallelize(List(("apple", apple1), ("orange", orange1), ("apple", apple2)))

val juice = fruit.combineByKey(
  f => f.makeJuice,
  (j: Juice, f) => j.add(f.makeJuice),
  (j1: Juice, j2: Juice) => j1.add(j2)
)

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/CombineByKey-Please-explain-its-working-tp22203.html
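A hedged walk-through of that example (a sketch; the partition layout shown is just one possibility): createCombiner runs the first time a key is seen in a partition, mergeValue folds each further value for that key into the partition-local combiner, and mergeCombiners joins per-partition results across partitions.

// Suppose the three records land in two partitions:
//   partition 0: ("apple", apple1), ("orange", orange1)
//   partition 1: ("apple", apple2)
//
// Within partition 0:
//   "apple"  is new  -> createCombiner: apple1.makeJuice            = Juice(500)
//   "orange" is new  -> createCombiner: orange1.makeJuice           = Juice(1000)
// Within partition 1:
//   "apple"  is new  -> createCombiner: apple2.makeJuice            = Juice(800)
// Across partitions (shuffle):
//   "apple"          -> mergeCombiners: Juice(500).add(Juice(800))  = Juice(1300)
//   "orange"         -> only one combiner, nothing to merge         = Juice(1000)
//
// If both apple records had been in the same partition, mergeValue would have run instead:
//   Juice(500).add(apple2.makeJuice) = Juice(1300)

juice.collect()
// e.g. Array((apple,Juice(1300)), (orange,Juice(1000)))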
Re: retry in combineByKey at BinaryClassificationMetrics.scala
Yes, my change is slightly downstream of this point in the processing, though. The code is still creating a counter for each distinct score value and then binning. I don't think that would cause a failure - it just might be slow. At the extremes, you might see 'fetch failure' as a symptom of things running too slowly.

Yes, you can sacrifice some fidelity by more aggressively binning upstream, on your scores. That would drastically reduce the input size, at the cost of accuracy of course.

On Tue, Dec 23, 2014 at 7:35 PM, Xiangrui Meng wrote:
> [...]
Re: retry in combineByKey at BinaryClassificationMetrics.scala
Sean's PR may be relevant to this issue (https://github.com/apache/spark/pull/3702). As a workaround, you can try to truncate the raw scores to 4 digits (e.g., 0.5643215 -> 0.5643) before sending them to BinaryClassificationMetrics. This may not work well if the score distribution is very skewed. See the discussion on https://issues.apache.org/jira/browse/SPARK-4547

-Xiangrui

On Tue, Dec 23, 2014 at 9:00 AM, Thomas Kwan wrote:
> [...]
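A minimal sketch of the truncation workaround mentioned above (illustrative; the rounding helper and the 4-digit precision are assumptions), reusing predictionAndLabel from the original message below:

import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics

// Round each raw score to 4 decimal places so BinaryClassificationMetrics
// builds far fewer distinct score bins (at some cost in fidelity).
def truncate(score: Double): Double = math.floor(score * 10000) / 10000

val truncatedPredictionAndLabel = predictionAndLabel.map { case (score, label) =>
  (truncate(score), label)
}

val metrics = new BinaryClassificationMetrics(truncatedPredictionAndLabel)
val auc = metrics.areaUnderROC()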
retry in combineByKey at BinaryClassificationMetrics.scala
Hi there,

We are using mllib 1.1.1 and doing Logistic Regression with a dataset of about 150M rows. The training part usually goes pretty smoothly without any retries. But during the prediction stage and the BinaryClassificationMetrics stage, I am seeing retries with an error of "fetch failure".

The prediction part is just as follows:

val predictionAndLabel = testRDD.map { point =>
  val prediction = model.predict(point.features)
  (prediction, point.label)
}
...
val metrics = new BinaryClassificationMetrics(predictionAndLabel)

The fetch failure happened with the following stack trace:

org.apache.spark.rdd.PairRDDFunctions.combineByKey(PairRDDFunctions.scala:515)
org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.x$3$lzycompute(BinaryClassificationMetrics.scala:101)
org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.x$3(BinaryClassificationMetrics.scala:96)
org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.confusions$lzycompute(BinaryClassificationMetrics.scala:98)
org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.confusions(BinaryClassificationMetrics.scala:98)
org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.createCurve(BinaryClassificationMetrics.scala:142)
org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.roc(BinaryClassificationMetrics.scala:50)
org.apache.spark.mllib.evaluation.BinaryClassificationMetrics.areaUnderROC(BinaryClassificationMetrics.scala:60)
com.manage.ml.evaluation.BinaryClassificationMetrics.areaUnderROC(BinaryClassificationMetrics.scala:14)
...

We are doing this in yarn-client mode, with 32 executors, 16G executor memory, and 12 cores as the spark-submit settings.

I wonder if anyone has suggestions on how to debug this.

thanks in advance
thomas
Re: Help with using combineByKey
Thank you guys! It was very helpful and now I understand it better.

On Fri, Oct 10, 2014 at 1:38 AM, Davies Liu wrote:
> [...]
Re: Help with using combineByKey
Maybe this version is easier to use:

plist.mapValues((v) => (if (v > 0) 1 else 0, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))

It has similar behavior to combineByKey(), and will be faster than the groupByKey() version.

On Thu, Oct 9, 2014 at 9:28 PM, HARIPRIYA AYYALASOMAYAJULA wrote:
> [...]
Re: Help with using combineByKey
It's the exact same reason you wrote: (acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1, acc._2 + 1), right? the first function establishes an initial value for a count. The value is either (0,1) or (1,1) depending on whether the value is 0 or not. You're otherwise using the method just fine. You can write this function a lot of ways; this is a bit verbose but probably efficient. Yana's version is distributed. It's just that it uses simple Scala functions within map(). This also works although the groupByKey() can be a problem as it requires putting all values for a key in memory, whereas your combineByKey does not. On Fri, Oct 10, 2014 at 5:28 AM, HARIPRIYA AYYALASOMAYAJULA wrote: > Sean, > > Thank you. It works. But I am still confused about the function. Can you > kindly throw some light on it? > I was going through the example mentioned in > https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html > > Is there any better source through which I can learn more about these > functions? It would be helpful if I can get a chance to look at more > examples. > Also, I assume using combineByKey helps us solve it parallel than using > simple functions provided by scala as mentioned by Yana. Am I correct? > > On Thu, Oct 9, 2014 at 12:30 PM, Sean Owen wrote: >> >> Oh duh, sorry. The initialization should of course be (v) => (if (v > >> 0) 1 else 0, 1) >> This gives the answer you are looking for. I don't see what Part2 is >> supposed to do differently. >> >> On Thu, Oct 9, 2014 at 6:14 PM, HARIPRIYA AYYALASOMAYAJULA >> wrote: >> > Hello Sean, >> > >> > Thank you, but changing from v to 1 doesn't help me either. >> > >> > I am trying to count the number of non-zero values using the first >> > accumulator. >> > val newlist = List (("LAX",6), ("LAX",0), ("LAX",7), ("SFO",0), >> > ("SFO",0), >> > ("SFO",9)) >> > >> > val plist = sc.parallelize(newlist) >> > >> > val part1 = plist.combineByKey( >> >(v) => (1, 1), >> >(acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1, acc._2 + >> > 1), >> >(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + >> > acc2._2) >> >) >> > >> >val Part2 = part1.map{ case (key, value) => (key, >> > (value._1,value._2)) } >> > >> > This should give me the result >> > (LAX,(2,3)) >> > (SFO,(1,3)) >> > >> > >> > >> > On Thu, Oct 9, 2014 at 11:48 AM, Sean Owen wrote: >> >> >> >> You have a typo in your code at "var acc:", and the map from opPart1 >> >> to opPart2 looks like a no-op, but those aren't the problem I think. >> >> It sounds like you intend the first element of each pair to be a count >> >> of nonzero values, but you initialize the first element of the pair to >> >> v, not 1, in v => (v,1). Try v => (1,1) >> >> >> >> >> >> On Thu, Oct 9, 2014 at 4:47 PM, HARIPRIYA AYYALASOMAYAJULA >> >> wrote: >> >> > >> >> > I am a beginner to Spark and finding it difficult to implement a very >> >> > simple >> >> > reduce operation. I read that is ideal to use combineByKey for >> >> > complex >> >> > reduce operations. 
>> >> > >> >> > My input: >> >> > >> >> > val input = sc.parallelize(List(("LAX",6), ("LAX",8), ("LAX",7), >> >> > ("SFO",0), >> >> > ("SFO",1), >> >> > ("SFO",9),("PHX",65),("PHX",88),("KX",7),("KX",6),("KX",1), >> >> > ("KX",9), >> >> > >> >> > >> >> > ("HOU",56),("HOU",5),("HOU",59),("HOU",0),("MA",563),("MA",545),("MA",5),("MA",0),("MA",0))) >> >> > >> >> > >> >> > val opPart1 = input.combineByKey( >> >> >(v) => (v, 1), >> >> >(var acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1, >> >> > acc._2 + >> >> > 1), >> >> >(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, >> >> > acc1._2 + >> >> > acc2._2) >> >> >) >
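Putting Sean's correction together with the smaller data set from later in this thread, a minimal self-contained sketch of the counting version under discussion (assuming only a running SparkContext named sc) looks like this:

val newlist = List(("LAX", 6), ("LAX", 0), ("LAX", 7), ("SFO", 0), ("SFO", 0), ("SFO", 9))
val plist = sc.parallelize(newlist)

val counts = plist.combineByKey(
  // createCombiner: builds the initial (non-zero count, total count) pair from the
  // first value seen for a key within a partition
  (v: Int) => (if (v > 0) 1 else 0, 1),
  // mergeValue: folds another value from the same partition into the accumulator
  (acc: (Int, Int), v: Int) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
  // mergeCombiners: combines accumulators produced by different partitions
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))

counts.collectAsMap().foreach(println)
// expected: (LAX,(2,3)) and (SFO,(1,3)) -- (non-zero count, total count) per key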
Re: Help with using combineByKey
Sean, Thank you. It works. But I am still confused about the function. Can you kindly throw some light on it? I was going through the example mentioned in https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html Is there any better source through which I can learn more about these functions? It would be helpful if I can get a chance to look at more examples. Also, I assume using combineByKey helps us solve it parallel than using simple functions provided by scala as mentioned by Yana. Am I correct? On Thu, Oct 9, 2014 at 12:30 PM, Sean Owen wrote: > Oh duh, sorry. The initialization should of course be (v) => (if (v > > 0) 1 else 0, 1) > This gives the answer you are looking for. I don't see what Part2 is > supposed to do differently. > > On Thu, Oct 9, 2014 at 6:14 PM, HARIPRIYA AYYALASOMAYAJULA > wrote: > > Hello Sean, > > > > Thank you, but changing from v to 1 doesn't help me either. > > > > I am trying to count the number of non-zero values using the first > > accumulator. > > val newlist = List (("LAX",6), ("LAX",0), ("LAX",7), ("SFO",0), > ("SFO",0), > > ("SFO",9)) > > > > val plist = sc.parallelize(newlist) > > > > val part1 = plist.combineByKey( > >(v) => (1, 1), > >(acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1, acc._2 + > 1), > >(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + > > acc2._2) > >) > > > >val Part2 = part1.map{ case (key, value) => (key, > (value._1,value._2)) } > > > > This should give me the result > > (LAX,(2,3)) > > (SFO,(1,3)) > > > > > > > > On Thu, Oct 9, 2014 at 11:48 AM, Sean Owen wrote: > >> > >> You have a typo in your code at "var acc:", and the map from opPart1 > >> to opPart2 looks like a no-op, but those aren't the problem I think. > >> It sounds like you intend the first element of each pair to be a count > >> of nonzero values, but you initialize the first element of the pair to > >> v, not 1, in v => (v,1). Try v => (1,1) > >> > >> > >> On Thu, Oct 9, 2014 at 4:47 PM, HARIPRIYA AYYALASOMAYAJULA > >> wrote: > >> > > >> > I am a beginner to Spark and finding it difficult to implement a very > >> > simple > >> > reduce operation. I read that is ideal to use combineByKey for complex > >> > reduce operations. > >> > > >> > My input: > >> > > >> > val input = sc.parallelize(List(("LAX",6), ("LAX",8), ("LAX",7), > >> > ("SFO",0), > >> > ("SFO",1), ("SFO",9),("PHX",65),("PHX",88),("KX",7),("KX",6),("KX",1), > >> > ("KX",9), > >> > > >> > > ("HOU",56),("HOU",5),("HOU",59),("HOU",0),("MA",563),("MA",545),("MA",5),("MA",0),("MA",0))) > >> > > >> > > >> > val opPart1 = input.combineByKey( > >> >(v) => (v, 1), > >> >(var acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1, > >> > acc._2 + > >> > 1), > >> >(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, > acc1._2 + > >> > acc2._2) > >> >) > >> > > >> >val opPart2 = opPart1.map{ case (key, value) => (key, > >> > (value._1,value._2)) } > >> > > >> > opPart2.collectAsMap().map(println(_)) > >> > > >> > If the value is greater than 0, the first accumulator should be > >> > incremented > >> > by 1, else it remains the same. The second accumulator is a simple > >> > counter > >> > for each value. I am getting an incorrect output (garbage values )for > >> > the > >> > first accumulator. Please help. 
> >> > > >> > The equivalent reduce operation in Hadoop MapReduce is : > >> > > >> > public static class PercentageCalcReducer extends > >> > Reducer > >> > > >> > { > >> > > >> > private FloatWritable pdelay = new FloatWritable(); > >> > > >> > > >> > public void reduce(Text key, Iterable values,Context > >> > context)throws IOException,InterruptedException > >> > > >> > { > >> > > >> > int acc2=0; > >> > > >> > float frac_delay, percentage_delay; > >> > > >> > int acc1=0; > >> > > >> > for(IntWritable val : values) > >> > > >> > { > >> > > >> > if(val.get() > 0) > >> > > >> > { > >> > > >> > acc1++; > >> > > >> > } > >> > > >> > acc2++; > >> > > >> > } > >> > > >> > > >> > > >> > frac_delay = (float)acc1/acc2; > >> > > >> > percentage_delay = frac_delay * 100 ; > >> > > >> > pdelay.set(percentage_delay); > >> > > >> > context.write(key,pdelay); > >> > > >> > } > >> > > >> > } > >> > > >> > > >> > Please help. Thank you for your time. > >> > > >> > -- > >> > > >> > Regards, > >> > > >> > Haripriya Ayyalasomayajula > >> > contact : 650-796-7112 > > > > > > > > > > -- > > Regards, > > Haripriya Ayyalasomayajula > > contact : 650-796-7112 > -- Regards, Haripriya Ayyalasomayajula contact : 650-796-7112
Re: Help with using combineByKey
Oh duh, sorry. The initialization should of course be (v) => (if (v > 0) 1 else 0, 1) This gives the answer you are looking for. I don't see what Part2 is supposed to do differently. On Thu, Oct 9, 2014 at 6:14 PM, HARIPRIYA AYYALASOMAYAJULA wrote: > Hello Sean, > > Thank you, but changing from v to 1 doesn't help me either. > > I am trying to count the number of non-zero values using the first > accumulator. > val newlist = List (("LAX",6), ("LAX",0), ("LAX",7), ("SFO",0), ("SFO",0), > ("SFO",9)) > > val plist = sc.parallelize(newlist) > > val part1 = plist.combineByKey( >(v) => (1, 1), >(acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1, acc._2 + 1), >(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + > acc2._2) >) > >val Part2 = part1.map{ case (key, value) => (key, (value._1,value._2)) } > > This should give me the result > (LAX,(2,3)) > (SFO,(1,3)) > > > > On Thu, Oct 9, 2014 at 11:48 AM, Sean Owen wrote: >> >> You have a typo in your code at "var acc:", and the map from opPart1 >> to opPart2 looks like a no-op, but those aren't the problem I think. >> It sounds like you intend the first element of each pair to be a count >> of nonzero values, but you initialize the first element of the pair to >> v, not 1, in v => (v,1). Try v => (1,1) >> >> >> On Thu, Oct 9, 2014 at 4:47 PM, HARIPRIYA AYYALASOMAYAJULA >> wrote: >> > >> > I am a beginner to Spark and finding it difficult to implement a very >> > simple >> > reduce operation. I read that is ideal to use combineByKey for complex >> > reduce operations. >> > >> > My input: >> > >> > val input = sc.parallelize(List(("LAX",6), ("LAX",8), ("LAX",7), >> > ("SFO",0), >> > ("SFO",1), ("SFO",9),("PHX",65),("PHX",88),("KX",7),("KX",6),("KX",1), >> > ("KX",9), >> > >> > ("HOU",56),("HOU",5),("HOU",59),("HOU",0),("MA",563),("MA",545),("MA",5),("MA",0),("MA",0))) >> > >> > >> > val opPart1 = input.combineByKey( >> >(v) => (v, 1), >> >(var acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1, >> > acc._2 + >> > 1), >> >(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + >> > acc2._2) >> >) >> > >> >val opPart2 = opPart1.map{ case (key, value) => (key, >> > (value._1,value._2)) } >> > >> > opPart2.collectAsMap().map(println(_)) >> > >> > If the value is greater than 0, the first accumulator should be >> > incremented >> > by 1, else it remains the same. The second accumulator is a simple >> > counter >> > for each value. I am getting an incorrect output (garbage values )for >> > the >> > first accumulator. Please help. >> > >> > The equivalent reduce operation in Hadoop MapReduce is : >> > >> > public static class PercentageCalcReducer extends >> > Reducer >> > >> > { >> > >> > private FloatWritable pdelay = new FloatWritable(); >> > >> > >> > public void reduce(Text key, Iterable values,Context >> > context)throws IOException,InterruptedException >> > >> > { >> > >> > int acc2=0; >> > >> > float frac_delay, percentage_delay; >> > >> > int acc1=0; >> > >> > for(IntWritable val : values) >> > >> > { >> > >> > if(val.get() > 0) >> > >> > { >> > >> > acc1++; >> > >> > } >> > >> > acc2++; >> > >> > } >> > >> > >> > >> > frac_delay = (float)acc1/acc2; >> > >> > percentage_delay = frac_delay * 100 ; >> > >> > pdelay.set(percentage_delay); >> > >> > context.write(key,pdelay); >> > >> > } >> > >> > } >> > >> > >> > Please help. Thank you for your time. 
>> > >> > -- >> > >> > Regards, >> > >> > Haripriya Ayyalasomayajula >> > contact : 650-796-7112 > > > > > -- > Regards, > Haripriya Ayyalasomayajula > contact : 650-796-7112 - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Help with using combineByKey
Hello Yana, Thank you. Yes, it works. However, can you please suggest any examples ( or links) about the usage of combineByKey. On Thu, Oct 9, 2014 at 12:03 PM, Yana Kadiyska wrote: > If you just want the ratio of positive to all values per key (if I'm > reading right) this works > > val reduced= input.groupByKey().map(grp=> > grp._2.filter(v=>v>0).size.toFloat/grp._2.size) > reduced.foreach(println) > > I don't think you need reduceByKey or combineByKey as you're not doing > anything where the values depend on each other -- you're just counting... > > On Thu, Oct 9, 2014 at 11:47 AM, HARIPRIYA AYYALASOMAYAJULA < > aharipriy...@gmail.com> wrote: > >> >> I am a beginner to Spark and finding it difficult to implement a very >> simple reduce operation. I read that is ideal to use combineByKey for >> complex reduce operations. >> >> My input: >> >> val input = sc.parallelize(List(("LAX",6), ("LAX",8), ("LAX",7), >> ("SFO",0), ("SFO",1), >> ("SFO",9),("PHX",65),("PHX",88),("KX",7),("KX",6),("KX",1), ("KX",9), >> ("HOU",56),("HOU",5),("HOU",59),("HOU",0),("MA",563),("MA",545),("MA",5),("MA",0),("MA",0))) >> >> >> val opPart1 = input.combineByKey( >>(v) => (v, 1), >>(var acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1, acc._2 >> + 1), >>(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + >> acc2._2) >>) >> >>val opPart2 = opPart1.map{ case (key, value) => (key, >> (value._1,value._2)) } >> >> opPart2.collectAsMap().map(println(_)) >> >> If the value is greater than 0, the first accumulator should be >> incremented by 1, else it remains the same. The second accumulator is a >> simple counter for each value. I am getting an incorrect output (garbage >> values )for the first accumulator. Please help. >> >> The equivalent reduce operation in Hadoop MapReduce is : >> >> >> public static class PercentageCalcReducer extends >> Reducer >> >> { >> >> private FloatWritable pdelay = new FloatWritable(); >> >> >> public void reduce(Text key, Iterable values,Context >> context)throws IOException,InterruptedException >> >> { >> >> int acc2=0; >> >> float frac_delay, percentage_delay; >> >> int acc1=0; >> >> for(IntWritable val : values) >> >> { >> >> if(val.get() > 0) >> >> { >> >> acc1++; >> >> } >> >> acc2++; >> >> } >> >> >> >> frac_delay = (float)acc1/acc2; >> >> percentage_delay = frac_delay * 100 ; >> >> pdelay.set(percentage_delay); >> >> context.write(key,pdelay); >> >> } >> >> } >> >> >> Please help. Thank you for your time. >> >> -- >> >> Regards, >> Haripriya Ayyalasomayajula >> contact : 650-796-7112 >> > > -- Regards, Haripriya Ayyalasomayajula contact : 650-796-7112
Re: Help with using combineByKey
Hello Sean, Thank you, but changing from v to 1 doesn't help me either. I am trying to count the number of non-zero values using the first accumulator. val newlist = List (("LAX",6), ("LAX",0), ("LAX",7), ("SFO",0), ("SFO",0), ("SFO",9)) val plist = sc.parallelize(newlist) val part1 = plist.combineByKey( (v) => (1, 1), (acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1, acc._2 + 1), (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) ) val Part2 = part1.map{ case (key, value) => (key, (value._1,value._2)) } This should give me the result (LAX,(2,3)) (SFO,(1,3)) On Thu, Oct 9, 2014 at 11:48 AM, Sean Owen wrote: > You have a typo in your code at "var acc:", and the map from opPart1 > to opPart2 looks like a no-op, but those aren't the problem I think. > It sounds like you intend the first element of each pair to be a count > of nonzero values, but you initialize the first element of the pair to > v, not 1, in v => (v,1). Try v => (1,1) > > > On Thu, Oct 9, 2014 at 4:47 PM, HARIPRIYA AYYALASOMAYAJULA > wrote: > > > > I am a beginner to Spark and finding it difficult to implement a very > simple > > reduce operation. I read that is ideal to use combineByKey for complex > > reduce operations. > > > > My input: > > > > val input = sc.parallelize(List(("LAX",6), ("LAX",8), ("LAX",7), > ("SFO",0), > > ("SFO",1), ("SFO",9),("PHX",65),("PHX",88),("KX",7),("KX",6),("KX",1), > > ("KX",9), > > > ("HOU",56),("HOU",5),("HOU",59),("HOU",0),("MA",563),("MA",545),("MA",5),("MA",0),("MA",0))) > > > > > > val opPart1 = input.combineByKey( > >(v) => (v, 1), > >(var acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1, > acc._2 + > > 1), > >(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + > > acc2._2) > >) > > > >val opPart2 = opPart1.map{ case (key, value) => (key, > > (value._1,value._2)) } > > > > opPart2.collectAsMap().map(println(_)) > > > > If the value is greater than 0, the first accumulator should be > incremented > > by 1, else it remains the same. The second accumulator is a simple > counter > > for each value. I am getting an incorrect output (garbage values )for > the > > first accumulator. Please help. > > > > The equivalent reduce operation in Hadoop MapReduce is : > > > > public static class PercentageCalcReducer extends > > Reducer > > > > { > > > > private FloatWritable pdelay = new FloatWritable(); > > > > > > public void reduce(Text key, Iterable values,Context > > context)throws IOException,InterruptedException > > > > { > > > > int acc2=0; > > > > float frac_delay, percentage_delay; > > > > int acc1=0; > > > > for(IntWritable val : values) > > > > { > > > > if(val.get() > 0) > > > > { > > > > acc1++; > > > > } > > > > acc2++; > > > > } > > > > > > > > frac_delay = (float)acc1/acc2; > > > > percentage_delay = frac_delay * 100 ; > > > > pdelay.set(percentage_delay); > > > > context.write(key,pdelay); > > > > } > > > > } > > > > > > Please help. Thank you for your time. > > > > -- > > > > Regards, > > > > Haripriya Ayyalasomayajula > > contact : 650-796-7112 > -- Regards, Haripriya Ayyalasomayajula contact : 650-796-7112
Re: Help with using combineByKey
If you just want the ratio of positive to all values per key (if I'm reading right) this works val reduced= input.groupByKey().map(grp=> grp._2.filter(v=>v>0).size.toFloat/grp._2.size) reduced.foreach(println) I don't think you need reduceByKey or combineByKey as you're not doing anything where the values depend on each other -- you're just counting... On Thu, Oct 9, 2014 at 11:47 AM, HARIPRIYA AYYALASOMAYAJULA < aharipriy...@gmail.com> wrote: > > I am a beginner to Spark and finding it difficult to implement a very > simple reduce operation. I read that is ideal to use combineByKey for > complex reduce operations. > > My input: > > val input = sc.parallelize(List(("LAX",6), ("LAX",8), ("LAX",7), > ("SFO",0), ("SFO",1), > ("SFO",9),("PHX",65),("PHX",88),("KX",7),("KX",6),("KX",1), ("KX",9), > ("HOU",56),("HOU",5),("HOU",59),("HOU",0),("MA",563),("MA",545),("MA",5),("MA",0),("MA",0))) > > > val opPart1 = input.combineByKey( >(v) => (v, 1), >(var acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1, acc._2 > + 1), >(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + > acc2._2) >) > >val opPart2 = opPart1.map{ case (key, value) => (key, > (value._1,value._2)) } > > opPart2.collectAsMap().map(println(_)) > > If the value is greater than 0, the first accumulator should be > incremented by 1, else it remains the same. The second accumulator is a > simple counter for each value. I am getting an incorrect output (garbage > values )for the first accumulator. Please help. > > The equivalent reduce operation in Hadoop MapReduce is : > > > public static class PercentageCalcReducer extends > Reducer > > { > > private FloatWritable pdelay = new FloatWritable(); > > > public void reduce(Text key, Iterable values,Context > context)throws IOException,InterruptedException > > { > > int acc2=0; > > float frac_delay, percentage_delay; > > int acc1=0; > > for(IntWritable val : values) > > { > > if(val.get() > 0) > > { > > acc1++; > > } > > acc2++; > > } > > > > frac_delay = (float)acc1/acc2; > > percentage_delay = frac_delay * 100 ; > > pdelay.set(percentage_delay); > > context.write(key,pdelay); > > } > > } > > > Please help. Thank you for your time. > > -- > > Regards, > Haripriya Ayyalasomayajula > contact : 650-796-7112 >
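A small variation on the same idea (a sketch only, reusing the input RDD from the quoted post) keeps the key in the output so each ratio can still be traced back to its airport code:

val ratios = input.groupByKey().map { case (key, values) =>
  (key, values.count(_ > 0).toFloat / values.size)
}
ratios.foreach(println)
// one (key, ratio) pair per key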
Re: Help with using combineByKey
You have a typo in your code at "var acc:", and the map from opPart1 to opPart2 looks like a no-op, but those aren't the problem I think. It sounds like you intend the first element of each pair to be a count of nonzero values, but you initialize the first element of the pair to v, not 1, in v => (v,1). Try v => (1,1) On Thu, Oct 9, 2014 at 4:47 PM, HARIPRIYA AYYALASOMAYAJULA wrote: > > I am a beginner to Spark and finding it difficult to implement a very simple > reduce operation. I read that is ideal to use combineByKey for complex > reduce operations. > > My input: > > val input = sc.parallelize(List(("LAX",6), ("LAX",8), ("LAX",7), ("SFO",0), > ("SFO",1), ("SFO",9),("PHX",65),("PHX",88),("KX",7),("KX",6),("KX",1), > ("KX",9), > ("HOU",56),("HOU",5),("HOU",59),("HOU",0),("MA",563),("MA",545),("MA",5),("MA",0),("MA",0))) > > > val opPart1 = input.combineByKey( >(v) => (v, 1), >(var acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1, acc._2 + > 1), >(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + > acc2._2) >) > >val opPart2 = opPart1.map{ case (key, value) => (key, > (value._1,value._2)) } > > opPart2.collectAsMap().map(println(_)) > > If the value is greater than 0, the first accumulator should be incremented > by 1, else it remains the same. The second accumulator is a simple counter > for each value. I am getting an incorrect output (garbage values )for the > first accumulator. Please help. > > The equivalent reduce operation in Hadoop MapReduce is : > > public static class PercentageCalcReducer extends > Reducer > > { > > private FloatWritable pdelay = new FloatWritable(); > > > public void reduce(Text key, Iterable values,Context > context)throws IOException,InterruptedException > > { > > int acc2=0; > > float frac_delay, percentage_delay; > > int acc1=0; > > for(IntWritable val : values) > > { > > if(val.get() > 0) > > { > > acc1++; > > } > > acc2++; > > } > > > > frac_delay = (float)acc1/acc2; > > percentage_delay = frac_delay * 100 ; > > pdelay.set(percentage_delay); > > context.write(key,pdelay); > > } > > } > > > Please help. Thank you for your time. > > -- > > Regards, > > Haripriya Ayyalasomayajula > contact : 650-796-7112 - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Help with using combineByKey
I am a beginner to Spark and finding it difficult to implement a very simple reduce operation. I read that is ideal to use combineByKey for complex reduce operations. My input: val input = sc.parallelize(List(("LAX",6), ("LAX",8), ("LAX",7), ("SFO",0), ("SFO",1), ("SFO",9),("PHX",65),("PHX",88),("KX",7),("KX",6),("KX",1), ("KX",9), ("HOU",56),("HOU",5),("HOU",59),("HOU",0),("MA",563),("MA",545),("MA",5),("MA",0),("MA",0))) val opPart1 = input.combineByKey( (v) => (v, 1), (var acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1, acc._2 + 1), (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2) ) val opPart2 = opPart1.map{ case (key, value) => (key, (value._1,value._2)) } opPart2.collectAsMap().map(println(_)) If the value is greater than 0, the first accumulator should be incremented by 1, else it remains the same. The second accumulator is a simple counter for each value. I am getting an incorrect output (garbage values )for the first accumulator. Please help. The equivalent reduce operation in Hadoop MapReduce is : public static class PercentageCalcReducer extends Reducer { private FloatWritable pdelay = new FloatWritable(); public void reduce(Text key, Iterable values,Context context)throws IOException,InterruptedException { int acc2=0; float frac_delay, percentage_delay; int acc1=0; for(IntWritable val : values) { if(val.get() > 0) { acc1++; } acc2++; } frac_delay = (float)acc1/acc2; percentage_delay = frac_delay * 100 ; pdelay.set(percentage_delay); context.write(key,pdelay); } } Please help. Thank you for your time. -- Regards, Haripriya Ayyalasomayajula contact : 650-796-7112
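For completeness, a hedged sketch of the same percentage-delay computation in Spark, built on the corrected combiner from the replies earlier in this thread (the input RDD is the one defined above; the variable names are illustrative, not part of the original post):

val delayCounts = input.combineByKey(
  (v: Int) => (if (v > 0) 1 else 0, 1),
  (acc: (Int, Int), v: Int) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))

// mirror of the Hadoop reducer: fraction of positive values per key, as a percentage
val percentageDelay = delayCounts.mapValues { case (nonZero, total) =>
  nonZero.toFloat / total * 100
}

percentageDelay.collectAsMap().foreach(println)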
Re: aggregateByKey vs combineByKey
Thanks Liquan, that was really helpful. On Mon, Sep 29, 2014 at 5:54 PM, Liquan Pei wrote: > Hi Dave, > > You can replace groupByKey with reduceByKey to improve performance in some > cases. reduceByKey performs map side combine which can reduce Network IO > and shuffle size where as groupByKey will not perform map side combine. > > combineByKey is more general then aggregateByKey. Actually, the > implementation of aggregateByKey, reduceByKey and groupByKey is achieved by > combineByKey. aggregateByKey is similar to reduceByKey but you can provide > initial values when performing aggregation. > > As the name suggests, aggregateByKey is suitable for compute aggregations > for keys, example aggregations such as sum, avg, etc. The rule here is that > the extra computation spent for map side combine can reduce the size sent > out to other nodes and driver. If your func has satisfies this rule, you > probably should use aggregateByKey. > > combineByKey is more general and you have the flexibility to specify > whether you'd like to perform map side combine. However, it is more complex > to use. At minimum, you need to implement three functions: createCombiner, > mergeValue, mergeCombiners. > > Hope this helps! > Liquan > > On Sun, Sep 28, 2014 at 11:59 PM, David Rowe wrote: > >> Hi All, >> >> After some hair pulling, I've reached the realisation that an operation I >> am currently doing via: >> >> myRDD.groupByKey.mapValues(func) >> >> should be done more efficiently using aggregateByKey or combineByKey. >> Both of these methods would do, and they seem very similar to me in terms >> of their function. >> >> My question is, what are the differences between these two methods (other >> than the slight differences in their type signatures)? Under what >> circumstances should I use one or the other? >> >> Thanks >> >> Dave >> >> >> > > > -- > Liquan Pei > Department of Physics > University of Massachusetts Amherst >
Re: aggregateByKey vs combineByKey
Hi Dave, You can replace groupByKey with reduceByKey to improve performance in some cases. reduceByKey performs map side combine which can reduce network IO and shuffle size, whereas groupByKey will not perform map side combine. combineByKey is more general than aggregateByKey. Actually, the implementation of aggregateByKey, reduceByKey and groupByKey is achieved by combineByKey. aggregateByKey is similar to reduceByKey but you can provide initial values when performing aggregation. As the name suggests, aggregateByKey is suitable for computing aggregations by key, for example aggregations such as sum, avg, etc. The rule here is that the extra computation spent on map side combine can reduce the size sent out to other nodes and the driver. If your function satisfies this rule, you probably should use aggregateByKey. combineByKey is more general and you have the flexibility to specify whether you'd like to perform map side combine. However, it is more complex to use. At minimum, you need to implement three functions: createCombiner, mergeValue, mergeCombiners. Hope this helps! Liquan On Sun, Sep 28, 2014 at 11:59 PM, David Rowe wrote: > Hi All, > > After some hair pulling, I've reached the realisation that an operation I > am currently doing via: > > myRDD.groupByKey.mapValues(func) > > should be done more efficiently using aggregateByKey or combineByKey. Both > of these methods would do, and they seem very similar to me in terms of > their function. > > My question is, what are the differences between these two methods (other > than the slight differences in their type signatures)? Under what > circumstances should I use one or the other? > > Thanks > > Dave > > > -- Liquan Pei Department of Physics University of Massachusetts Amherst
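To make the comparison concrete, here is a small sketch (the pairs RDD and the sum-and-count aggregation are illustrative, assuming a running SparkContext named sc) of the same per-key aggregation written both ways:

val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))

// aggregateByKey: a zero value plus a within-partition and a cross-partition function
val viaAggregate = pairs.aggregateByKey((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))

// combineByKey: createCombiner replaces the zero value, so the combiner for a key
// is built from the first value itself
val viaCombine = pairs.combineByKey(
  (v: Int) => (v, 1),
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))

// both produce (key, (sum, count)) pairs: (a,(3,2)) and (b,(3,1))
viaAggregate.collect().foreach(println)
viaCombine.collect().foreach(println)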
aggregateByKey vs combineByKey
Hi All, After some hair pulling, I've reached the realisation that an operation I am currently doing via: myRDD.groupByKey.mapValues(func) should be done more efficiently using aggregateByKey or combineByKey. Both of these methods would do, and they seem very similar to me in terms of their function. My question is, what are the differences between these two methods (other than the slight differences in their type signatures)? Under what circumstances should I use one or the other? Thanks Dave
Re: combineByKey throws ClassCastException
This problem was caused by the fact that I used a package jar with a Spark version (0.9.1) different from that of the cluster (0.9.0). When I used the correct package jar (spark-assembly_2.10-0.9.0-cdh5.0.1-hadoop2.3.0-cdh5.0.1.jar) instead the application can run as expected. 2014-09-15 14:57 GMT+08:00 x : > How about this. > > scala> val rdd2 = rdd.combineByKey( > | (v: Int) => v.toLong, > | (c: Long, v: Int) => c + v, > | (c1: Long, c2: Long) => c1 + c2) > rdd2: org.apache.spark.rdd.RDD[(String, Long)] = MapPartitionsRDD[9] at > combineB > yKey at :14 > > xj @ Tokyo > > On Mon, Sep 15, 2014 at 3:06 PM, Tao Xiao > wrote: > >> I followd an example presented in the tutorial Learning Spark >> <http://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html> >> to compute the per-key average as follows: >> >> >> val Array(appName) = args >> val sparkConf = new SparkConf() >> .setAppName(appName) >> val sc = new SparkContext(sparkConf) >> /* >> * compute the per-key average of values >> * results should be: >> *A : 5.8 >> *B : 14 >> *C : 60.6 >> */ >> val rdd = sc.parallelize(List( >> ("A", 3), ("A", 9), ("A", 12), ("A", 0), ("A", 5), >> ("B", 4), ("B", 10), ("B", 11), ("B", 20), ("B", 25), >> ("C", 32), ("C", 91), ("C", 122), ("C", 3), ("C", 55)), 2) >> val avg = rdd.combineByKey( >> (x:Int) => (x, 1), // java.lang.ClassCastException: scala.Tuple2$mcII$sp >> cannot be cast to java.lang.Integer >> (acc:(Int, Int), x) => (acc._1 + x, acc._2 + 1), >> (acc1:(Int, Int), acc2:(Int, Int)) => (acc1._1 + acc2._1, acc1._2 + >> acc2._2)) >> .map{case (s, t) => (s, t._1/t._2.toFloat)} >> avg.collect.foreach(t => println(t._1 + " ->" + t._2)) >> >> >> >> When I submitted the application, an exception of >> "*java.lang.ClassCastException: >> scala.Tuple2$mcII$sp cannot be cast to java.lang.Integer*" was thrown >> out. The tutorial said that the first function of *combineByKey*, *(x:Int) >> => (x, 1)*, should take a single element in the source RDD and return an >> element of the desired type in the resulting RDD. In my application, we >> take a single element of type *Int *from the source RDD and return a >> tuple of type (*Int*, *Int*), which meets the requirements quite well. >> But why would such an exception be thrown? >> >> I'm using CDH 5.0 and Spark 0.9 >> >> Thanks. >> >> >> >
Re: combineByKey throws ClassCastException
How about this. scala> val rdd2 = rdd.combineByKey( | (v: Int) => v.toLong, | (c: Long, v: Int) => c + v, | (c1: Long, c2: Long) => c1 + c2) rdd2: org.apache.spark.rdd.RDD[(String, Long)] = MapPartitionsRDD[9] at combineB yKey at :14 xj @ Tokyo On Mon, Sep 15, 2014 at 3:06 PM, Tao Xiao wrote: > I followd an example presented in the tutorial Learning Spark > <http://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html> > to compute the per-key average as follows: > > > val Array(appName) = args > val sparkConf = new SparkConf() > .setAppName(appName) > val sc = new SparkContext(sparkConf) > /* > * compute the per-key average of values > * results should be: > *A : 5.8 > *B : 14 > *C : 60.6 > */ > val rdd = sc.parallelize(List( > ("A", 3), ("A", 9), ("A", 12), ("A", 0), ("A", 5), > ("B", 4), ("B", 10), ("B", 11), ("B", 20), ("B", 25), > ("C", 32), ("C", 91), ("C", 122), ("C", 3), ("C", 55)), 2) > val avg = rdd.combineByKey( > (x:Int) => (x, 1), // java.lang.ClassCastException: scala.Tuple2$mcII$sp > cannot be cast to java.lang.Integer > (acc:(Int, Int), x) => (acc._1 + x, acc._2 + 1), > (acc1:(Int, Int), acc2:(Int, Int)) => (acc1._1 + acc2._1, acc1._2 + > acc2._2)) > .map{case (s, t) => (s, t._1/t._2.toFloat)} > avg.collect.foreach(t => println(t._1 + " ->" + t._2)) > > > > When I submitted the application, an exception of > "*java.lang.ClassCastException: > scala.Tuple2$mcII$sp cannot be cast to java.lang.Integer*" was thrown > out. The tutorial said that the first function of *combineByKey*, *(x:Int) > => (x, 1)*, should take a single element in the source RDD and return an > element of the desired type in the resulting RDD. In my application, we > take a single element of type *Int *from the source RDD and return a > tuple of type (*Int*, *Int*), which meets the requirements quite well. > But why would such an exception be thrown? > > I'm using CDH 5.0 and Spark 0.9 > > Thanks. > > >
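Extending that idea to the per-key average the original post is after, a sketch (not the code from this thread) that carries a count alongside the Long sum, assuming the same rdd of (String, Int) pairs and matching driver/cluster versions:

val avg = rdd.combineByKey(
  (v: Int) => (v.toLong, 1L),
  (acc: (Long, Long), v: Int) => (acc._1 + v, acc._2 + 1),
  (acc1: (Long, Long), acc2: (Long, Long)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
  .mapValues { case (sum, count) => sum.toFloat / count }

avg.collect().foreach { case (k, v) => println(k + " -> " + v) }
// on the data from the original post: A -> 5.8, B -> 14.0, C -> 60.6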
combineByKey throws ClassCastException
I followd an example presented in the tutorial Learning Spark <http://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html> to compute the per-key average as follows: val Array(appName) = args val sparkConf = new SparkConf() .setAppName(appName) val sc = new SparkContext(sparkConf) /* * compute the per-key average of values * results should be: *A : 5.8 *B : 14 *C : 60.6 */ val rdd = sc.parallelize(List( ("A", 3), ("A", 9), ("A", 12), ("A", 0), ("A", 5), ("B", 4), ("B", 10), ("B", 11), ("B", 20), ("B", 25), ("C", 32), ("C", 91), ("C", 122), ("C", 3), ("C", 55)), 2) val avg = rdd.combineByKey( (x:Int) => (x, 1), // java.lang.ClassCastException: scala.Tuple2$mcII$sp cannot be cast to java.lang.Integer (acc:(Int, Int), x) => (acc._1 + x, acc._2 + 1), (acc1:(Int, Int), acc2:(Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)) .map{case (s, t) => (s, t._1/t._2.toFloat)} avg.collect.foreach(t => println(t._1 + " ->" + t._2)) When I submitted the application, an exception of "*java.lang.ClassCastException: scala.Tuple2$mcII$sp cannot be cast to java.lang.Integer*" was thrown out. The tutorial said that the first function of *combineByKey*, *(x:Int) => (x, 1)*, should take a single element in the source RDD and return an element of the desired type in the resulting RDD. In my application, we take a single element of type *Int *from the source RDD and return a tuple of type (*Int*, *Int*), which meets the requirements quite well. But why would such an exception be thrown? I'm using CDH 5.0 and Spark 0.9 Thanks.
Re: combineByKey at ShuffledDStream.scala
The streaming program contains the following main stages: 1. receive data from Kafka 2. preprocessing of the data. These are all map and filtering stages. 3. Group by a field 4. Process the groupBy results using map. Inside this processing, I use collect, count. Thanks! Bill On Tue, Jul 22, 2014 at 10:05 PM, Tathagata Das wrote: > Can you give an idea of the streaming program? Rest of the transformation > you are doing on the input streams? > > > On Tue, Jul 22, 2014 at 11:05 AM, Bill Jay > wrote: > >> Hi all, >> >> I am currently running a Spark Streaming program, which consumes data >> from Kakfa and does the group by operation on the data. I try to optimize >> the running time of the program because it looks slow to me. It seems the >> stage named: >> >> * combineByKey at ShuffledDStream.scala:42 * >> >> always takes the longest running time. And If I open this stage, I only >> see two executors on this stage. Does anyone has an idea what this stage >> does and how to increase the speed for this stage? Thanks! >> >> Bill >> > >
Re: combineByKey at ShuffledDStream.scala
Can you give an idea of the streaming program? Rest of the transformation you are doing on the input streams? On Tue, Jul 22, 2014 at 11:05 AM, Bill Jay wrote: > Hi all, > > I am currently running a Spark Streaming program, which consumes data from > Kakfa and does the group by operation on the data. I try to optimize the > running time of the program because it looks slow to me. It seems the stage > named: > > * combineByKey at ShuffledDStream.scala:42 * > > always takes the longest running time. And If I open this stage, I only > see two executors on this stage. Does anyone has an idea what this stage > does and how to increase the speed for this stage? Thanks! > > Bill >
combineByKey at ShuffledDStream.scala
Hi all, I am currently running a Spark Streaming program, which consumes data from Kafka and does a group by operation on the data. I am trying to optimize the running time of the program because it looks slow to me. It seems the stage named: * combineByKey at ShuffledDStream.scala:42 * always takes the longest running time. And if I open this stage, I only see two executors on this stage. Does anyone have an idea what this stage does and how to increase the speed of this stage? Thanks! Bill
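For what it's worth, the shuffle behind this combineByKey stage takes its task count from the keyed DStream operation, which accepts an explicit partition count. A rough sketch of that knob (the socket source, the names, and the count of 8 are all illustrative, not taken from this thread):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

val ssc = new StreamingContext(new SparkConf().setAppName("GroupByExample"), Seconds(10))

// stand-in for the Kafka + preprocessing stages described above
val pairs = ssc.socketTextStream("localhost", 9999).map(line => (line, 1))

// passing a partition count here controls how many tasks the shuffle stage runs with
val grouped = pairs.groupByKey(8)
val counts = pairs.reduceByKey(_ + _, 8)

counts.print()
ssc.start()
ssc.awaitTermination()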
Re: When to use CombineByKey vs reduceByKey?
Matei, Thanks for the answer this clarifies this very much. Based on my usage I would use combineByKey, since the output is another custom data structures. I found out my issues with combineByKey were relieved after doing more tuning with the level of parallelism. I've found that it really depends on the size of my dataset, since I did tests for 1000, 10K, 100K, 1M data points, for now the GC issue is under control once I modified my data structures to be mutable and the key part I was missing was that all classes within it need it to be serializable Thanks! - Diana On Wed, Jun 11, 2014 at 6:06 PM, Matei Zaharia wrote: > combineByKey is designed for when your return type from the aggregation is > different from the values being aggregated (e.g. you group together > objects), and it should allow you to modify the leftmost argument of each > function (mergeCombiners, mergeValue, etc) and return that instead of > allocating a new object. So it should work with mutable objects — please > post what problems you had with that. reduceByKey actually also allows this > if your types are the same. > > Matei > > > On Jun 11, 2014, at 3:21 PM, Diana Hu wrote: > > Hello all, > > I've seen some performance improvements using combineByKey as opposed to > reduceByKey or a groupByKey+map function. I have a couple questions. it'd > be great if any one can provide some light into this. > > 1) When should I use combineByKey vs reduceByKey? > > 2) Do the containers need to be immutable for combineByKey? I've created > custom data structures for the containers, one mutable and one immutable. > The tests with the mutable containers, spark crashed with an error on > missing references. However the downside of immutable containers (which > works on my tests), is that for large datasets the garbage collector gets > called many more times, and it tends to run out of heap space as the GC > can't catch up. I tried some of the tips here > http://spark.apache.org/docs/latest/tuning.html#memory-tuning and tuning > the JVM params, but this seems to be too much tuning? > > Thanks in advance, > - Diana > > >
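As a concrete illustration of the serializable, mutable container Diana describes (a sketch; the Acc class and the pairs RDD are made up for the example), the accumulator class and everything it holds extend Serializable so they can be shipped between executors:

// a mutable per-key accumulator; it and everything it contains must be serializable
class Acc(var sum: Long, var count: Long) extends Serializable {
  def add(v: Long): Acc = { sum += v; count += 1; this }
  def merge(other: Acc): Acc = { sum += other.sum; count += other.count; this }
}

val pairs = sc.parallelize(Seq(("a", 1L), ("a", 2L), ("b", 3L)))

val combined = pairs.combineByKey(
  (v: Long) => new Acc(v, 1),
  (a: Acc, v: Long) => a.add(v),      // mutate and return the same object
  (a1: Acc, a2: Acc) => a1.merge(a2)) // likewise when merging partition results

combined.mapValues(a => a.sum.toDouble / a.count).collect().foreach(println)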
Re: When to use CombineByKey vs reduceByKey?
combineByKey is designed for when your return type from the aggregation is different from the values being aggregated (e.g. you group together objects), and it should allow you to modify the leftmost argument of each function (mergeCombiners, mergeValue, etc) and return that instead of allocating a new object. So it should work with mutable objects — please post what problems you had with that. reduceByKey actually also allows this if your types are the same. Matei On Jun 11, 2014, at 3:21 PM, Diana Hu wrote: > Hello all, > > I've seen some performance improvements using combineByKey as opposed to > reduceByKey or a groupByKey+map function. I have a couple questions. it'd be > great if any one can provide some light into this. > > 1) When should I use combineByKey vs reduceByKey? > > 2) Do the containers need to be immutable for combineByKey? I've created > custom data structures for the containers, one mutable and one immutable. The > tests with the mutable containers, spark crashed with an error on missing > references. However the downside of immutable containers (which works on my > tests), is that for large datasets the garbage collector gets called many > more times, and it tends to run out of heap space as the GC can't catch up. I > tried some of the tips here > http://spark.apache.org/docs/latest/tuning.html#memory-tuning and tuning the > JVM params, but this seems to be too much tuning? > > Thanks in advance, > - Diana
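A minimal sketch of the pattern described above (the data is illustrative): the combiner type differs from the value type, and each function mutates and returns its leftmost argument rather than allocating a new collection:

import scala.collection.mutable.ArrayBuffer

val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))

val grouped = pairs.combineByKey(
  (v: Int) => ArrayBuffer(v),                                        // combiner type differs from the value type
  (buf: ArrayBuffer[Int], v: Int) => { buf += v; buf },              // mutate the buffer in place
  (b1: ArrayBuffer[Int], b2: ArrayBuffer[Int]) => { b1 ++= b2; b1 }) // reuse the left buffer when merging

grouped.collect().foreach(println)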
When to use CombineByKey vs reduceByKey?
Hello all, I've seen some performance improvements using combineByKey as opposed to reduceByKey or a groupByKey+map function. I have a couple of questions; it'd be great if anyone could shed some light on this. 1) When should I use combineByKey vs reduceByKey? 2) Do the containers need to be immutable for combineByKey? I've created custom data structures for the containers, one mutable and one immutable. In the tests with the mutable containers, Spark crashed with an error about missing references. However, the downside of the immutable containers (which work in my tests) is that for large datasets the garbage collector gets called many more times, and it tends to run out of heap space as the GC can't catch up. I tried some of the tips here http://spark.apache.org/docs/latest/tuning.html#memory-tuning and tuning the JVM params, but this seems to be too much tuning? Thanks in advance, - Diana
Re: combinebykey throw classcastexception
It turns out this issue was caused by a version mismatch between the driver (0.9.1) and the server (0.9.0-cdh5.0.1). Other functions worked fine before, but combineByKey did not. Thank you very much for your reply. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/combinebykey-throw-classcastexception-tp6060p6087.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: combinebykey throw classcastexception
You asked off-list, and provided a more detailed example there: val random = new Random() val testdata = (1 to 1).map(_=>(random.nextInt(),random.nextInt())) sc.parallelize(testdata).combineByKey[ArrayBuffer[Int]]( (instant:Int)=>{new ArrayBuffer[Int]()}, (bucket:ArrayBuffer[Int],instant:Int)=>{bucket+=instant}, (bucket1:ArrayBuffer[Int],bucket2:ArrayBuffer[Int])=>{bucket1++=bucket2} ).collect() https://www.quora.com/Why-is-my-combinebykey-throw-classcastexception I can't reproduce this with Spark 0.9.0 / CDH5 or Spark 1.0.0 RC9. Your definition looks fine too. (Except that you are dropping the first value, but that's a different problem.) On Tue, May 20, 2014 at 2:05 AM, xiemeilong wrote: > I am using CDH5 on a three machines cluster. map data from hbase as (string, > V) pair , then call combineByKey like this: > > .combineByKey[C]( > (v:V)=>new C(v), //this line throw java.lang.ClassCastException: C > cannot be cast to V > (v:C,v:V)=>C, > (c1:C,c2:C)=>C) > > > I am very confused of this, there isn't C to V casting at all. What's > wrong? > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/combinebykey-throw-classcastexception-tp6059.html > Sent from the Apache Spark User List mailing list archive at Nabble.com.
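As noted above, that createCombiner discards the first value for each key because it returns an empty buffer. A corrected sketch of the same shape (assuming a running SparkContext named sc; the record count of 10000 is illustrative) seeds the buffer with the first value:

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

val random = new Random()
val testdata = (1 to 10000).map(_ => (random.nextInt(), random.nextInt()))

sc.parallelize(testdata).combineByKey[ArrayBuffer[Int]](
  (instant: Int) => ArrayBuffer(instant),                                   // keep the first value
  (bucket: ArrayBuffer[Int], instant: Int) => { bucket += instant; bucket },
  (bucket1: ArrayBuffer[Int], bucket2: ArrayBuffer[Int]) => { bucket1 ++= bucket2; bucket1 }
).collect()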
combinebykey throw classcastexception
I am using CDH5 on a three-machine cluster. I map data from HBase as (String, V) pairs, then call combineByKey like this: .combineByKey[C]( (v:V)=>new C(v), //this line throws java.lang.ClassCastException: C cannot be cast to V (v:C,v:V)=>C, (c1:C,c2:C)=>C) I am very confused by this; there isn't any C to V casting at all. What's wrong? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/combinebykey-throw-classcastexception-tp6059.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
Re: Use combineByKey and StatCount
Not very sure about the meaning of “mean of RDD by key”, but is this what you want? val meansByKey = rdd .map { case (k, v) => k -> (v, 1) } .reduceByKey { (lhs, rhs) => (lhs._1 + rhs._1, lhs._2 + rhs._2) } .mapValues { case (sum, count) => sum / count } .collectAsMap() With this, you need to be careful about overflow though. On Tue, Apr 1, 2014 at 10:55 PM, Jaonary Rabarisoa wrote: > Hi all; > > Can someone give me some tips to compute mean of RDD by key , maybe with > combineByKey and StatCount. > > Cheers, > > Jaonary >
Re: Use combineByKey and StatCount
It seems you can imitate RDD.top()'s implementation. For each partition, you get the number of records and the total sum of the values, and in the final result handler, you add all the sums together and all the record counts together; then you can get the mean, I mean, the arithmetic mean. On Tue, Apr 1, 2014 at 10:55 AM, Jaonary Rabarisoa wrote: > Hi all; > > Can someone give me some tips to compute mean of RDD by key , maybe with > combineByKey and StatCount. > > Cheers, > > Jaonary > -- Dachuan Huang Cellphone: 614-390-7234 2015 Neil Avenue Ohio State University Columbus, Ohio U.S.A. 43210
Use combineByKey and StatCount
Hi all; Can someone give me some tips to compute mean of RDD by key , maybe with combineByKey and StatCount. Cheers, Jaonary
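One way to do this is a sketch along the following lines; it assumes "StatCount" refers to org.apache.spark.util.StatCounter, and the tiny data RDD is only illustrative:

import org.apache.spark.util.StatCounter

val data = sc.parallelize(Seq(("a", 1.0), ("a", 3.0), ("b", 5.0)))

val statsByKey = data.combineByKey(
  (v: Double) => StatCounter(v),                        // start a StatCounter from the first value
  (s: StatCounter, v: Double) => s.merge(v),            // fold in later values from the same partition
  (s1: StatCounter, s2: StatCounter) => s1.merge(s2))   // combine partial stats across partitions

val meanByKey = statsByKey.mapValues(_.mean)
meanByKey.collect().foreach(println)
// expected: (a,2.0) and (b,5.0); the StatCounter also carries count, sum, variance, etc.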