Hi James,

The code below shows one way to update the broadcast variable on the executors:
// ... events stream setup
import java.util.Date
import scala.collection.immutable.HashMap

var startTime = new Date().getTime()
val hashMap = HashMap("1" -> ("1", 1), "2" -> ("2", 2))
var hashMapBroadcast = eventStream.context.sparkContext.broadcast(hashMap)
val TWO_MINUTES = 120000 // milliseconds

// eventStream is a DStream
eventStream.foreachRDD(rdd => {
  // Executed on the driver, not the executors
  if (new Date().getTime() - startTime > TWO_MINUTES) {
    // remove the old broadcast variable from the executors
    hashMapBroadcast.unpersist()
    // create a new one in its place
    hashMapBroadcast = eventStream.context.sparkContext.broadcast(
      HashMap("1" -> ("1", 1000), "2" -> ("2", 2000)))
    // reset the timer so we don't re-broadcast on every subsequent batch
    startTime = new Date().getTime()
  }
})

val broadcastValuesFromStream = activitiesByVisitKey.map(activity => hashMapBroadcast.value("1"))
// should print ("1", 1000) after two minutes, once the broadcast has been updated
broadcastValuesFromStream.print()

Regards,
Conor

On Fri, Jul 3, 2015 at 4:24 PM, Raghavendra Pandey <raghavendra.pan...@gmail.com> wrote:

> You cannot update the broadcast variable; it won't get reflected on the
> workers.
> On Jul 3, 2015 12:18 PM, "James Cole" <ja...@binarism.net> wrote:
>
>> Hi all,
>>
>> I'm filtering a DStream using a function. I need to be able to change
>> this function while the application is running (I'm polling a service to
>> see if a user has changed their filtering). The filter function is a
>> transformation and runs on the workers, so that's where the updates need
>> to go. I'm not sure of the best way to do this.
>>
>> Initially, broadcasting seemed like the way to go: the filter is actually
>> quite large. But I don't think I can update something I've broadcast.
>> I've tried unpersisting and re-creating the broadcast variable, but it
>> became obvious this wasn't updating the reference on the workers. So am I
>> correct in thinking I can't use broadcast variables for this purpose?
>>
>> The next option seems to be: stop the JavaStreamingContext, create a new
>> one from the SparkContext, update the filter function, and re-create the
>> DStreams (I'm using direct streams from Kafka).
>>
>> If I re-created the JavaStreamingContext, would the accumulators (which
>> are created from the SparkContext) keep working? (Obviously I'm going to
>> try this soon.)
>>
>> In summary:
>>
>> 1) Can broadcast variables be updated?
>>
>> 2) Is there a better way than re-creating the JavaStreamingContext and
>> DStreams?
>>
>> Thanks,
>>
>> James
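
P.S. If you end up using this pattern, the refresh logic can be pulled into a
small driver-side helper so it is reusable across jobs. This is an untested
sketch; RefreshableBroadcast and the load callback are names I made up for
illustration, not Spark API:

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import scala.reflect.ClassTag

// Untested sketch: wraps the unpersist-and-rebroadcast pattern above.
// `load` stands in for whatever call polls your service for the latest value.
class RefreshableBroadcast[T: ClassTag](sc: SparkContext, load: () => T, intervalMs: Long) {
  private var lastRefresh = System.currentTimeMillis()
  private var bc: Broadcast[T] = sc.broadcast(load())

  // Call from inside foreachRDD, i.e. on the driver, before processing a batch
  def refreshIfStale(): Unit = {
    val now = System.currentTimeMillis()
    if (now - lastRefresh > intervalMs) {
      bc.unpersist()            // drop the stale copies cached on the executors
      bc = sc.broadcast(load()) // re-broadcast the fresh value
      lastRefresh = now
    }
  }

  // Hand out the Broadcast handle itself; capture this (not the helper) in
  // RDD closures, since the helper holds a non-serializable SparkContext
  def handle: Broadcast[T] = bc
}

Inside foreachRDD you would call refreshIfStale() first, then copy handle into
a local val before referencing it in any RDD closure; otherwise Spark will try
to serialize the whole helper (including the SparkContext) and fail.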