Hi James,

The code below shows one way to update the broadcast variable on the
executors:

    import java.util.Date
    import scala.collection.immutable.HashMap

    // ... events stream setup

    var startTime = new Date().getTime()

    var hashMap = HashMap("1" -> ("1", 1), "2" -> ("2", 2))

    var hashMapBroadcast = eventStream.context.sparkContext.broadcast(hashMap)

    val TWO_MINUTES = 120000

    // eventStream is a DStream

    eventStream.foreachRDD(rdd => {

      // Executed on the driver, not the executors
      if (new Date().getTime() - startTime > TWO_MINUTES) {

        // remove the old broadcast variable
        hashMapBroadcast.unpersist()

        // create a new one with the updated values
        hashMapBroadcast = eventStream.context.sparkContext.broadcast(
          HashMap("1" -> ("1", 1000), "2" -> ("2", 2000)))

        // reset the timer so we only re-broadcast once per interval
        startTime = new Date().getTime()
      }
    })

    val broadcastValuesFromStream = eventStream.map(activity =>
      hashMapBroadcast.value("1"))

    // prints ("1", 1) at first and ("1", 1000) once the broadcast has been
    // updated after two minutes
    broadcastValuesFromStream.print()
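
Applied to your filter use case, the same pattern works: poll your service
inside foreachRDD on the driver and re-broadcast the filter when it has
changed. The sketch below is only illustrative -- fetchFilterRules() is a
hypothetical stand-in for whatever call you make to your service, and it
assumes the events are plain strings:

    // Sketch only: fetchFilterRules() is a hypothetical stand-in for
    // polling your service, e.g. an HTTP call returning the current rules
    def fetchFilterRules(): Set[String] = ???

    var rulesBroadcast = eventStream.context.sparkContext.broadcast(fetchFilterRules())
    var lastRefresh = System.currentTimeMillis()
    val REFRESH_INTERVAL = 120000

    eventStream.foreachRDD { rdd =>
      // Driver side: refresh the broadcast filter when it has gone stale
      if (System.currentTimeMillis() - lastRefresh > REFRESH_INTERVAL) {
        rulesBroadcast.unpersist()
        rulesBroadcast = rdd.sparkContext.broadcast(fetchFilterRules())
        lastRefresh = System.currentTimeMillis()
      }

      // Executor side: the task closure serialised for this batch carries
      // the current broadcast handle, so the filter sees the latest rules
      val filtered = rdd.filter(event => rulesBroadcast.value.contains(event))
      filtered.foreach(event => println(event)) // placeholder for your real output
    }

The broadcast data is only shipped to the executors again when a new
broadcast variable is created, so the extra cost is limited to the batches
where the filter actually changes.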


Regards,

Conor




On Fri, Jul 3, 2015 at 4:24 PM, Raghavendra Pandey <raghavendra.pan...@gmail.com> wrote:

> You cannot update the broadcast variable. It won't get reflected on the
> workers.
> On Jul 3, 2015 12:18 PM, "James Cole" <ja...@binarism.net> wrote:
>
>> Hi all,
>>
>> I'm filtering a DStream using a function. I need to be able to change
>> this function while the application is running (I'm polling a service to
>> see if a user has changed their filtering). The filter function is a
>> transformation and runs on the workers, so that's where the updates need to
>> go. I'm not sure of the best way to do this.
>>
>> Initially broadcasting seemed like the way to go: the filter is actually
>> quite large. But I don't think I can update something I've broadcasted.
>> I've tried unpersisting and re-creating the broadcast variable but it
>> became obvious this wasn't updating the reference on the worker. So am I
>> correct in thinking I can't use broadcasted variables for this purpose?
>>
>> The next option seems to be: stopping the JavaStreamingContext, creating
>> a new one from the SparkContext, updating the filter function, and
>> re-creating the DStreams (I'm using direct streams from Kafka).
>>
>> If I re-created the JavaStreamingContext would the accumulators (which
>> are created from the SparkContext) keep working? (Obviously I'm going to
>> try this soon)
>>
>> In summary:
>>
>> 1) Can broadcasted variables be updated?
>>
>> 2) Is there a better way than re-creating the JavaStreamingContext and
>> DStreams?
>>
>> Thanks,
>>
>> James
>>
>>
