Re: How to return a pair RDD from an RDD that has foreachPartition applied?
I think you can use mapPartitions to return a pair RDD, and then call foreachPartition on that pair RDD to save it.

On Wed, Nov 18, 2015 at 9:31 AM swetha kasireddy wrote:
> Looks like I can use mapPartitions, but can it be done using foreachPartition?
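A minimal sketch of this suggestion, assuming the record type from the original post and Scala's JavaConverters. mapPartitions builds the (key, (timestamp, count)) pairs, the result is persisted so the save and any later use of the returned RDD share one computation, and foreachPartition does the save. `DataPointsSketch` and `savePartition` are hypothetical names; `savePartition` stands in for the poster's `DataLoaderImpl.saveDataPoints`.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import scala.collection.JavaConverters._

object DataPointsSketch {
  // Record type copied from the original post.
  type Rec = (String, (Long,
    java.util.LinkedHashMap[java.lang.Long, java.lang.Float],
    java.util.LinkedHashMap[java.lang.Long, java.lang.Float],
    java.util.HashSet[java.lang.String],
    Boolean))

  // Hypothetical sink standing in for DataLoaderImpl.saveDataPoints.
  def savePartition(points: Seq[(String, (Long, Float))]): Unit = ()

  def saveDataPointsBatchNew(records: RDD[Rec]): RDD[(String, (Long, Float))] = {
    // Transformation (lazy): turn each partition into key/value pairs,
    // keeping only records whose first field is 0, as in the original loop.
    val pairs = records.mapPartitions { partition =>
      partition
        .filter { case (_, v) => v._1 == 0L }
        .flatMap { case (key, v) =>
          v._3.asScala.iterator.map { case (ts, count) =>
            (key, (ts.longValue, count.floatValue))
          }
        }
    }
    // Persist so the save below and later actions on `pairs` share one computation.
    pairs.persist(StorageLevel.MEMORY_AND_DISK)
    // Action: runs the job and saves each partition as one batch.
    pairs.foreachPartition(it => savePartition(it.toVector))
    pairs
  }
}
```

Without the persist call, every later action on the returned RDD would re-run the mapPartitions step from `records`; call `unpersist()` on it once it is no longer needed.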
Re: How to return a pair RDD from an RDD that has foreachPartition applied?
Looks like I can use mapPartitions, but can it be done using foreachPartition?

On Tue, Nov 17, 2015 at 11:51 PM, swetha wrote:
> How do I return an RDD of key/value pairs from an RDD that has foreachPartition applied?
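Not directly: foreachPartition is an action whose return type is Unit, so it cannot hand back an RDD. If the aim is a single pass over the data, one alternative (a different technique from the mapPartitions-then-foreachPartition approach) is to do the save inside mapPartitions itself and pass the records through. The caveat is that transformations are lazy, so nothing is saved until an action runs on the returned RDD, and a partition may be saved more than once if it is recomputed (task retries, speculative execution, an evicted cache), so the sink should tolerate duplicates. `SaveInMapPartitions` and `saveBatch` are hypothetical names.

```scala
import org.apache.spark.rdd.RDD

object SaveInMapPartitions {
  // Hypothetical sink standing in for DataLoaderImpl.saveDataPoints.
  def saveBatch(batch: Seq[(String, (Long, Float))]): Unit = ()

  def saveAndReturn(pairs: RDD[(String, (Long, Float))]): RDD[(String, (Long, Float))] =
    pairs.mapPartitions({ it =>
      // Materialize the partition (holds it in memory) so it can be both saved and returned.
      val batch = it.toVector
      // Side effect inside a transformation: runs each time this partition is computed.
      saveBatch(batch)
      batch.iterator
    }, preservesPartitioning = true) // keys are unchanged, so the partitioner still applies
}
```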
How to return a pair RDD from an RDD that has foreachPartition applied?
Hi,

How do I return an RDD of key/value pairs from an RDD that has foreachPartition applied? My code looks something like the following. It looks like foreachPartition can only return Unit. How do I apply foreachPartition to do a save and, at the same time, return a pair RDD?

    import org.apache.spark.rdd.RDD

    def saveDataPointsBatchNew(records: RDD[(String, (Long,
        java.util.LinkedHashMap[java.lang.Long, java.lang.Float],
        java.util.LinkedHashMap[java.lang.Long, java.lang.Float],
        java.util.HashSet[java.lang.String], Boolean))]): Unit = {
      records.foreachPartition { partitionOfRecords =>
        val dataLoader = new DataLoaderImpl()
        val metricList = new java.util.ArrayList[String]()
        // Assumed type: the snippet used metricsDayCounts without declaring it.
        // It is declared per partition because a driver-side collection mutated
        // inside foreachPartition would not see these updates.
        val metricsDayCounts = scala.collection.mutable.ArrayBuffer[(String, (Long, Float))]()

        partitionOfRecords.foreach { record =>
          if (record._2._1 == 0L) {
            val itr = record._2._3.entrySet().iterator()
            while (itr.hasNext()) {
              val entry = itr.next()
              val storageTimeStamp = entry.getKey.toLong
              val dayCounts = entry.getValue
              metricsDayCounts += ((record._1, (storageTimeStamp, dayCounts.toFloat)))
            }
          }
        }

        // Code to insert the last successful batch/streaming timestamp ends
        dataLoader.saveDataPoints(metricList)
      }
    }

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-return-a-pair-RDD-from-an-RDD-that-has-foreachPartition-applied-tp25411.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
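The entry loop in the code above can also be written as a pure function from one record to its (key, (timestamp, count)) pairs. This is a sketch assuming the record layout from the post; `DayCounts` and `toDayCounts` are hypothetical names. Because it needs no Spark classes, it can be unit-tested on its own and called from either mapPartitions (e.g. `partition.flatMap(DayCounts.toDayCounts)`) or foreachPartition.

```scala
import scala.collection.JavaConverters._

object DayCounts {
  // Record layout copied from the original post.
  type Rec = (String, (Long,
    java.util.LinkedHashMap[java.lang.Long, java.lang.Float],
    java.util.LinkedHashMap[java.lang.Long, java.lang.Float],
    java.util.HashSet[java.lang.String],
    Boolean))

  // Mirrors the original while loop: only records whose first field is 0
  // contribute, and each entry of the second map becomes one pair, in the
  // map's insertion order.
  def toDayCounts(record: Rec): Iterator[(String, (Long, Float))] =
    if (record._2._1 != 0L) Iterator.empty
    else record._2._3.asScala.iterator.map { case (ts, count) =>
      (record._1, (ts.longValue, count.floatValue))
    }
}
```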