FYI, the same happens with accumulators when recovering from a checkpoint. I'd love to see this fixed somehow, as the workaround (using a singleton factory in foreachRDD to make sure the accumulators are initialized instead of null) is really intrusive...
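For context, the singleton-factory workaround mentioned above can be sketched roughly as below. This is a minimal illustration, not code from the thread: the names `LazyHolder` and `getOrCreate` are my own, and the value is a plain `Array[Int]` here so the pattern is self-contained. The idea is that because the holder lives in a plain Scala `object` (one per JVM), the value is re-created lazily on first use after checkpoint recovery instead of being deserialized as null.

```scala
// A lazily initialized per-JVM singleton (illustrative names, not a Spark API).
// After recovering a StreamingContext from a checkpoint, a value held this way
// is rebuilt on first access rather than restored as null.
object LazyHolder {
  @volatile private var instance: Array[Int] = _

  // Double-checked locking: the factory runs at most once per JVM.
  def getOrCreate(create: () => Array[Int]): Array[Int] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = create()
        }
      }
    }
    instance
  }
}
```

In a streaming job the holder would be typed as `Broadcast[...]` (or an `Accumulator`) and the factory called inside `transform` or `foreachRDD`, e.g. `LazyHolder.getOrCreate(() => rdd.sparkContext.broadcast(Array(1, 2, 3)))`, so the variable is rebuilt from the live SparkContext rather than pulled out of the checkpoint.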
Sent from my iPhone

On 05 Oct 2015, at 22:52, Tathagata Das <t...@databricks.com> wrote:

Make sure the broadcast variable works independent of the streaming application. Then make sure it works without StreamingContext.getOrCreate(). That will disambiguate whether that error is thrown when starting a new context, or when recovering a context from a checkpoint (as getOrCreate is supposed to do).

On Mon, Oct 5, 2015 at 9:23 AM, dpristin <dpris...@gmail.com> wrote:

Hi,

Can anyone point out what I'm doing wrong? I've implemented a very basic Spark Streaming app that uses a single broadcast variable. When it runs locally it produces the proper output (the array I broadcast), but when deployed on the cluster I get "broadcastVar is null". We use v1.4.1. Here is the code:

    // imports go here
    object BroadcastTest extends App {
      val logger = LoggerFactory.getLogger("OinkSparkMain")
      logger.info("OinkSparkMain - Setup Logger")

      // This is our custom context setup code; nothing fancy goes on here
      val config = Configuration(args)
      val ssc: StreamingContext = StreamingContext.getOrCreate(config.checkpointDirectory, () => {
        SparkStreamingContextFactory.Create(config, timeWindow = Seconds(10))
      })

      val kafkaStreamFactory = new KafkaStreamFactory(config, ssc)
      val messages = kafkaStreamFactory.Create

      // Grab the value of the Kafka input dstream as a string
      val events = messages.map(s => s._2)

      // Create a broadcast variable - straight from the dev guide
      val broadcastVar = ssc.sparkContext.broadcast(Array(1, 2, 3))

      // Try to print out the value of the broadcast var here
      val transformed = events.transform(rdd => {
        rdd.map(x => {
          if (broadcastVar == null) {
            println("broadcastVar is null")
          } else {
            println("broadcastVar value: " + broadcastVar.value.mkString("|"))
          }
          x
        })
      })

      transformed.foreachRDD(x => logger.info("Data: " + x.collect.mkString("|")))

      ssc.start()
      ssc.awaitTermination()
    }

Any input is very much appreciated!

Regards,
Dmitry.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-var-is-null-tp24927.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.