Thanks for the reply!

In theory I should be able to do as you suggest, since I follow the pool design 
pattern from the documentation, but I don’t seem to be able to run any code 
after .stop() is called.

  override def main(args: Array[String]) {
    // setup
    val ssc = new StreamingContext(sparkConf, Seconds(streamTime))
    val inputStreams = (1 to numReceivers).map(i => ssc.receiverStream(<custom receiver>))
    val messages = ssc.union(inputStreams)

    messages.foreachRDD { rdd =>
      rdd.foreachPartition { p =>
        val indexer = Indexer.getInstance()

        p.foreach(Indexer.process(_) match {
          case Some(entry) => indexer.index(entry)
          case None =>
        })

        Indexer.returnInstance(indexer)
      }
    }

    messages.print()

    sys.ShutdownHookThread {
      logInfo("****************** Shutdown hook triggered ******************")
      ssc.stop(false, true)
      logInfo("****************** Shutdown finished ******************")
      ssc.stop(true)
    }

    ssc.start()
    ssc.awaitTermination()
  }

The first shutdown log message is always displayed, but the second one never 
is. I’ve tried multiple permutations of the stop function calls and even 
wrapped them in try/catch. I’m running in yarn-cluster mode using Spark 1.2 
on CDH 5.3. I stop the application with `yarn application -kill <appID>`.


From: Tathagata Das [mailto:t...@databricks.com]
Sent: Thursday, March 12, 2015 1:29 PM
To: Jose Fernandez
Cc: user@spark.apache.org
Subject: Re: Handling worker batch processing during driver shutdown

Can you access the batcher directly? That is, is there a handle that lets you 
reach the batchers on the executors by running a task on each executor? If 
so, after the StreamingContext has been stopped (not the SparkContext), you 
can use `sc.makeRDD()` to run a dummy task like this.

sc.makeRDD(1 to 1000, 1000).foreach { x =>
   Batcher.get().flush()
}

With a large number of tasks and no other jobs running in the system, at least 
one task will run in each executor and will therefore flush the batcher.
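
Putting the two steps together, a minimal sketch might look like the following. It assumes the Spark streaming library is on the classpath. `Batcher` is a hypothetical per-JVM singleton standing in for the real batcher, and `GracefulFlush.stopAndFlush` is an illustrative name, not part of Spark. The sketch relies on the scheduler spreading the dummy tasks across all executors, which is likely with many tasks and an otherwise idle cluster but is not something this code enforces.

```scala
import org.apache.spark.streaming.StreamingContext

// Hypothetical per-JVM singleton; a stand-in for the real batcher.
class Batcher {
  private val buffer = scala.collection.mutable.ArrayBuffer.empty[String]
  def add(entry: String): Unit = synchronized { buffer += entry }
  def pending: Int = synchronized { buffer.size }
  // The real batcher would send the buffered entries before clearing.
  def flush(): Unit = synchronized { buffer.clear() }
}

object Batcher {
  private val instance = new Batcher
  def get(): Batcher = instance
}

object GracefulFlush {
  // Stop streaming but keep the SparkContext, then run many tiny tasks
  // so that each executor JVM is likely to flush its own batcher.
  def stopAndFlush(ssc: StreamingContext, numTasks: Int = 1000): Unit = {
    val sc = ssc.sparkContext
    ssc.stop(stopSparkContext = false, stopGracefully = true)
    sc.makeRDD(1 to numTasks, numTasks).foreach { _ => Batcher.get().flush() }
    sc.stop()
  }
}
```

Because the closure only touches the `Batcher` companion object, nothing from the driver needs to be serialized; each task resolves the singleton inside the executor JVM it runs on.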

TD

On Thu, Mar 12, 2015 at 12:27 PM, Jose Fernandez <jfernan...@sdl.com> wrote:
Hi folks,

I have a shutdown hook in my driver which stops the streaming context cleanly. 
This is great as workers can finish their current processing unit before 
shutting down. Unfortunately each worker contains a batch processor which only 
flushes every X entries. We’re indexing to different indices in elasticsearch 
and using the bulk index request for performance. As far as Spark is concerned, 
once data is added to the batcher it is considered processed, so our workers 
are being shut down with data still in the batcher.
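
To make the gap concrete, here is a minimal sketch of a batcher that flushes only every X entries. `BulkBatcher`, `batchSize`, and `sink` are hypothetical names used for illustration; the real batcher and its Elasticsearch bulk call are not shown in this thread.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical batcher: buffers entries and hands them to `sink` in bulk
// once `batchSize` entries have accumulated.
class BulkBatcher[A](batchSize: Int, sink: Seq[A] => Unit) {
  require(batchSize > 0, "batchSize must be positive")
  private val buffer = ArrayBuffer.empty[A]

  // Buffer one entry; flush automatically when the threshold is reached.
  def add(entry: A): Unit = synchronized {
    buffer += entry
    if (buffer.size >= batchSize) flush()
  }

  // Send whatever is buffered, even if the batch is not full.
  def flush(): Unit = synchronized {
    if (buffer.nonEmpty) {
      sink(buffer.toList)
      buffer.clear()
    }
  }

  // Number of entries added but not yet sent.
  def pending: Int = synchronized { buffer.size }
}

object BulkBatcherDemo {
  def main(args: Array[String]): Unit = {
    val sent = ArrayBuffer.empty[Seq[Int]]
    val batcher = new BulkBatcher[Int](3, batch => sent += batch)
    (1 to 7).foreach(batcher.add)
    println(s"sent=${sent.size} pending=${batcher.pending}") // sent=2 pending=1
    batcher.flush()
    println(s"sent=${sent.size} pending=${batcher.pending}") // sent=3 pending=0
  }
}
```

The one pending entry after the seventh `add` is the kind of data that is lost if the worker exits before something calls `flush()` explicitly.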

Is there any way to coordinate the shutdown with the workers? I haven’t had any 
luck searching for a solution online. I would appreciate any suggestions you 
may have.

Thanks :)


SDL PLC confidential, all rights reserved. If you are not the intended 
recipient of this mail SDL requests and requires that you delete it without 
acting upon or copying any of its contents, and we further request that you 
advise us.

SDL PLC is a public limited company registered in England and Wales. Registered 
number: 02675207.
Registered address: Globe House, Clivemont Road, Maidenhead, Berkshire SL6 7DY, 
UK.



