If the number of items is very large, have you considered using
probabilistic counting? The HyperLogLogPlus
<https://github.com/addthis/stream-lib/blob/master/src/main/java/com/clearspring/analytics/stream/cardinality/HyperLogLogPlus.java>
class from stream-lib <https://github.com/addthis/stream-lib> might be
suitable.
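As a rough sketch of the idea (untested; assumes the stream-lib jar is on
your classpath, and the parameters are illustrative): each partition can
build its own sketch, and sketches merge cheaply without ever holding the
full set of values.

```scala
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus

// Two sketches built independently, e.g. on two different partitions.
// Precision p = 14 gives roughly 0.8% relative error in ~16 KB per sketch.
val a = new HyperLogLogPlus(14)
val b = new HyperLogLogPlus(14)
(1L to 50000L).foreach(a.offer)
(25001L to 75000L).foreach(b.offer)

a.addAll(b)               // merging is cheap; no raw values are retained
println(a.cardinality())  // approximately 75000 (the true distinct count)
```

In Spark you could keep one such sketch per key and combine them with
reduceByKey; note also that PairRDDFunctions has a built-in
countApproxDistinctByKey, which uses HyperLogLog internally.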

On Tue, Jun 30, 2015 at 2:29 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) <deepuj...@gmail.com> wrote:

> I have an RDD of type (String,
>  
> Iterable[(com.ebay.ep.poc.spark.reporting.process.detail.model.DetailInputRecord,
> com.ebay.ep.poc.spark.reporting.process.model.DataRecord)])
>
> Here String is the key and the Iterable is the list of tuples for that key.
> I got the above RDD after doing a groupByKey. I later want to compute the
> total number of values for a given key and the total number of unique values
> for the same key, and hence I do this:
>
>     val totalViCount = details.size.toLong
>     val uniqueViCount =
> details.map(_._1.get("itemId").asInstanceOf[Long]).distinct.size.toLong
>
> How do I do this using reduceByKey?
>
> *Total Code:*
>
>       val groupedDetail: RDD[(String, Iterable[(DetailInputRecord,
> DataRecord)])] = detailInputsToGroup.map {
>         case (detailInput, dataRecord) =>
>           val key: StringBuilder = new StringBuilder
>           dimensions.foreach {
>             dimension =>
>               key ++= {
>
> Option(dataRecord.get(dimension)).getOrElse(Option(detailInput.get(dimension)).getOrElse("")).toString
>               }
>           }
>           (key.toString, (detailInput, dataRecord))
>       }.groupByKey
>
>       groupedDetail.map {
>         case (key, values) => {
>           val valueList = values.toList
>
>           // Compute dimensions (you can skip this)
>           val (detailInput, dataRecord) = valueList.head
>           val schema = SchemaUtil.outputSchema(_detail)
>           val detailOutput = new DetailOutputRecord(detail, new
> SessionRecord(schema))
>           DataUtil.populateDimensions(schema, dimensions.toArray,
> detailInput, dataRecord, detailOutput)
>
>
>           val metricsData = metricProviders.flatMap {
>             case (className, instance) =>
>               val data = instance.getMetrics(valueList)
>               ReflectionUtil.getData(data,
> _metricProviderMemberNames(className))
>           }
>           metricsData.foreach { case (k, v) => detailOutput.put(k, v) }
>           val wrap = new AvroKey[DetailOutputRecord](detailOutput)
>           (wrap, NullWritable.get)
>         }
>       }
>
>
> //getMetrics:
>   def getMetrics(details: List[(DetailInputRecord, DataRecord)]) = {
>     val totalViCount = details.size.toLong
>     val uniqueViCount =
> details.map(_._1.get("itemId").asInstanceOf[Long]).distinct.size.toLong
>     new ViewItemCountMetric(totalViCount, uniqueViCount)
>   }
>
>
> I understand that totalViCount can be implemented using reduceByKey. How
> can I implement the total unique count, since I need the full list to know
> the unique values?
>
> --
> Deepak
>
>

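If the per-key distinct sets are small enough to hold in memory, an exact
alternative to groupByKey is to fold each value into a (count, Set)
accumulator; in Spark that is aggregateByKey. A minimal sketch of the same
shape on a plain Seq (so it runs standalone; the data and key names are
made up for illustration):

```scala
// Accumulator: (total count, set of distinct itemIds) per key.
type Acc = (Long, Set[Long])
val zero: Acc = (0L, Set.empty[Long])
def seqOp(acc: Acc, itemId: Long): Acc = (acc._1 + 1L, acc._2 + itemId)
def combOp(x: Acc, y: Acc): Acc = (x._1 + y._1, x._2 ++ y._2)

val data = Seq(("k1", 1L), ("k1", 1L), ("k1", 2L), ("k2", 3L))
val perKey: Map[String, (Long, Long)] =
  data.groupBy(_._1).map { case (k, vs) =>
    // combOp would merge partial accumulators across partitions in Spark
    val (total, uniques) = vs.map(_._2).foldLeft(zero)(seqOp)
    k -> (total, uniques.size.toLong)
  }
// perKey("k1") == (3L, 2L): 3 values, 2 distinct itemIds
```

With an RDD[(String, Long)] of (key, itemId) pairs this becomes
pairs.aggregateByKey(zero)(seqOp, combOp).mapValues { case (t, s) =>
(t, s.size.toLong) }, giving the same result as groupByKey + distinct
without materializing the full value list per key.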