Re: Streaming: updating broadcast variables
Hi James,

The code below shows one way you can update the broadcast variable on the executors:

// ... events stream setup
var startTime = new Date().getTime()
var hashMap = HashMap(1 -> (1, 1), 2 -> (2, 2))
var hashMapBroadcast = stream.context.sparkContext.broadcast(hashMap)
val TWO_MINUTES = 120000

// eventStream is a DStream
eventStream.foreachRDD(rdd => {
  // Executed on the driver, not the executors
  if (new Date().getTime() - startTime > TWO_MINUTES) {
    // remove the old broadcast variable
    hashMapBroadcast.unpersist()
    // create a new one
    hashMapBroadcast = stream.context.sparkContext.broadcast(
      HashMap(1 -> (1, 1000), 2 -> (2, 2000)))
    // reset the timer so we don't re-broadcast on every subsequent batch
    startTime = new Date().getTime()
  }
})

val broadcastValuesFromStream = activitiesByVisitKey.map(activity => hashMapBroadcast.value(1))
// should print (1, 1000) after 2 minutes, once updated
broadcastValuesFromStream.print()

Regards,
Conor

On Fri, Jul 3, 2015 at 4:24 PM, Raghavendra Pandey raghavendra.pan...@gmail.com wrote:
> You cannot update the broadcasted variable. It won't get reflected on workers.
> [quoted text trimmed]
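The rebroadcast-on-a-timer logic above can also be factored into a small driver-side helper, so the refresh check isn't written inline in every foreachRDD. This is only a sketch under assumptions: RefreshableBroadcast, load, and refreshIntervalMs are illustrative names, not Spark API, and it assumes get() is called on the driver at the start of each batch so closures capture the current Broadcast handle:

```scala
import java.util.Date
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.StreamingContext

// Hypothetical driver-side helper; re-broadcasts a value at most once
// per refresh interval. Not part of Spark's API.
class RefreshableBroadcast[T](ssc: StreamingContext,
                              refreshIntervalMs: Long,
                              load: () => T) {
  private var lastRefresh = new Date().getTime
  private var bc: Broadcast[T] = ssc.sparkContext.broadcast(load())

  // Call on the driver (e.g. at the top of foreachRDD); returns the
  // current Broadcast handle, re-broadcasting first if it has gone stale.
  def get(): Broadcast[T] = synchronized {
    val now = new Date().getTime
    if (now - lastRefresh >= refreshIntervalMs) {
      bc.unpersist()
      bc = ssc.sparkContext.broadcast(load())
      lastRefresh = now
    }
    bc
  }
}

// Usage inside the stream, capturing the handle per batch:
// eventStream.foreachRDD { rdd =>
//   val current = refreshable.get()              // driver side
//   rdd.filter(e => current.value.contains(e)).count()  // executors read .value
// }
```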
Re: Streaming: updating broadcast variables
You cannot update the broadcasted variable. It won't get reflected on workers.

On Jul 3, 2015 12:18 PM, James Cole ja...@binarism.net wrote:
> [quoted text trimmed]
Streaming: updating broadcast variables
Hi all,

I'm filtering a DStream using a function. I need to be able to change this function while the application is running (I'm polling a service to see if a user has changed their filtering). The filter function is a transformation and runs on the workers, so that's where the updates need to go. I'm not sure of the best way to do this.

Initially broadcasting seemed like the way to go: the filter is actually quite large. But I don't think I can update something I've broadcasted. I've tried unpersisting and re-creating the broadcast variable, but it became obvious this wasn't updating the reference on the worker. So am I correct in thinking I can't use broadcasted variables for this purpose?

The next option seems to be: stopping the JavaStreamingContext, creating a new one from the SparkContext, updating the filter function, and re-creating the DStreams (I'm using direct streams from Kafka). If I re-created the JavaStreamingContext, would the accumulators (which are created from the SparkContext) keep working? (Obviously I'm going to try this soon.)

In summary:
1) Can broadcasted variables be updated?
2) Is there a better way than re-creating the JavaStreamingContext and DStreams?

Thanks,
James
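For James's second option, Spark's StreamingContext.stop accepts a stopSparkContext flag, so the streaming context can be torn down while the underlying SparkContext (and anything created from it, such as accumulators) stays alive. A minimal sketch, assuming a buildStreams function that re-creates the Kafka direct streams with the updated filter (that name and wiring are illustrative, not Spark API):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Rebuild the streaming layer on top of a long-lived SparkContext.
// `buildStreams` is a hypothetical callback that wires up the Kafka
// direct streams using the new filter function.
def restartStreaming(sc: SparkContext,
                     batchInterval: Long,
                     buildStreams: StreamingContext => Unit): StreamingContext = {
  val ssc = new StreamingContext(sc, Seconds(batchInterval))
  buildStreams(ssc)   // re-create DStreams with the updated filter
  ssc.start()
  ssc
}

// Elsewhere, when the polled service reports a filter change:
// oldSsc.stop(stopSparkContext = false)  // SparkContext and accumulators survive
// val newSsc = restartStreaming(sc, 10, buildStreams)
```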