Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/9530#discussion_r44352760
--- Diff: core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
---
@@ -302,6 +325,105 @@ private[netty] class NettyRpcEnv(
}
}
+ override def fileServer: RpcEnvFileServer = streamManager
+
+ override def openChannel(uri: String): ReadableByteChannel = {
+ val parsedUri = new URI(uri)
+ require(parsedUri.getHost() != null, "Host name must be defined.")
+ require(parsedUri.getPort() > 0, "Port must be defined.")
+ require(parsedUri.getPath() != null && parsedUri.getPath().nonEmpty,
"Path must be defined.")
+
+ val pipe = Pipe.open()
+ val source = new FileDownloadChannel(pipe.source())
+ try {
+ val callback = new FileDownloadCallback(pipe.sink(), source)
+ val client = fileDownloadClient(parsedUri.getHost(),
parsedUri.getPort())
+ client.stream(parsedUri.getPath(), callback)
+ } catch {
+ case e: Exception =>
+ pipe.sink().close()
+ source.close()
+ throw e
+ }
+
+ source
+ }
+
+ private def fileDownloadClient(host: String, port: Int): TransportClient
= synchronized {
+ if (stopped.get()) {
+ throw new IllegalStateException("RpcEnv already stopped.")
+ }
+
+ val address = RpcAddress(host, port)
+ fileClients.get(address).filter(_.isActive()).getOrElse {
+ // Create a new client and install a handler that will respond to
IdleStateEvent. The events
+ // are generated by the IdleStateHandler installed by the
TransportContext when creating
+ // clients, and the timeout value is controlled by the transport
configuration.
+ val c = clientFactory.createUnmanagedClient(host, port)
+ c.getChannel().pipeline().addLast("rpcEnvTimeoutHandler", new
TimeoutHandler(c))
+ fileClients.put(address, c)
+ c
+ }
+ }
+
+ private class TimeoutHandler(client: TransportClient) extends
ChannelInboundHandlerAdapter {
+
+ override def userEventTriggered(ctx: ChannelHandlerContext, evt:
Object): Unit = {
--- End diff --
So, this is a little bit racy, since the timeout might trigger when another
thread is preparing to download a file. I'll fix this and update the PR.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]