yaooqinn commented on a change in pull request #28751:
URL: https://github.com/apache/spark/pull/28751#discussion_r436649612
##########
File path:
sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala
##########
@@ -74,18 +86,31 @@ trait SharedThriftServer extends SharedSparkSession {
// Set the HIVE_SERVER2_THRIFT_PORT to 0, so it could randomly pick any
free port to use.
// It's much more robust than set a random port generated by ourselves
ahead
sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, "0")
- hiveServer2 = HiveThriftServer2.startWithContext(sqlContext)
- hiveServer2.getServices.asScala.foreach {
- case t: ThriftCLIService if t.getPortNumber != 0 =>
- serverPort = t.getPortNumber
- logInfo(s"Started HiveThriftServer2: port=$serverPort,
attempt=$attempt")
- case _ =>
- }
+ // Set the HIVE_SERVER2_THRIFT_HTTP_PORT to 0, so it could randomly pick
any free port to use.
+ // It's much more robust than set a random port generated by ourselves
ahead
+ sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT.varname, "0")
+ sqlContext.setConf(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname,
mode.toString)
+
+ try {
+ hiveServer2 = HiveThriftServer2.startWithContext(sqlContext)
+ hiveServer2.getServices.asScala.foreach {
+ case t: ThriftCLIService if t.getPortNumber != 0 =>
+ serverPort = t.getPortNumber
+ logInfo(s"Started HiveThriftServer2: port=$serverPort,
attempt=$attempt")
Review comment:
Before this fix, yes: the port binding was done in a separate background thread.
##########
File path:
sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala
##########
@@ -74,18 +86,31 @@ trait SharedThriftServer extends SharedSparkSession {
// Set the HIVE_SERVER2_THRIFT_PORT to 0, so it could randomly pick any
free port to use.
// It's much more robust than set a random port generated by ourselves
ahead
sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, "0")
- hiveServer2 = HiveThriftServer2.startWithContext(sqlContext)
- hiveServer2.getServices.asScala.foreach {
- case t: ThriftCLIService if t.getPortNumber != 0 =>
- serverPort = t.getPortNumber
- logInfo(s"Started HiveThriftServer2: port=$serverPort,
attempt=$attempt")
- case _ =>
- }
+ // Set the HIVE_SERVER2_THRIFT_HTTP_PORT to 0, so it could randomly pick
any free port to use.
+ // It's much more robust than set a random port generated by ourselves
ahead
+ sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT.varname, "0")
+ sqlContext.setConf(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname,
mode.toString)
+
+ try {
+ hiveServer2 = HiveThriftServer2.startWithContext(sqlContext)
+ hiveServer2.getServices.asScala.foreach {
+ case t: ThriftCLIService if t.getPortNumber != 0 =>
+ serverPort = t.getPortNumber
+ logInfo(s"Started HiveThriftServer2: port=$serverPort,
attempt=$attempt")
Review comment:
With
https://github.com/apache/spark/pull/28751/files#diff-7610697b4f8f1bc4842c77e50807914cR178
and its implementations, the port binding is done in the same thread where we
call `getPortNumber` later.
##########
File path:
sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala
##########
@@ -74,18 +86,31 @@ trait SharedThriftServer extends SharedSparkSession {
// Set the HIVE_SERVER2_THRIFT_PORT to 0, so it could randomly pick any
free port to use.
// It's much more robust than set a random port generated by ourselves
ahead
sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, "0")
- hiveServer2 = HiveThriftServer2.startWithContext(sqlContext)
- hiveServer2.getServices.asScala.foreach {
- case t: ThriftCLIService if t.getPortNumber != 0 =>
- serverPort = t.getPortNumber
- logInfo(s"Started HiveThriftServer2: port=$serverPort,
attempt=$attempt")
- case _ =>
- }
+ // Set the HIVE_SERVER2_THRIFT_HTTP_PORT to 0, so it could randomly pick
any free port to use.
+ // It's much more robust than set a random port generated by ourselves
ahead
+ sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT.varname, "0")
+ sqlContext.setConf(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname,
mode.toString)
+
+ try {
+ hiveServer2 = HiveThriftServer2.startWithContext(sqlContext)
+ hiveServer2.getServices.asScala.foreach {
+ case t: ThriftCLIService if t.getPortNumber != 0 =>
+ serverPort = t.getPortNumber
+ logInfo(s"Started HiveThriftServer2: port=$serverPort,
attempt=$attempt")
Review comment:
There was an earlier discussion about this with @juliuszsompolski:
https://github.com/apache/spark/pull/28651#discussion_r435932239
##########
File path:
sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala
##########
@@ -74,18 +86,31 @@ trait SharedThriftServer extends SharedSparkSession {
// Set the HIVE_SERVER2_THRIFT_PORT to 0, so it could randomly pick any
free port to use.
// It's much more robust than set a random port generated by ourselves
ahead
sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, "0")
- hiveServer2 = HiveThriftServer2.startWithContext(sqlContext)
- hiveServer2.getServices.asScala.foreach {
- case t: ThriftCLIService if t.getPortNumber != 0 =>
- serverPort = t.getPortNumber
- logInfo(s"Started HiveThriftServer2: port=$serverPort,
attempt=$attempt")
- case _ =>
- }
+ // Set the HIVE_SERVER2_THRIFT_HTTP_PORT to 0, so it could randomly pick
any free port to use.
+ // It's much more robust than set a random port generated by ourselves
ahead
+ sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT.varname, "0")
+ sqlContext.setConf(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname,
mode.toString)
+
+ try {
+ hiveServer2 = HiveThriftServer2.startWithContext(sqlContext)
+ hiveServer2.getServices.asScala.foreach {
+ case t: ThriftCLIService if t.getPortNumber != 0 =>
+ serverPort = t.getPortNumber
+ logInfo(s"Started HiveThriftServer2: port=$serverPort,
attempt=$attempt")
Review comment:
There was an earlier discussion about this with @juliuszsompolski:
https://github.com/apache/spark/pull/28651#discussion_r435932239
##########
File path:
sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/SharedThriftServer.scala
##########
@@ -74,18 +86,31 @@ trait SharedThriftServer extends SharedSparkSession {
// Set the HIVE_SERVER2_THRIFT_PORT to 0, so it could randomly pick any
free port to use.
// It's much more robust than set a random port generated by ourselves
ahead
sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_PORT.varname, "0")
- hiveServer2 = HiveThriftServer2.startWithContext(sqlContext)
- hiveServer2.getServices.asScala.foreach {
- case t: ThriftCLIService if t.getPortNumber != 0 =>
- serverPort = t.getPortNumber
- logInfo(s"Started HiveThriftServer2: port=$serverPort,
attempt=$attempt")
- case _ =>
- }
+ // Set the HIVE_SERVER2_THRIFT_HTTP_PORT to 0, so it could randomly pick
any free port to use.
+ // It's much more robust than set a random port generated by ourselves
ahead
+ sqlContext.setConf(ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT.varname, "0")
+ sqlContext.setConf(ConfVars.HIVE_SERVER2_TRANSPORT_MODE.varname,
mode.toString)
+
+ try {
+ hiveServer2 = HiveThriftServer2.startWithContext(sqlContext)
+ hiveServer2.getServices.asScala.foreach {
+ case t: ThriftCLIService if t.getPortNumber != 0 =>
+ serverPort = t.getPortNumber
+ logInfo(s"Started HiveThriftServer2: port=$serverPort,
attempt=$attempt")
Review comment:
Take `ThriftBinaryCLIService` as an example.
Before:
we did the `TThreadPoolServer` initialization and `serve` in the same `run`
function of the background thread. So if we called `getPortNumber` right after
`startWithContext`, a concurrency issue could occur: `portNum` may not have
been reset yet at the time of the call.
After:
we do the `TThreadPoolServer` initialization in the current thread and only do
`serve` in the `run` function of the background thread.
##########
File path:
sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
##########
@@ -45,13 +46,13 @@ public ThriftBinaryCLIService(CLIService cliService) {
}
@Override
- public void run() {
+ protected void initializeServer() {
try {
// Server thread pool
String threadPoolName = "HiveServer2-Handler-Pool";
ExecutorService executorService = new
ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
- workerKeepAliveTime, TimeUnit.SECONDS, new
SynchronousQueue<Runnable>(),
- new ThreadFactoryWithGarbageCleanup(threadPoolName));
+ workerKeepAliveTime, TimeUnit.SECONDS, new
SynchronousQueue<Runnable>(),
+ new ThreadFactoryWithGarbageCleanup(threadPoolName));
Review comment:
OK
##########
File path:
sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
##########
@@ -97,14 +98,20 @@ public void run() {
// TCP Server
server = new TThreadPoolServer(sargs);
server.setServerEventHandler(serverEventHandler);
- String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName()
+ " on port "
- + serverSocket.getServerSocket().getLocalPort() + " with " +
minWorkerThreads + "..." + maxWorkerThreads + " worker threads";
+ String msg = "Starting " + getName() + " on port " + portNum + " with "
+ minWorkerThreads +
+ "..." + maxWorkerThreads + " worker threads";
Review comment:
You are right, this was an unnecessary change. Reverted.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]