1. You don't have to create another SparkContext (only one can be active per JVM). You can simply use *ssc.sparkContext*.
2. After the transformation on geoData, you could call persist() on it, so that later batches read it from memory instead of re-reading the file.

Thanks
Best Regards

On Thu, Nov 20, 2014 at 6:43 AM, YaoPau <[email protected]> wrote:

> Here is my attempt:
>
> val sparkConf = new SparkConf().setAppName("LogCounter")
> val ssc = new StreamingContext(sparkConf, Seconds(2))
>
> val sc = new SparkContext()
> val geoData = sc.textFile("data/geoRegion.csv")
>   .map(_.split(','))
>   .map(line => (line(0), (line(1),line(2),line(3),line(4))))
>
> val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
> val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
>
> val goodIPsFltrBI = lines.filter(...).map(...).filter(...) // details removed for brevity
> val vdpJoinedGeo = goodIPsFltrBI.transform(rdd =>rdd.join(geoData))
>
> This is very wrong. I have a feeling I should be broadcasting geoData
> instead of reading it in with each task (it's a 100MB file), but I'm not
> sure where to put the code that maps from the .csv to the final geoData
> rdd.
>
> Also I'm not sure if geoData is even defined correctly (maybe it should use
> ssc instead of sc?). Please advise.
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Joining-DStream-with-static-file-tp19329.html
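For reference, here is a rough sketch of how the two suggestions in the reply could fit together. It is only an illustration under some assumptions: the object name GeoJoinSketch and the helpers parseGeoLine, loadGeoData and joinWithGeo are made up for this example, the CSV is assumed to have at least five columns, and the stream is assumed to be already keyed by the same string as the first CSV column (the filter/map details from the original post are not reproduced).

```scala
import org.apache.spark.SparkContext._ // pair-RDD implicits, needed on Spark 1.x
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper object; none of these names come from the original post.
object GeoJoinSketch {
  type Geo = (String, String, String, String)

  // Turn one CSV row into (key, (four fields)). Assumes at least five columns.
  def parseGeoLine(line: String): (String, Geo) = {
    val f = line.split(',')
    (f(0), (f(1), f(2), f(3), f(4)))
  }

  // Suggestion 1: reuse the SparkContext owned by the StreamingContext.
  // Suggestion 2: persist the parsed RDD so later batches reuse it from memory.
  def loadGeoData(ssc: StreamingContext, path: String): RDD[(String, Geo)] =
    ssc.sparkContext
      .textFile(path)
      .map(parseGeoLine)
      .persist(StorageLevel.MEMORY_ONLY)

  // Join every micro-batch of the keyed stream against the static RDD.
  def joinWithGeo(keyed: DStream[(String, String)],
                  geoData: RDD[(String, Geo)]): DStream[(String, (String, Geo))] =
    keyed.transform(rdd => rdd.join(geoData))
}
```

With this in place, something like `val geoData = GeoJoinSketch.loadGeoData(ssc, "data/geoRegion.csv")` would replace both the extra `new SparkContext()` and the unpersisted `geoData` from the original attempt. Note that this still performs a shuffle join per batch; whether broadcasting a map of the file would be faster depends on the data and is not covered here.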
