Github user caikehe commented on the pull request:
https://github.com/apache/spark/pull/5629#issuecomment-95145475
package orange.fr.scalaTest
import com.twitter.algebird._
import com.twitter.algebird.CMSHasherImplicits._
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter._
// scalastyle:off
/**
 * Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute
 * windowed and global Top-K estimates of user IDs occurring in a Twitter stream.
 * <br>
 *   <strong>Note</strong> that since Algebird's implementation currently only supports Long inputs,
 *   the example operates on Long IDs. Once the implementation supports other inputs (such as String),
 *   the same approach could be used for computing popular topics for example.
 * <p>
 * <p>
 *   <a href=
 *   "http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
 *   This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a data
 *   structure for approximate frequency estimation in data streams (e.g. Top-K elements, frequency
 *   of any given element, etc), that uses space sub-linear in the number of elements in the
 *   stream. Once elements are added to the CMS, the estimated count of an element can be computed,
 *   as well as "heavy-hitters" that occur more than a threshold percentage of the overall total
 *   count.
 * <p><p>
 *   Algebird's implementation is a monoid, so we can succinctly merge two CMS instances in the
 *   reduce operation.
 */
// scalastyle:on
object TwitterAlgebirdCMS {

  /**
   * Returns the `k` entries with the largest counts, highest count first.
   *
   * @param counts (user id, count) pairs; the count may be any ordered type
   * @param k      maximum number of entries to return
   * @return at most `k` pairs sorted by descending count
   */
  private def topByCount[V: Ordering](counts: Iterable[(Long, V)], k: Int): Seq[(Long, V)] =
    counts.toSeq.sortBy(_._2).reverse.take(k)

  /**
   * Starts a streaming job that prints, for every 10 second batch, the approximate
   * (Count-Min Sketch) and the exact heavy-hitter user IDs, both for the current batch
   * and accumulated over all batches seen so far.
   *
   * @param args the first four entries are the Twitter OAuth consumer key, consumer secret,
   *             access token and access token secret; any remaining entries are passed to
   *             `TwitterUtils.createStream` as filters
   */
  def main(args: Array[String]): Unit = {
    // Fail with a readable message rather than a MatchError on the destructuring below.
    if (args.length < 4) {
      System.err.println("Usage: TwitterAlgebirdCMS <consumer key> <consumer secret> " +
        "<access token> <access token secret> [<filters>]")
      System.exit(1)
    }

    StreamingExamplesLogging.setStreamingLogLevels()

    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
    val filters = args.drop(4)

    // Set the system properties so that the Twitter4j library used by the twitter stream
    // can use them to generate OAuth credentials
    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

    // CMS parameters
    val DELTA = 1E-3
    val EPS = 0.01
    val SEED = 1
    // Heavy-hitter threshold, passed to the CMS monoid as a fraction (0.001 == 0.1%)
    val PERC = 0.001
    // K highest frequency elements to take
    val TOPK = 10

    val sparkConf = new SparkConf().setAppName("TwitterAlgebirdCMS").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER_2)

    val users = stream.map(status => status.getUser.getId)

    // Previously: new CountMinSketchMonoid(EPS, DELTA, SEED, PERC)
    val cms = TopPctCMS.monoid[Long](EPS, DELTA, SEED, PERC)
    // Running totals across batches; updated inside the foreachRDD callbacks below.
    var globalCMS = cms.zero
    val mm = new MapMonoid[Long, Int]()
    var globalExact = Map[Long, Int]()

    // One sketch per user id, merged into a single sketch per batch.
    val approxTopUsers = users
      .mapPartitions(ids => ids.map(id => cms.create(id)))
      .reduce(_ ++ _)

    val exactTopUsers = users
      .map(id => (id, 1))
      .reduceByKey((a, b) => a + b)

    approxTopUsers.foreachRDD { rdd =>
      // take(1) both tests for emptiness and fetches the batch sketch in a single job
      // (the original ran count() and then first()).
      rdd.take(1).headOption.foreach { partial =>
        val partialTopK = topByCount(
          partial.heavyHitters.map(id => (id, partial.frequency(id).estimate)), TOPK)
        globalCMS ++= partial
        val globalTopK = topByCount(
          globalCMS.heavyHitters.map(id => (id, globalCMS.frequency(id).estimate)), TOPK)
        // PERC is a fraction, so scale it by 100 before printing it as a percentage.
        println("Approx heavy hitters at %2.2f%% threshold this batch: %s".format(PERC * 100,
          partialTopK.mkString("[", ",", "]")))
        println("Approx heavy hitters at %2.2f%% threshold overall: %s".format(PERC * 100,
          globalTopK.mkString("[", ",", "]")))
      }
    }

    exactTopUsers.foreachRDD { rdd =>
      // The batch counts are collected to the driver anyway, so rank them locally instead of
      // launching extra count() and sortByKey() jobs.
      val partialMap = rdd.collect().toMap
      if (partialMap.nonEmpty) {
        // Kept as (count, id) pairs to match the original output format for this line.
        val partialTopK = topByCount(partialMap, TOPK).map { case (id, count) => (count, id) }
        globalExact = mm.plus(globalExact, partialMap)
        val globalTopK = topByCount(globalExact, TOPK)
        println("Exact heavy hitters this batch: %s".format(partialTopK.mkString("[", ",", "]")))
        println("Exact heavy hitters overall: %s".format(globalTopK.mkString("[", ",", "]")))
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
I tested this code on my local machine and it works. To test it yourself, you
need to supply your own Twitter authentication credentials.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]