Re: broadcast hang out
It would be great to add a timeout. Do you mind submitting a pull request?

On Sun, Mar 15, 2015 at 10:41 PM, lonely Feb wrote:
> Anyone can help? Thanks a lot!
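For anyone picking this up, here is a rough sketch of what such a patch might look like, assuming a timeout parameter is threaded into fetchBlockSync. The parameter name and default below are illustrative only, not actual Spark API:

```scala
import java.nio.ByteBuffer
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Illustrative only: same body as the current fetchBlockSync, but with a
// configurable finite timeout instead of Duration.Inf. Parameter name and
// default are made up for this sketch.
def fetchBlockSync(host: String, port: Int, execId: String, blockId: String,
                   timeout: FiniteDuration = 120.seconds): ManagedBuffer = {
  val result = Promise[ManagedBuffer]()
  fetchBlocks(host, port, execId, Array(blockId),
    new BlockFetchingListener {
      override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit =
        result.failure(exception)
      override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
        val ret = ByteBuffer.allocate(data.size.toInt)
        ret.put(data.nioByteBuffer())
        ret.flip()
        result.success(new NioManagedBuffer(ret))
      }
    })
  // With a finite duration this throws java.util.concurrent.TimeoutException
  // instead of blocking the calling thread forever.
  Await.result(result.future, timeout)
}
```

A follow-up question for such a patch would be where the timeout value comes from (a hard-coded default versus a Spark configuration property).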
Re: broadcast hang out
Anyone can help? Thanks a lot!

2015-03-16 11:45 GMT+08:00 lonely Feb:
> yes
Re: broadcast hang out
Yes.

2015-03-16 11:43 GMT+08:00 Mridul Muralidharan:
> Cross region as in different data centers?
Re: broadcast hang out
Cross region as in different data centers?

- Mridul

On Sun, Mar 15, 2015 at 8:08 PM, lonely Feb wrote:
> Hi all, i meet up with a problem that torrent broadcast hang out in my
> spark cluster (1.2, standalone), particularly serious when driver and
> executors are cross-region.
Re: broadcast hang out
Thanks. But this method is in BlockTransferService.scala of Spark, which I cannot replace unless I rewrite the core code. I wonder if the timeout is already handled somewhere.

2015-03-16 11:27 GMT+08:00 Chester Chen:
> can you just replace "Duration.Inf" with a shorter duration?
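Short of patching the core, one caller-side workaround is to run the blocking call on a separate thread and bound the wait from outside. This is a sketch only, with an important caveat: on timeout the underlying fetch thread stays blocked until the fetch eventually completes or fails, so it bounds the caller's wait but does not free the worker thread.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Wrap any blocking call with an external timeout. The helper name is
// made up for illustration. On timeout the inner thread is leaked until
// the wrapped call returns on its own.
private val fetchPool =
  ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())

def withTimeout[T](timeout: FiniteDuration)(blockingCall: => T): T =
  Await.result(Future(blockingCall)(fetchPool), timeout)

// Usage, e.g.:
//   withTimeout(30.seconds) { blockTransferService.fetchBlockSync(host, port, execId, blockId) }
```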
Re: broadcast hang out
Can you just replace "Duration.Inf" with a shorter duration? How about:

    import scala.concurrent.duration._
    import akka.util.Timeout

    val timeout = new Timeout(10 seconds)
    Await.result(result.future, timeout.duration)

or:

    import java.util.concurrent.TimeUnit
    import scala.concurrent.duration.FiniteDuration

    val timeout = new FiniteDuration(10, TimeUnit.SECONDS)
    Await.result(result.future, timeout)

or simply:

    import scala.concurrent.duration._

    Await.result(result.future, 10 seconds)

On Sun, Mar 15, 2015 at 8:08 PM, lonely Feb wrote:
> Hi all, i meet up with a problem that torrent broadcast hang out in my
> spark cluster (1.2, standalone), particularly serious when driver and
> executors are cross-region.
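For what it's worth, any finite duration makes Await.result fail fast instead of hanging. A minimal standalone illustration, using a promise that (like a hung fetch) is never completed:

```scala
import scala.concurrent.{Await, Promise, TimeoutException}
import scala.concurrent.duration._

val result = Promise[Int]()   // never completed, simulating a hung fetch
try {
  Await.result(result.future, 2.seconds)
} catch {
  case _: TimeoutException =>
    println("gave up waiting after 2 seconds")
}
```

The caller then has to decide what to do on TimeoutException: fail the task, retry the fetch, or fall back to fetching from a different source.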
broadcast hang out
Hi all, I have run into a problem where torrent broadcast hangs in my Spark cluster (1.2, standalone), which is particularly serious when the driver and executors are cross-region. Reading the broadcast code, I found a synchronous block read here:

    def fetchBlockSync(host: String, port: Int, execId: String,
        blockId: String): ManagedBuffer = {
      // A monitor for the thread to wait on.
      val result = Promise[ManagedBuffer]()
      fetchBlocks(host, port, execId, Array(blockId),
        new BlockFetchingListener {
          override def onBlockFetchFailure(blockId: String,
              exception: Throwable): Unit = {
            result.failure(exception)
          }
          override def onBlockFetchSuccess(blockId: String,
              data: ManagedBuffer): Unit = {
            val ret = ByteBuffer.allocate(data.size.toInt)
            ret.put(data.nioByteBuffer())
            ret.flip()
            result.success(new NioManagedBuffer(ret))
          }
        })

      Await.result(result.future, Duration.Inf)
    }

It seems that fetchBlockSync does not have a timeout limit but waits forever. Can anybody show me how to control the timeout here?