FYI, the same thing happens with accumulators when recovering from a checkpoint. I'd 
love to see this fixed somehow, because the workaround (using a singleton factory in 
foreachRDD to make sure the accumulators are initialized instead of null) is 
really intrusive. A rough sketch of that workaround is below.
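For reference, this is roughly what I mean (a sketch only, against the 1.x accumulator 
API; the object and accumulator names here are just illustrative):

import org.apache.spark.{Accumulator, SparkContext}

// Lazily instantiated singleton, so the accumulator is (re)created on the
// driver after the context is recovered from checkpoint instead of being
// deserialized back as null.
object DroppedEventsCounter {
  @volatile private var instance: Accumulator[Long] = null

  def getInstance(sc: SparkContext): Accumulator[Long] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.accumulator(0L, "DroppedEventsCounter")
        }
      }
    }
    instance
  }
}

// and then, per batch, inside foreachRDD:
//   stream.foreachRDD { rdd =>
//     val dropped = DroppedEventsCounter.getInstance(rdd.sparkContext)
//     ...
//   }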


On 05 Oct 2015, at 22:52, Tathagata Das <t...@databricks.com> wrote:

Make sure the broadcast variable works independent of the streaming 
application. Then make sure it works without 
StreamingContext.getOrCreate(). That will disambiguate whether the error is 
thrown when starting a new context, or when recovering a context from a 
checkpoint (which is what getOrCreate is supposed to do).

On Mon, Oct 5, 2015 at 9:23 AM, dpristin <dpris...@gmail.com> wrote:
Hi,

Can anyone point out what I'm doing wrong? I've implemented a very
basic Spark Streaming app that uses a single broadcast variable. When it
runs locally it produces the proper output (the array I broadcast), but when
deployed on the cluster I get "broadcastVar is null". We are on Spark 1.4.1. Here
is the code:

--- imports go here

object BroadcastTest extends App {
  val logger = LoggerFactory.getLogger("OinkSparkMain")
  logger.info("OinkSparkMain - Setup Logger")

  // This is our custom context setup code; nothing fancy goes on here
  val config = Configuration(args)
  val ssc: StreamingContext =
    StreamingContext.getOrCreate(config.checkpointDirectory, () => {
      SparkStreamingContextFactory.Create(config, timeWindow = Seconds(10))
    })


  val kafkaStreamFactory = new KafkaStreamFactory(config, ssc)
  val messages = kafkaStreamFactory.Create

  // Grab the value part of the Kafka input DStream as a string
  val events = messages.map( s => s._2 )

  //Create a broadcast variable - straight from the dev guide
  val broadcastVar = ssc.sparkContext.broadcast(Array(1, 2, 3))

  //Try to print out the value of the broadcast var here
  val transformed = events.transform(rdd => {
    rdd.map(x => {
      if(broadcastVar == null) {
        println("broadcastVar is null")
      }  else {
        println("broadcastVar value: " + broadcastVar.value.mkString("|"))
      }
      x
    })
  })

  transformed.foreachRDD(x =>
    logger.info("Data: " + x.collect.mkString("|")))

  ssc.start()
  ssc.awaitTermination()
}

Any input is very much appreciated!

Regards,
Dmitry.






