Hi guys, thanks a lot for responding so quickly! I've reduced the code to the version below - no streaming, no Kafka, no checkpoint. Unfortunately the end result is the same. Any suggestion as to where I'm messing up would be very much appreciated!
import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

object BroadcastTest extends App {
  val logger = LoggerFactory.getLogger("OinkSparkMain")
  logger.info("OinkSparkMain - Setup Logger")

  val sparkConf = new SparkConf().setAppName("OinkSparkMain")
  val sc: SparkContext = new SparkContext(sparkConf)

  val rdd = sc.parallelize(Array(1, 2, 3))

  val arr = Array(1, 2, 3)
  val broadcastVar = sc.broadcast(arr)

  val mappedEvents = rdd.map(e => {
    val l = LoggerFactory.getLogger("OinkSparkMain1")
    if (broadcastVar == null) {
      l.info("broadcastVar is null")
      (e, "empty")
    } else {
      val str = broadcastVar.value.mkString(" | ")
      l.info("broadcastVar is " + str)
      (e, str)
    }
  })

  logger.info("****** Total reduced count: " + mappedEvents.collect().length)
}

On Mon, Oct 5, 2015 at 4:14 PM, Adrian Tanase <atan...@adobe.com> wrote:

> FYI the same happens with accumulators when recovering from a checkpoint.
> I'd love to see this fixed somehow, as the workaround (using a singleton
> factory in foreachRDD to make sure the accumulators are initialized
> instead of null) is really intrusive...
>
> Sent from my iPhone
>
> On 05 Oct 2015, at 22:52, Tathagata Das <t...@databricks.com> wrote:
>
> Make sure the broadcast variable works independently of the streaming
> application. Then make sure it works without StreamingContext.getOrCreate().
> That will disambiguate whether the error is thrown when starting a new
> context or when recovering a context from a checkpoint (as getOrCreate is
> supposed to do).
>
> On Mon, Oct 5, 2015 at 9:23 AM, dpristin <dpris...@gmail.com> wrote:
>
>> Hi,
>>
>> Can anyone point out what I'm doing wrong? I've implemented a very basic
>> Spark Streaming app that uses a single broadcast variable. When it runs
>> locally it produces the proper output (the array I broadcast), but when
>> deployed on the cluster I get "broadcastVar is null". We use v1.4.1.
>> Here is the code:
>>
>> --- imports go here
>>
>> object BroadcastTest extends App {
>>   val logger = LoggerFactory.getLogger("OinkSparkMain")
>>   logger.info("OinkSparkMain - Setup Logger")
>>
>>   // This is our custom context setup code; nothing fancy goes on here
>>   val config = Configuration(args)
>>   val ssc: StreamingContext =
>>     StreamingContext.getOrCreate(config.checkpointDirectory, () => {
>>       SparkStreamingContextFactory.Create(config, timeWindow = Seconds(10))
>>     })
>>
>>   val kafkaStreamFactory = new KafkaStreamFactory(config, ssc)
>>   val messages = kafkaStreamFactory.Create
>>
>>   // Grab the value part of the Kafka input dstream as a string
>>   val events = messages.map(s => s._2)
>>
>>   // Create a broadcast variable - straight from the dev guide
>>   val broadcastVar = ssc.sparkContext.broadcast(Array(1, 2, 3))
>>
>>   // Try to print out the value of the broadcast var here
>>   val transformed = events.transform(rdd => {
>>     rdd.map(x => {
>>       if (broadcastVar == null) {
>>         println("broadcastVar is null")
>>       } else {
>>         println("broadcastVar value: " + broadcastVar.value.mkString("|"))
>>       }
>>       x
>>     })
>>   })
>>
>>   transformed.foreachRDD(x => logger.info("Data: " + x.collect.mkString("|")))
>>
>>   ssc.start()
>>   ssc.awaitTermination()
>> }
>>
>> Any input is very much appreciated!
>>
>> Regards,
>> Dmitry.
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-var-is-null-tp24927.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
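P.S. For completeness, here is a rough, untested sketch of the singleton-factory workaround Adrian describes, adapted to the Array(1, 2, 3) example above (the BroadcastFactory object and its names are mine). The idea is to look the broadcast up lazily inside foreachRDD instead of capturing it when the DStream graph is defined, so a context recovered from a checkpoint re-creates it rather than deserializing it as null:

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

// Holds the broadcast outside the checkpointed DStream graph and
// (re)creates it lazily on the driver, so a recovered context gets a
// fresh instance instead of a null read back from the checkpoint.
object BroadcastFactory {
  @volatile private var instance: Broadcast[Array[Int]] = _

  def getInstance(sc: SparkContext): Broadcast[Array[Int]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.broadcast(Array(1, 2, 3))
        }
      }
    }
    instance
  }
}

// Usage: resolve the broadcast per batch on the driver instead of
// referencing broadcastVar directly in the transform/map closures.
transformed.foreachRDD { rdd =>
  val broadcastVar = BroadcastFactory.getInstance(rdd.sparkContext)
  logger.info("broadcastVar value: " + broadcastVar.value.mkString("|"))
}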