Hi guys,
thanks a lot for responding so quickly!

I've reduced the app to the code below - no streaming, no Kafka, no
checkpoint. Unfortunately the end result is the same. Any suggestion as to
where I'm messing up would be very much appreciated!

import org.apache.spark.{SparkConf, SparkContext}
import org.slf4j.LoggerFactory

object BroadcastTest extends App {
  val logger = LoggerFactory.getLogger("OinkSparkMain")
  logger.info("OinkSparkMain - Setup Logger")

  val sparkConf = new SparkConf().setAppName("OinkSparkMain")
  val sc: SparkContext = new SparkContext(sparkConf)

  val rdd = sc.parallelize(Array(1, 2, 3))

  val arr = Array(1, 2, 3)
  val broadcastVar = sc.broadcast(arr)

  val mappedEvents = rdd.map(e => {
    val l = LoggerFactory.getLogger("OinkSparkMain1")

    if (broadcastVar == null) {
      l.info("broadcastVar is null")
      (e, "empty")
    }
    else {
      val str = broadcastVar.value.mkString(" | ")
      l.info("broadcastVar is " + str)
      (e, str)
    }
  })

  logger.info("****** Total reduced count: " + mappedEvents.collect().length)
}
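
One thing I'm still wondering about: I've seen mentions (e.g. SPARK-4170)
that objects extending App can misbehave on a cluster, because App's delayed
initialization can leave the object's fields null when a closure is
deserialized on an executor. In case that's a factor here, this is the same
test restructured with an explicit main method - same logic, just no
DelayedInit:

object BroadcastTestMain {
  def main(args: Array[String]): Unit = {
    val logger = LoggerFactory.getLogger("OinkSparkMain")

    val sc = new SparkContext(new SparkConf().setAppName("OinkSparkMain"))

    // broadcastVar is a local of main, so the map closure captures the
    // Broadcast handle itself rather than a field of a DelayedInit object
    val broadcastVar = sc.broadcast(Array(1, 2, 3))

    val mappedEvents = sc.parallelize(Array(1, 2, 3)).map { e =>
      if (broadcastVar == null) (e, "empty")
      else (e, broadcastVar.value.mkString(" | "))
    }

    logger.info("****** Total mapped count: " + mappedEvents.collect().length)
  }
}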


On Mon, Oct 5, 2015 at 4:14 PM, Adrian Tanase <atan...@adobe.com> wrote:

> FYI the same happens with accumulators when recovering from checkpoint.
> I'd love to see this fixed somehow as the workaround (using a singleton
> factory in foreachRDD to make sure the accumulators are initialized instead
> of null) is really intrusive...
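>
> Roughly, what I mean (a hand-wavy sketch, the helper names are invented):
>
> object AccumulatorHolder {
>   @volatile private var instance: Accumulator[Long] = null
>
>   // Lazily (re)create the accumulator on the driver; after a checkpoint
>   // recovery the field starts out null, so we build it on first use.
>   def getInstance(sc: SparkContext): Accumulator[Long] = {
>     if (instance == null) {
>       synchronized {
>         if (instance == null) {
>           instance = sc.accumulator(0L, "eventCounter")
>         }
>       }
>     }
>     instance
>   }
> }
>
> // in foreachRDD, re-resolve instead of capturing a possibly-null field:
> dstream.foreachRDD { rdd =>
>   val acc = AccumulatorHolder.getInstance(rdd.sparkContext)
>   acc += rdd.count()
> }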
>
> Sent from my iPhone
>
> On 05 Oct 2015, at 22:52, Tathagata Das <t...@databricks.com> wrote:
>
> Make sure the broadcast variable works independent of the streaming
> application. Then make sure it works without
> StreamingContext.getOrCreate(). That will disambiguate whether the error
> occurs when starting a new context or when recovering a context from a
> checkpoint (which is what getOrCreate is supposed to do).
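>
> Something like this for the second check (a sketch - just call your factory
> directly so there is no recovery path involved):
>
> val ssc = SparkStreamingContextFactory.Create(config, timeWindow = Seconds(10))
> // instead of:
> // val ssc = StreamingContext.getOrCreate(config.checkpointDirectory, () => { ... })
>
> If the broadcast value prints fine here but comes back null once getOrCreate
> restores the context from a checkpoint, the problem is in recovery rather
> than in broadcasting itself.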
>
> On Mon, Oct 5, 2015 at 9:23 AM, dpristin <dpris...@gmail.com> wrote:
>
>> Hi,
>>
>> Can anyone point out what I'm doing wrong? I've implemented a very
>> basic Spark Streaming app that uses a single broadcast variable. When it
>> runs locally it produces the proper output (the array I broadcast), but
>> when deployed on the cluster I get "broadcastVar is null". We use Spark
>> 1.4.1. Here is the code:
>>
>> --- imports go here
>>
>> object BroadcastTest extends App {
>>   val logger = LoggerFactory.getLogger("OinkSparkMain")
>>   logger.info("OinkSparkMain - Setup Logger")
>>
>>   // This is our custom context setup code; nothing fancy goes on here
>>   val config = Configuration(args)
>>   val ssc: StreamingContext =
>>     StreamingContext.getOrCreate(config.checkpointDirectory, () => {
>>       SparkStreamingContextFactory.Create(config, timeWindow = Seconds(10))
>>     })
>>
>>
>>   val kafkaStreamFactory = new KafkaStreamFactory(config, ssc)
>>   val messages = kafkaStreamFactory.Create
>>
>>   // Grab the value out of the Kafka input DStream as a string
>>   val events = messages.map(s => s._2)
>>
>>   // Create a broadcast variable - straight from the dev guide
>>   val broadcastVar = ssc.sparkContext.broadcast(Array(1, 2, 3))
>>
>>   // Try to print out the value of the broadcast var here
>>   val transformed = events.transform(rdd => {
>>     rdd.map(x => {
>>       if (broadcastVar == null) {
>>         println("broadcastVar is null")
>>       } else {
>>         println("broadcastVar value: " + broadcastVar.value.mkString("|"))
>>       }
>>       x
>>     })
>>   })
>>
>>   transformed.foreachRDD(x => logger.info("Data: " + x.collect.mkString("|")))
>>
>>   ssc.start()
>>   ssc.awaitTermination()
>> }
>>
>> Any input is very much appreciated!
>>
>> Regards,
>> Dmitry.
>>
