Hello Feynman,

Actually, in my case the vectors I am summarizing over will not have the same dimension, since many devices are inactive on some days. At best this is a sparse matrix, where we take only the active days for each device and attempt to fit a moving average over them.
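To make this concrete, and following your pointer to how aggregateByKey is used in RegressionMetrics, here is a rough sketch of the shape I have in mind: one summarizer per device, fed one active day at a time as a 1-element vector, so the varying number of active days can never cause a dimension mismatch. This is untested, the output path is just a placeholder, and deviceAggregateLogs is the parsed RDD from my code further down.

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer

// One MultivariateOnlineSummarizer per device_id; every add() sees a
// 1-dimensional vector, regardless of how many days the device was active.
val perDeviceStats = deviceAggregateLogs
  .map(dda => (dda.device_id, dda.bytes.toDouble))
  .aggregateByKey(new MultivariateOnlineSummarizer())(
    (summarizer, bytes) => summarizer.add(Vectors.dense(bytes)),
    (s1, s2) => s1.merge(s2)
  )

// Unpack the statistics into plain values and write one line per device.
perDeviceStats
  .map { case (deviceId, s) => deviceId + "," + s.mean(0) + "," + math.sqrt(s.variance(0)) }
  .saveAsTextFile("hdfs:///some/output/path")  // placeholder path

With this shape nothing depends on a fixed number of columns per device.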
The reason I would like to save it to HDFS is that there are really several million (almost a billion) devices for which this data needs to be written. I am writing only a few columns, but the number of rows is very large.

Given the above two cases, is using MultivariateOnlineSummarizer not a good idea then?

Anupam Bagchi

> On Jul 13, 2015, at 7:06 PM, Feynman Liang <fli...@databricks.com> wrote:
>
> Dimensions mismatch when adding new sample. Expecting 8 but got 14.
>
> Make sure all the vectors you are summarizing over have the same dimension.
>
> Why would you want to write a MultivariateOnlineSummarizer object (which can be represented with a couple of Doubles) into a distributed filesystem like HDFS?
>
> On Mon, Jul 13, 2015 at 6:54 PM, Anupam Bagchi <anupam_bag...@rocketmail.com> wrote:
> Thank you Feynman for the lead.
>
> I was able to modify the code using clues from the RegressionMetrics example. Here is what I have now:
>
> val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>
> // Calculate statistics based on bytes-transferred
> val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
> println(deviceIdsMap.collect().deep.mkString("\n"))
>
> val summary: MultivariateStatisticalSummary = {
>   val summary: MultivariateStatisticalSummary = deviceIdsMap.map {
>     case (deviceId, allaggregates) => Vectors.dense({
>       val sortedAggregates = allaggregates.toArray
>       Sorting.quickSort(sortedAggregates)
>       sortedAggregates.map(dda => dda.bytes.toDouble)
>     })
>   }.aggregate(new MultivariateOnlineSummarizer())(
>     (summary, v) => summary.add(v),   // Not sure if this is really what I want; it just came from the example
>     (sum1, sum2) => sum1.merge(sum2)  // Same doubt here as well
>   )
>   summary
> }
>
> It compiles fine, but I am now getting the following exception at runtime:
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 3.0 failed 1 times, most recent failure: Lost task 1.0 in stage 3.0 (TID 5, localhost): java.lang.IllegalArgumentException: requirement failed: Dimensions mismatch when adding new sample. Expecting 8 but got 14.
>         at scala.Predef$.require(Predef.scala:233)
>         at org.apache.spark.mllib.stat.MultivariateOnlineSummarizer.add(MultivariateOnlineSummarizer.scala:70)
>         at com.aeris.analytics.machinelearning.statistics.DailyDeviceStatisticsAnalyzer$$anonfun$4.apply(DailyDeviceStatisticsAnalyzer.scala:41)
>         at com.aeris.analytics.machinelearning.statistics.DailyDeviceStatisticsAnalyzer$$anonfun$4.apply(DailyDeviceStatisticsAnalyzer.scala:41)
>         at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
>         at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
>         at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1157)
>         at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:201)
>         at scala.collection.AbstractIterator.aggregate(Iterator.scala:1157)
>         at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:966)
>         at org.apache.spark.rdd.RDD$$anonfun$26.apply(RDD.scala:966)
>         at org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1533)
>         at org.apache.spark.SparkContext$$anonfun$32.apply(SparkContext.scala:1533)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>         at org.apache.spark.scheduler.Task.run(Task.scala:64)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:722)
>
> I can't tell where exactly I went wrong. Also, how do I take the MultivariateOnlineSummarizer object and write it to HDFS? I have the object with me, but I really need an RDD to call saveAsTextFile() on it.
>
> Anupam Bagchi
>
>> On Jul 13, 2015, at 4:52 PM, Feynman Liang <fli...@databricks.com> wrote:
>>
>> A good example is RegressionMetrics <https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/evaluation/RegressionMetrics.scala#L48>'s use of MultivariateOnlineSummarizer to aggregate statistics across labels and residuals; take a look at how aggregateByKey is used there.
>>
>> On Mon, Jul 13, 2015 at 4:50 PM, Anupam Bagchi <anupam_bag...@rocketmail.com> wrote:
>> Thank you Feynman for your response. Since I am very new to Scala I may need a bit more hand-holding at this stage.
>>
>> I have been able to incorporate your suggestion about sorting, and it now works perfectly. Thanks again for that.
>>
>> I tried to use your suggestion of using MultivariateOnlineSummarizer, but could not proceed further. For each deviceid (the key) my goal is to get a vector of doubles on which I can query the mean and standard deviation. Now, because RDDs are immutable, I cannot use a foreach loop to iterate through the groupBy results and individually add the values to an RDD - Spark does not allow that. I need to apply the RDD functions directly on the entire set to achieve the transformations I need. This is where I am faltering, since I am not used to the lambda expressions that Scala uses.
>>
>> object DeviceAnalyzer {
>>   def main(args: Array[String]) {
>>     val sparkConf = new SparkConf().setAppName("Device Analyzer")
>>     val sc = new SparkContext(sparkConf)
>>
>>     val logFile = args(0)
>>
>>     val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>>
>>     // Calculate statistics based on bytes
>>     val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>>     // Question: Can we not write the line above as deviceAggregateLogs.groupBy(_.device_id).sortBy(c => c._2, true)? Anything wrong?
>>     // All I need to do below is collect the vector of bytes for each device and store it in the RDD.
>>     // The problem with the 'foreach' approach below is that it generates the vector values one at a time,
>>     // which I cannot add individually to an immutable RDD.
>>     deviceIdsMap.foreach(a => {
>>       val device_id = a._1     // This is the device ID
>>       val allaggregates = a._2 // This is an array of all device-aggregates for this device
>>
>>       val sortedaggregates = allaggregates.toArray
>>       Sorting.quickSort(sortedaggregates)
>>
>>       val byteValues = sortedaggregates.map(dda => dda.bytes.toDouble).toArray
>>       val count = byteValues.count(A => true)
>>       val sum = byteValues.sum
>>       val xbar = sum / count
>>       val sum_x_minus_x_bar_square = byteValues.map(x => (x - xbar) * (x - xbar)).sum
>>       val stddev = math.sqrt(sum_x_minus_x_bar_square / count)
>>
>>       val vector: Vector = Vectors.dense(byteValues)
>>       println(vector)
>>       println(device_id + "," + xbar + "," + stddev)
>>     })
>>     //val vector: Vector = Vectors.dense(byteValues)
>>     //println(vector)
>>     //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
>>
>>     sc.stop()
>>   }
>> }
>>
>> Can you show me how to write the 'foreach' loop in a Spark-friendly way? Thanks a lot for your help.
>>
>> Anupam Bagchi
>>
>>> On Jul 13, 2015, at 12:21 PM, Feynman Liang <fli...@databricks.com> wrote:
>>>
>>> The call to Sorting.quickSort is not working. Perhaps I am calling it the wrong way.
>>> allaggregates.toArray allocates and creates a new array, separate from allaggregates, and that new array is what Sorting.quickSort sorts; allaggregates itself is left unchanged. Try:
>>>   val sortedAggregates = allaggregates.toArray
>>>   Sorting.quickSort(sortedAggregates)
>>> I would like to use the Spark mllib class MultivariateStatisticalSummary to calculate the statistical values.
>>> MultivariateStatisticalSummary is a trait (similar to a Java interface); you probably want to use MultivariateOnlineSummarizer.
>>> For that I would need to keep all my intermediate values as RDD so that I can directly use the RDD methods to do the job.
>>> Correct; you would do an aggregate using the add and merge functions provided by MultivariateOnlineSummarizer.
>>> At the end I also need to write the results to HDFS for which there is a method provided on the RDD class to do so, which is another reason I would like to retain everything as RDD.
>>> You can write the RDD[(device_id, MultivariateOnlineSummarizer)] to HDFS, or you could unpack the relevant statistics from MultivariateOnlineSummarizer into an array/tuple using a mapValues first and then write.
>>>
>>> On Mon, Jul 13, 2015 at 10:07 AM, Anupam Bagchi <anupam_bag...@rocketmail.com> wrote:
>>> I have to do the following tasks on a dataset, using Apache Spark with Scala as the programming language:
>>> Read the dataset from HDFS.
>>> A few sample lines look like this:
>>>
>>> deviceid,bytes,eventdate
>>> 15590657,246620,20150630
>>> 14066921,1907,20150621
>>> 14066921,1906,20150626
>>> 6522013,2349,20150626
>>> 6522013,2525,20150613
>>>
>>> Group the data by device id. Thus we now have a map of deviceid => (bytes, eventdate).
>>> For each device, sort the set by eventdate. We now have an ordered set of bytes, based on eventdate, for each device.
>>> Pick the last 30 days of bytes from this ordered set.
>>> Find the moving average of bytes for the last date using a time period of 30.
>>> Find the standard deviation of the bytes for the final date using a time period of 30.
>>> Return two values in the result: (mean - k*stddev) and (mean + k*stddev). [Assume k = 3]
>>>
>>> I am using Apache Spark 1.3.0. The actual dataset is wider, and it ultimately has to run on a billion rows.
>>>
>>> Here is the data structure for the dataset:
>>>
>>> package com.testing
>>> case class DailyDeviceAggregates (
>>>   device_id: Integer,
>>>   bytes: Long,
>>>   eventdate: Integer
>>> ) extends Ordered[DailyDeviceAggregates] {
>>>   def compare(that: DailyDeviceAggregates): Int = {
>>>     eventdate - that.eventdate
>>>   }
>>> }
>>>
>>> object DailyDeviceAggregates {
>>>   def parseLogLine(logline: String): DailyDeviceAggregates = {
>>>     val c = logline.split(",")
>>>     DailyDeviceAggregates(c(0).toInt, c(1).toLong, c(2).toInt)
>>>   }
>>> }
>>>
>>> The DeviceAnalyzer class looks like this. I have a very crude implementation that does the job, but it is not up to the mark. Sorry, I am very new to Scala/Spark, so my questions are quite basic. Here is what I have now:
>>>
>>> import com.testing.DailyDeviceAggregates
>>> import org.apache.spark.{SparkContext, SparkConf}
>>> import org.apache.spark.mllib.linalg.Vector
>>> import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
>>> import org.apache.spark.mllib.linalg.{Vector, Vectors}
>>>
>>> import scala.util.Sorting
>>>
>>> object DeviceAnalyzer {
>>>   def main(args: Array[String]) {
>>>     val sparkConf = new SparkConf().setAppName("Device Analyzer")
>>>     val sc = new SparkContext(sparkConf)
>>>
>>>     val logFile = args(0)
>>>
>>>     val deviceAggregateLogs = sc.textFile(logFile).map(DailyDeviceAggregates.parseLogLine).cache()
>>>
>>>     // Calculate statistics based on bytes
>>>     val deviceIdsMap = deviceAggregateLogs.groupBy(_.device_id)
>>>
>>>     deviceIdsMap.foreach(a => {
>>>       val device_id = a._1     // This is the device ID
>>>       val allaggregates = a._2 // This is an array of all device-aggregates for this device
>>>
>>>       println(allaggregates)
>>>       Sorting.quickSort(allaggregates.toArray) // Sort the CompactBuffer of DailyDeviceAggregates based on eventdate
>>>       println(allaggregates)                   // This does not work - results are not sorted!!
>>>
>>>       val byteValues = allaggregates.map(dda => dda.bytes.toDouble).toArray
>>>       val count = byteValues.count(A => true)
>>>       val sum = byteValues.sum
>>>       val xbar = sum / count
>>>       val sum_x_minus_x_bar_square = byteValues.map(x => (x - xbar) * (x - xbar)).sum
>>>       val stddev = math.sqrt(sum_x_minus_x_bar_square / count)
>>>
>>>       val vector: Vector = Vectors.dense(byteValues)
>>>       println(vector)
>>>       println(device_id + "," + xbar + "," + stddev)
>>>
>>>       //val vector: Vector = Vectors.dense(byteValues)
>>>       //println(vector)
>>>       //val summary: MultivariateStatisticalSummary = Statistics.colStats(vector)
>>>     })
>>>
>>>     sc.stop()
>>>   }
>>> }
>>>
>>> I would really appreciate it if someone could suggest improvements for the following:
>>> The call to Sorting.quickSort is not working. Perhaps I am calling it the wrong way.
>>> I would like to use the Spark mllib class MultivariateStatisticalSummary to calculate the statistical values.
>>> For that I would need to keep all my intermediate values as RDDs so that I can directly use the RDD methods to do the job.
>>> At the end I also need to write the results to HDFS, for which there is a method provided on the RDD class, which is another reason I would like to retain everything as RDDs.
>>>
>>> Thanks in advance for your help.
>>>
>>> Anupam Bagchi
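P.S. To make sure the goal is clear, here is the rough end-to-end shape of what the original question above asks for: per device, order the records by eventdate, keep the last 30 (the last 30 active days), compute the mean and standard deviation of bytes over that window, and emit (mean - 3*stddev) and (mean + 3*stddev). This is only a sketch of intent, not tested code; the object name DeviceBandAnalyzer and the args(1) output path are placeholders, and groupBy may well be the wrong tool at a billion rows (the aggregateByKey shape near the top of this mail avoids materializing each group).

import com.testing.DailyDeviceAggregates
import org.apache.spark.{SparkConf, SparkContext}

object DeviceBandAnalyzer {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("Device Band Analyzer"))
    val k = 3.0  // number of standard deviations, per the requirement above

    val deviceAggregateLogs = sc.textFile(args(0)).map(DailyDeviceAggregates.parseLogLine)

    // Per device: sort by eventdate (compare() orders on eventdate), keep the
    // last 30 records, and compute mean and stddev of bytes over that window.
    val bands = deviceAggregateLogs
      .groupBy(_.device_id)
      .mapValues { aggregates =>
        val last30 = aggregates.toSeq.sorted.takeRight(30).map(_.bytes.toDouble)
        val mean = last30.sum / last30.size
        val stddev = math.sqrt(last30.map(x => (x - mean) * (x - mean)).sum / last30.size)
        (mean - k * stddev, mean + k * stddev)
      }

    // One output line per device: deviceid,lowerBand,upperBand
    bands.map { case (id, (lo, hi)) => id + "," + lo + "," + hi }
      .saveAsTextFile(args(1))  // placeholder HDFS output directory

    sc.stop()
  }
}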