Re: Streaming: updating broadcast variables

2015-07-06 Thread Conor Fennell
Hi James,

The code below shows one way to update the broadcast variable on the
executors:

import java.util.Date
import scala.collection.immutable.HashMap

// ... events stream setup

var startTime = new Date().getTime()

var hashMap = HashMap(1 -> (1, 1), 2 -> (2, 2))

var hashMapBroadcast = stream.context.sparkContext.broadcast(hashMap)

val TWO_MINUTES = 2 * 60 * 1000  // two minutes in milliseconds

// eventStream is a DStream
eventStream.foreachRDD(rdd => {
  // Executed on the driver, not the executors
  if (new Date().getTime() - startTime > TWO_MINUTES) {
    // remove the old broadcast variable from the executors
    hashMapBroadcast.unpersist()
    // create a new one with the updated values
    hashMapBroadcast = stream.context.sparkContext.broadcast(
      HashMap(1 -> (1, 1000), 2 -> (2, 2000)))
    // reset the timer so we only re-broadcast once per interval
    startTime = new Date().getTime()
  }
})

val broadcastValuesFromStream = activitiesByVisitKey.map(activity =>
  hashMapBroadcast.value(1))

// should print (1, 1000) after 2 minutes, once the broadcast is updated
broadcastValuesFromStream.print()
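
Applied to your filtering use case, the same pattern would look roughly like
the sketch below; fetchFilter() is just a placeholder for whatever call polls
your service for the user's current filter, and event.key and ssc are made
up too:

// Sketch only: periodically re-broadcast a filter polled from a service.
// fetchFilter(), event.key and ssc are placeholders.
var filterBroadcast = ssc.sparkContext.broadcast(fetchFilter())

eventStream.foreachRDD { _ =>
  // runs on the driver once per batch: refresh the broadcast if needed
  val latest = fetchFilter()
  if (latest != filterBroadcast.value) {
    filterBroadcast.unpersist()
    filterBroadcast = ssc.sparkContext.broadcast(latest)
  }
}

// runs on the workers: each batch's tasks are serialized with whichever
// broadcast handle the driver-side var currently points to
val filtered = eventStream.filter(event => filterBroadcast.value.contains(event.key))

As far as I can tell this works because the filter closure captures the var,
so each batch picks up the current broadcast handle rather than the one that
existed when the DStream was set up.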


Regards,

Conor





Re: Streaming: updating broadcast variables

2015-07-03 Thread Raghavendra Pandey
You cannot update a broadcast variable; the change won't get reflected on
the workers.




Streaming: updating broadcast variables

2015-07-03 Thread James Cole
Hi all,

I'm filtering a DStream using a function. I need to be able to change this
function while the application is running (I'm polling a service to see if
a user has changed their filtering). The filter function is a
transformation and runs on the workers, so that's where the updates need to
go. I'm not sure of the best way to do this.

Initially broadcasting seemed like the way to go: the filter is actually
quite large. But I don't think I can update something I've broadcast.
I've tried unpersisting and re-creating the broadcast variable, but it
became obvious this wasn't updating the reference on the workers. So am I
correct in thinking I can't use broadcast variables for this purpose?

The next option seems to be: stopping the JavaStreamingContext, creating a
new one from the SparkContext, updating the filter function, and
re-creating the DStreams (I'm using direct streams from Kafka).
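
Concretely, I mean something like this (a sketch in Scala for brevity; the
batch interval, kafkaParams, topic set and newFilterFunction are all made up):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// stop only the streaming context, keeping the SparkContext alive
ssc.stop(stopSparkContext = false)

// build a new streaming context from the same SparkContext
val newSsc = new StreamingContext(sc, Seconds(10))

// re-create the direct Kafka stream and apply the updated filter
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  newSsc, kafkaParams, Set("events"))
stream.filter { case (_, value) => newFilterFunction(value) }.print()

newSsc.start()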

If I re-created the JavaStreamingContext would the accumulators (which are
created from the SparkContext) keep working? (Obviously I'm going to try
this soon)
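
For example, something like this sketch: an accumulator created from the
SparkContext before the restart, then used in the re-created stream:

// created once, from the SparkContext that survives the restart
val eventCounter = sc.accumulator(0L, "events seen")

// used inside the DStream built on the new streaming context
stream.foreachRDD { rdd =>
  rdd.foreach(_ => eventCounter += 1L)
}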

In summary:

1) Can broadcast variables be updated?

2) Is there a better way than re-creating the JavaStreamingContext and
DStreams?

Thanks,

James