Re: broadcast hang out

2015-03-16 Thread Reynold Xin
It would be great to add a timeout. Do you mind submitting a pull request?




Re: broadcast hang out

2015-03-15 Thread lonely Feb
Can anyone help? Thanks a lot!



Re: broadcast hang out

2015-03-15 Thread lonely Feb
Yes.

2015-03-16 11:43 GMT+08:00 Mridul Muralidharan:

> Cross-region as in different data centers?
>
> - Mridul


Re: broadcast hang out

2015-03-15 Thread Mridul Muralidharan
Cross-region as in different data centers?

- Mridul




Re: broadcast hang out

2015-03-15 Thread lonely Feb
Thanks. But this method is in Spark's BlockTransferService.scala, which I
cannot change without rewriting the core code. I wonder if this is already
handled somewhere.

2015-03-16 11:27 GMT+08:00 Chester Chen:

> Can you just replace "Duration.Inf" with a shorter duration? How about
>
>   import akka.util.Timeout  // assuming Timeout is Akka's
>   import scala.concurrent.duration._
>   val timeout = new Timeout(10.seconds)
>   Await.result(result.future, timeout.duration)
>
> or
>
>   import java.util.concurrent.TimeUnit
>   import scala.concurrent.duration.FiniteDuration
>   val timeout = new FiniteDuration(10, TimeUnit.SECONDS)
>   Await.result(result.future, timeout)
>
> or simply
>
>   import scala.concurrent.duration._
>   Await.result(result.future, 10.seconds)


Re: broadcast hang out

2015-03-15 Thread Chester Chen
Can you just replace "Duration.Inf" with a shorter duration? How about

  import akka.util.Timeout  // assuming Timeout is Akka's
  import scala.concurrent.duration._
  val timeout = new Timeout(10.seconds)
  Await.result(result.future, timeout.duration)

or

  import java.util.concurrent.TimeUnit
  import scala.concurrent.duration.FiniteDuration
  val timeout = new FiniteDuration(10, TimeUnit.SECONDS)
  Await.result(result.future, timeout)

or simply

  import scala.concurrent.duration._
  Await.result(result.future, 10.seconds)
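
For reference, a finite duration changes the failure mode rather than just
shortening the wait: when the time runs out, Await.result throws
java.util.concurrent.TimeoutException, so the caller has to be ready for it.
A minimal, self-contained sketch of that behavior follows. The object and
method names are made up for illustration and are not part of Spark.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Try}

// Hypothetical demo object, not Spark code.
object FiniteAwaitDemo {
  // Waits on a promise that nobody ever completes, mimicking a fetch whose
  // callbacks never fire. The outcome is returned as a Try instead of thrown.
  def awaitNeverCompleted(timeout: FiniteDuration): Try[String] = {
    val result = Promise[String]()
    Try(Await.result(result.future, timeout))
  }

  def main(args: Array[String]): Unit = {
    awaitNeverCompleted(200.millis) match {
      case Failure(_: TimeoutException) => println("timed out instead of hanging")
      case other                        => println(s"unexpected outcome: $other")
    }
  }
}
```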



On Sun, Mar 15, 2015 at 8:08 PM, lonely Feb  wrote:

> Hi all, i meet up with a problem that torrent broadcast hang out in my
> spark cluster (1.2, standalone) , particularly serious when driver and
> executors are cross-region. when i read the code of broadcast i found that
> a sync block read here:
>
>   def fetchBlockSync(host: String, port: Int, execId: String, blockId:
> String): ManagedBuffer = {
> // A monitor for the thread to wait on.
> val result = Promise[ManagedBuffer]()
> fetchBlocks(host, port, execId, Array(blockId),
>   new BlockFetchingListener {
> override def onBlockFetchFailure(blockId: String, exception:
> Throwable): Unit = {
>   result.failure(exception)
> }
> override def onBlockFetchSuccess(blockId: String, data:
> ManagedBuffer): Unit = {
>   val ret = ByteBuffer.allocate(data.size.toInt)
>   ret.put(data.nioByteBuffer())
>   ret.flip()
>   result.success(new NioManagedBuffer(ret))
> }
>   })
>
> Await.result(result.future, Duration.Inf)
>   }
>
> it seems that fetchBlockSync method does not have a timeout limit but wait
> forever ? Anybody can show me how to control the timeout here?
>


broadcast hang out

2015-03-15 Thread lonely Feb
Hi all, I have run into a problem where torrent broadcast hangs in my
Spark cluster (1.2, standalone). It is particularly serious when the driver
and executors are in different regions. Reading the broadcast code, I found
a synchronous block read here:

  def fetchBlockSync(host: String, port: Int, execId: String, blockId: String): ManagedBuffer = {
    // A monitor for the thread to wait on.
    val result = Promise[ManagedBuffer]()
    fetchBlocks(host, port, execId, Array(blockId),
      new BlockFetchingListener {
        override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
          result.failure(exception)
        }
        override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
          val ret = ByteBuffer.allocate(data.size.toInt)
          ret.put(data.nioByteBuffer())
          ret.flip()
          result.success(new NioManagedBuffer(ret))
        }
      })

    Await.result(result.future, Duration.Inf)
  }

It seems that fetchBlockSync has no timeout and waits forever. Can anybody
show me how to control the timeout here?
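
To make the question concrete, here is a rough sketch of the same
Promise-based bridge with a bounded wait. It is not Spark code: FetchListener
is a hypothetical stand-in for BlockFetchingListener, fetchAsync stands in
for fetchBlocks, plain strings replace ManagedBuffer, and the timeout
parameter is an assumption about what a fix might take.

```scala
import java.io.IOException
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Hypothetical stand-in for Spark's BlockFetchingListener, reduced to strings.
trait FetchListener {
  def onSuccess(blockId: String, data: String): Unit
  def onFailure(blockId: String, exception: Throwable): Unit
}

object TimedFetch {
  // Same structure as fetchBlockSync, but the blocking wait is bounded.
  // `fetchAsync` plays the role of fetchBlocks; `timeout` is an assumed parameter.
  def fetchSync(blockId: String, timeout: FiniteDuration)(
      fetchAsync: (String, FetchListener) => Unit): String = {
    val result = Promise[String]()
    fetchAsync(blockId, new FetchListener {
      // trySuccess/tryFailure do not throw if a callback fires more than once.
      override def onSuccess(id: String, data: String): Unit = result.trySuccess(data)
      override def onFailure(id: String, e: Throwable): Unit = result.tryFailure(e)
    })
    try Await.result(result.future, timeout)
    catch {
      // Surface the timeout with context instead of blocking forever.
      case e: TimeoutException =>
        throw new IOException(s"Fetching $blockId timed out after $timeout", e)
    }
  }

  def main(args: Array[String]): Unit = {
    // A fetch that answers immediately returns its data.
    println(fetchSync("block-1", 1.second)((id, l) => l.onSuccess(id, "payload")))
    // A fetch that never calls back now fails after the timeout.
    try fetchSync("block-2", 100.millis)((_, _) => ())
    catch { case e: IOException => println(e.getMessage) }
  }
}
```

Whether the caller should retry, fall back to another source, or fail the
task after such a timeout is a separate design decision that this sketch
leaves open.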