1. You don't have to create another SparkContext; you can simply use the one
returned by *ssc.sparkContext*.

2. Maybe, after the transformations on geoData, you could call persist() so
that the next time it is needed, it will be read from memory.
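A minimal sketch of both suggestions applied to the code quoted below. It assumes the Spark 1.x receiver-based Kafka API (KafkaUtils.createStream from spark-streaming-kafka) and that zkQuorum, group, topics and numThreads are passed on the command line. The parseGeoLine helper and the keying step that stands in for the elided filter/map chain are hypothetical, so adjust them to the real log format.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._               // pair-RDD implicits (join) on Spark < 1.3
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._ // pair-DStream implicits on Spark < 1.3
import org.apache.spark.streaming.kafka.KafkaUtils

object LogCounter {
  // Hypothetical helper: turns one CSV row into (key, (f1, f2, f3, f4)),
  // matching the five-column layout used in the original code.
  def parseGeoLine(line: String): (String, (String, String, String, String)) = {
    val f = line.split(',')
    (f(0), (f(1), f(2), f(3), f(4)))
  }

  def main(args: Array[String]): Unit = {
    // Assumption: Kafka settings arrive as command-line arguments.
    val Array(zkQuorum, group, topics, numThreads) = args

    val sparkConf = new SparkConf().setAppName("LogCounter")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Point 1: reuse the SparkContext owned by the StreamingContext
    // instead of constructing a second one.
    val sc = ssc.sparkContext

    // Point 2: build the static lookup RDD once and persist it, so the
    // CSV is read from disk only the first time a batch joins against it.
    val geoData = sc.textFile("data/geoRegion.csv")
      .map(line => parseGeoLine(line))
      .persist(StorageLevel.MEMORY_ONLY)

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    // Hypothetical stand-in for the elided filter/map chain: keys each log
    // line by its first comma-separated field so it can join with geoData.
    val keyed = lines.map(line => (line.split(',')(0), line))

    // transform's function runs on the driver once per batch; every batch
    // joins against the same persisted geoData RDD.
    val joined = keyed.transform(rdd => rdd.join(geoData))
    joined.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Because persist() is lazy, nothing is cached until the first batch's join actually evaluates geoData; later batches then read it from memory instead of re-reading the 100MB file.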

Thanks
Best Regards

On Thu, Nov 20, 2014 at 6:43 AM, YaoPau wrote:

> Here is my attempt:
>
> val sparkConf = new SparkConf().setAppName("LogCounter")
> val ssc =  new StreamingContext(sparkConf, Seconds(2))
>
> val sc = new SparkContext()
> val geoData = sc.textFile("data/geoRegion.csv")
>             .map(_.split(','))
>             .map(line => (line(0), (line(1),line(2),line(3),line(4))))
>
> val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
> val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
>
> val goodIPsFltrBI = lines.filter(...).map(...).filter(...) // details removed for brevity
> val vdpJoinedGeo = goodIPsFltrBI.transform(rdd =>rdd.join(geoData))
>
> This is very wrong.  I have a feeling I should be broadcasting geoData
> instead of reading it in with each task (it's a 100MB file), but I'm not
> sure where to put the code that maps from the .csv to the final geoData
> rdd.
>
> Also I'm not sure if geoData is even defined correctly (maybe it should use
> ssc instead of sc?).  Please advise.
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Joining-DStream-with-static-file-tp19329.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.