juliuszsompolski commented on a change in pull request #28738:
URL: https://github.com/apache/spark/pull/28738#discussion_r435990515
##########
File path:
sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
##########
@@ -1006,338 +417,12 @@ class HiveThriftCleanUpScratchDirSuite extends
HiveThriftJdbcTest{
}
}
- override protected def afterAll(): Unit = {
+ override def afterAll(): Unit = {
Utils.deleteRecursively(tempScratchDir)
super.afterAll()
}
}
-class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
- override def mode: ServerMode.Value = ServerMode.http
-
- test("JDBC query execution") {
- withJdbcStatement("test") { statement =>
- val queries = Seq(
- "SET spark.sql.shuffle.partitions=3",
- "CREATE TABLE test(key INT, val STRING) USING hive",
- s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE
test",
- "CACHE TABLE test")
-
- queries.foreach(statement.execute)
-
- assertResult(5, "Row count mismatch") {
- val resultSet = statement.executeQuery("SELECT COUNT(*) FROM test")
- resultSet.next()
- resultSet.getInt(1)
- }
- }
- }
-
- test("Checks Hive version") {
- withJdbcStatement() { statement =>
- val resultSet = statement.executeQuery("SET spark.sql.hive.version")
- resultSet.next()
- assert(resultSet.getString(1) === "spark.sql.hive.version")
- assert(resultSet.getString(2) === HiveUtils.builtinHiveVersion)
- }
- }
-
- test("SPARK-24829 Checks cast as float") {
- withJdbcStatement() { statement =>
- val resultSet = statement.executeQuery("SELECT CAST('4.56' AS FLOAT)")
- resultSet.next()
- assert(resultSet.getString(1) === "4.56")
- }
- }
-}
-
object ServerMode extends Enumeration {
val binary, http = Value
}
-
-abstract class HiveThriftJdbcTest extends HiveThriftServer2Test {
- Utils.classForName(classOf[HiveDriver].getCanonicalName)
-
- private def jdbcUri = if (mode == ServerMode.http) {
- s"""jdbc:hive2://localhost:$serverPort/
- |default?
- |hive.server2.transport.mode=http;
- |hive.server2.thrift.http.path=cliservice;
- |${hiveConfList}#${hiveVarList}
- """.stripMargin.split("\n").mkString.trim
- } else {
- s"jdbc:hive2://localhost:$serverPort/?${hiveConfList}#${hiveVarList}"
- }
-
- def withMultipleConnectionJdbcStatement(tableNames: String*)(fs: (Statement
=> Unit)*): Unit = {
- val user = System.getProperty("user.name")
- val connections = fs.map { _ => DriverManager.getConnection(jdbcUri, user,
"") }
- val statements = connections.map(_.createStatement())
-
- try {
- statements.zip(fs).foreach { case (s, f) => f(s) }
- } finally {
- tableNames.foreach { name =>
- // TODO: Need a better way to drop the view.
- if (name.toUpperCase(Locale.ROOT).startsWith("VIEW")) {
- statements(0).execute(s"DROP VIEW IF EXISTS $name")
- } else {
- statements(0).execute(s"DROP TABLE IF EXISTS $name")
- }
- }
- statements.foreach(_.close())
- connections.foreach(_.close())
- }
- }
-
- def withDatabase(dbNames: String*)(fs: (Statement => Unit)*): Unit = {
- val user = System.getProperty("user.name")
- val connections = fs.map { _ => DriverManager.getConnection(jdbcUri, user,
"") }
- val statements = connections.map(_.createStatement())
-
- try {
- statements.zip(fs).foreach { case (s, f) => f(s) }
- } finally {
- dbNames.foreach { name =>
- statements(0).execute(s"DROP DATABASE IF EXISTS $name")
- }
- statements.foreach(_.close())
- connections.foreach(_.close())
- }
- }
-
- def withJdbcStatement(tableNames: String*)(f: Statement => Unit): Unit = {
- withMultipleConnectionJdbcStatement(tableNames: _*)(f)
- }
-}
-
-abstract class HiveThriftServer2Test extends SparkFunSuite with
BeforeAndAfterAll with Logging {
- def mode: ServerMode.Value
-
- private val CLASS_NAME =
HiveThriftServer2.getClass.getCanonicalName.stripSuffix("$")
- private val LOG_FILE_MARK = s"starting $CLASS_NAME, logging to "
-
- protected val startScript =
"../../sbin/start-thriftserver.sh".split("/").mkString(File.separator)
- protected val stopScript =
"../../sbin/stop-thriftserver.sh".split("/").mkString(File.separator)
-
- private var listeningPort: Int = _
- protected def serverPort: Int = listeningPort
-
- protected val hiveConfList = "a=avalue;b=bvalue"
- protected val hiveVarList = "c=cvalue;d=dvalue"
- protected def user = System.getProperty("user.name")
-
- protected var warehousePath: File = _
- protected var metastorePath: File = _
- protected def metastoreJdbcUri =
s"jdbc:derby:;databaseName=$metastorePath;create=true"
-
- private val pidDir: File = Utils.createTempDir(namePrefix =
"thriftserver-pid")
- protected var logPath: File = _
- protected var operationLogPath: File = _
- protected var lScratchDir: File = _
- private var logTailingProcess: Process = _
- private var diagnosisBuffer: ArrayBuffer[String] = ArrayBuffer.empty[String]
-
- protected def extraConf: Seq[String] = Nil
-
- protected def serverStartCommand(port: Int) = {
- val portConf = if (mode == ServerMode.binary) {
- ConfVars.HIVE_SERVER2_THRIFT_PORT
- } else {
- ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT
- }
-
- val driverClassPath = {
- // Writes a temporary log4j.properties and prepend it to driver
classpath, so that it
- // overrides all other potential log4j configurations contained in other
dependency jar files.
- val tempLog4jConf = Utils.createTempDir().getCanonicalPath
-
- Files.write(
- """log4j.rootCategory=DEBUG, console
- |log4j.appender.console=org.apache.log4j.ConsoleAppender
- |log4j.appender.console.target=System.err
- |log4j.appender.console.layout=org.apache.log4j.PatternLayout
- |log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd
HH:mm:ss} %p %c{1}: %m%n
- """.stripMargin,
- new File(s"$tempLog4jConf/log4j.properties"),
- StandardCharsets.UTF_8)
-
- tempLog4jConf
- }
-
- s"""$startScript
- | --master local
- | --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$metastoreJdbcUri
- | --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath
- | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=localhost
- | --hiveconf ${ConfVars.HIVE_SERVER2_TRANSPORT_MODE}=$mode
- | --hiveconf
${ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION}=$operationLogPath
- | --hiveconf ${ConfVars.LOCALSCRATCHDIR}=$lScratchDir
- | --hiveconf $portConf=$port
- | --driver-class-path $driverClassPath
- | --driver-java-options -Dlog4j.debug
- | --conf spark.ui.enabled=false
- | ${extraConf.mkString("\n")}
- """.stripMargin.split("\\s+").toSeq
- }
-
- /**
- * String to scan for when looking for the thrift binary endpoint running.
- * This can change across Hive versions.
- */
- val THRIFT_BINARY_SERVICE_LIVE = "Starting ThriftBinaryCLIService on port"
-
- /**
- * String to scan for when looking for the thrift HTTP endpoint running.
- * This can change across Hive versions.
- */
- val THRIFT_HTTP_SERVICE_LIVE = "Started ThriftHttpCLIService in http"
 Review comment:
   If this becomes flaky because of the port, you could set the port to 0 and
   turn these marker strings into a regex that parses out the port that was
   actually assigned...
##########
File path:
sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
##########
@@ -1006,338 +417,12 @@ class HiveThriftCleanUpScratchDirSuite extends
HiveThriftJdbcTest{
}
}
- override protected def afterAll(): Unit = {
+ override def afterAll(): Unit = {
Utils.deleteRecursively(tempScratchDir)
super.afterAll()
}
}
-class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
- override def mode: ServerMode.Value = ServerMode.http
-
- test("JDBC query execution") {
- withJdbcStatement("test") { statement =>
- val queries = Seq(
- "SET spark.sql.shuffle.partitions=3",
- "CREATE TABLE test(key INT, val STRING) USING hive",
- s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE
test",
- "CACHE TABLE test")
-
- queries.foreach(statement.execute)
-
- assertResult(5, "Row count mismatch") {
- val resultSet = statement.executeQuery("SELECT COUNT(*) FROM test")
- resultSet.next()
- resultSet.getInt(1)
- }
- }
- }
-
- test("Checks Hive version") {
- withJdbcStatement() { statement =>
- val resultSet = statement.executeQuery("SET spark.sql.hive.version")
- resultSet.next()
- assert(resultSet.getString(1) === "spark.sql.hive.version")
- assert(resultSet.getString(2) === HiveUtils.builtinHiveVersion)
- }
- }
-
- test("SPARK-24829 Checks cast as float") {
- withJdbcStatement() { statement =>
- val resultSet = statement.executeQuery("SELECT CAST('4.56' AS FLOAT)")
- resultSet.next()
- assert(resultSet.getString(1) === "4.56")
- }
- }
-}
-
object ServerMode extends Enumeration {
val binary, http = Value
}
-
-abstract class HiveThriftJdbcTest extends HiveThriftServer2Test {
- Utils.classForName(classOf[HiveDriver].getCanonicalName)
-
- private def jdbcUri = if (mode == ServerMode.http) {
- s"""jdbc:hive2://localhost:$serverPort/
- |default?
- |hive.server2.transport.mode=http;
- |hive.server2.thrift.http.path=cliservice;
- |${hiveConfList}#${hiveVarList}
- """.stripMargin.split("\n").mkString.trim
- } else {
- s"jdbc:hive2://localhost:$serverPort/?${hiveConfList}#${hiveVarList}"
- }
-
- def withMultipleConnectionJdbcStatement(tableNames: String*)(fs: (Statement
=> Unit)*): Unit = {
- val user = System.getProperty("user.name")
- val connections = fs.map { _ => DriverManager.getConnection(jdbcUri, user,
"") }
- val statements = connections.map(_.createStatement())
-
- try {
- statements.zip(fs).foreach { case (s, f) => f(s) }
- } finally {
- tableNames.foreach { name =>
- // TODO: Need a better way to drop the view.
- if (name.toUpperCase(Locale.ROOT).startsWith("VIEW")) {
- statements(0).execute(s"DROP VIEW IF EXISTS $name")
- } else {
- statements(0).execute(s"DROP TABLE IF EXISTS $name")
- }
- }
- statements.foreach(_.close())
- connections.foreach(_.close())
- }
- }
-
- def withDatabase(dbNames: String*)(fs: (Statement => Unit)*): Unit = {
- val user = System.getProperty("user.name")
- val connections = fs.map { _ => DriverManager.getConnection(jdbcUri, user,
"") }
- val statements = connections.map(_.createStatement())
-
- try {
- statements.zip(fs).foreach { case (s, f) => f(s) }
- } finally {
- dbNames.foreach { name =>
- statements(0).execute(s"DROP DATABASE IF EXISTS $name")
- }
- statements.foreach(_.close())
- connections.foreach(_.close())
- }
- }
-
- def withJdbcStatement(tableNames: String*)(f: Statement => Unit): Unit = {
- withMultipleConnectionJdbcStatement(tableNames: _*)(f)
- }
-}
-
-abstract class HiveThriftServer2Test extends SparkFunSuite with
BeforeAndAfterAll with Logging {
- def mode: ServerMode.Value
-
- private val CLASS_NAME =
HiveThriftServer2.getClass.getCanonicalName.stripSuffix("$")
- private val LOG_FILE_MARK = s"starting $CLASS_NAME, logging to "
-
- protected val startScript =
"../../sbin/start-thriftserver.sh".split("/").mkString(File.separator)
- protected val stopScript =
"../../sbin/stop-thriftserver.sh".split("/").mkString(File.separator)
-
- private var listeningPort: Int = _
- protected def serverPort: Int = listeningPort
-
- protected val hiveConfList = "a=avalue;b=bvalue"
- protected val hiveVarList = "c=cvalue;d=dvalue"
- protected def user = System.getProperty("user.name")
-
- protected var warehousePath: File = _
- protected var metastorePath: File = _
- protected def metastoreJdbcUri =
s"jdbc:derby:;databaseName=$metastorePath;create=true"
-
- private val pidDir: File = Utils.createTempDir(namePrefix =
"thriftserver-pid")
- protected var logPath: File = _
- protected var operationLogPath: File = _
- protected var lScratchDir: File = _
- private var logTailingProcess: Process = _
- private var diagnosisBuffer: ArrayBuffer[String] = ArrayBuffer.empty[String]
-
- protected def extraConf: Seq[String] = Nil
-
- protected def serverStartCommand(port: Int) = {
- val portConf = if (mode == ServerMode.binary) {
- ConfVars.HIVE_SERVER2_THRIFT_PORT
- } else {
- ConfVars.HIVE_SERVER2_THRIFT_HTTP_PORT
- }
-
- val driverClassPath = {
- // Writes a temporary log4j.properties and prepend it to driver
classpath, so that it
- // overrides all other potential log4j configurations contained in other
dependency jar files.
- val tempLog4jConf = Utils.createTempDir().getCanonicalPath
-
- Files.write(
- """log4j.rootCategory=DEBUG, console
- |log4j.appender.console=org.apache.log4j.ConsoleAppender
- |log4j.appender.console.target=System.err
- |log4j.appender.console.layout=org.apache.log4j.PatternLayout
- |log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd
HH:mm:ss} %p %c{1}: %m%n
- """.stripMargin,
- new File(s"$tempLog4jConf/log4j.properties"),
- StandardCharsets.UTF_8)
-
- tempLog4jConf
- }
-
- s"""$startScript
- | --master local
- | --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$metastoreJdbcUri
- | --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath
- | --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_BIND_HOST}=localhost
- | --hiveconf ${ConfVars.HIVE_SERVER2_TRANSPORT_MODE}=$mode
- | --hiveconf
${ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION}=$operationLogPath
- | --hiveconf ${ConfVars.LOCALSCRATCHDIR}=$lScratchDir
- | --hiveconf $portConf=$port
- | --driver-class-path $driverClassPath
- | --driver-java-options -Dlog4j.debug
- | --conf spark.ui.enabled=false
- | ${extraConf.mkString("\n")}
- """.stripMargin.split("\\s+").toSeq
- }
-
- /**
- * String to scan for when looking for the thrift binary endpoint running.
- * This can change across Hive versions.
- */
- val THRIFT_BINARY_SERVICE_LIVE = "Starting ThriftBinaryCLIService on port"
-
- /**
- * String to scan for when looking for the thrift HTTP endpoint running.
- * This can change across Hive versions.
- */
- val THRIFT_HTTP_SERVICE_LIVE = "Started ThriftHttpCLIService in http"
-
- val SERVER_STARTUP_TIMEOUT = 3.minutes
-
- private def startThriftServer(port: Int, attempt: Int) = {
- warehousePath = Utils.createTempDir()
- warehousePath.delete()
- metastorePath = Utils.createTempDir()
- metastorePath.delete()
- operationLogPath = Utils.createTempDir()
- operationLogPath.delete()
- lScratchDir = Utils.createTempDir()
- lScratchDir.delete()
- logPath = null
- logTailingProcess = null
-
- val command = serverStartCommand(port)
-
- diagnosisBuffer ++=
- s"""
- |### Attempt $attempt ###
- |HiveThriftServer2 command line: $command
- |Listening port: $port
- |System user: $user
- """.stripMargin.split("\n")
-
- logInfo(s"Trying to start HiveThriftServer2: port=$port, mode=$mode,
attempt=$attempt")
-
- logPath = {
- val lines = Utils.executeAndGetOutput(
- command = command,
- extraEnvironment = Map(
- // Disables SPARK_TESTING to exclude log4j.properties in test
directories.
- "SPARK_TESTING" -> "0",
- // But set SPARK_SQL_TESTING to make spark-class happy.
- "SPARK_SQL_TESTING" -> "1",
- // Points SPARK_PID_DIR to SPARK_HOME, otherwise only 1 Thrift
server instance can be
- // started at a time, which is not Jenkins friendly.
- "SPARK_PID_DIR" -> pidDir.getCanonicalPath),
- redirectStderr = true)
-
- logInfo(s"COMMAND: $command")
- logInfo(s"OUTPUT: $lines")
- lines.split("\n").collectFirst {
- case line if line.contains(LOG_FILE_MARK) => new
File(line.drop(LOG_FILE_MARK.length))
- }.getOrElse {
- throw new RuntimeException("Failed to find HiveThriftServer2 log
file.")
- }
- }
-
- val serverStarted = Promise[Unit]()
-
- // Ensures that the following "tail" command won't fail.
- logPath.createNewFile()
- val successLines = Seq(THRIFT_BINARY_SERVICE_LIVE,
THRIFT_HTTP_SERVICE_LIVE)
-
- logTailingProcess = {
- val command = s"/usr/bin/env tail -n +0 -f
${logPath.getCanonicalPath}".split(" ")
- // Using "-n +0" to make sure all lines in the log file are checked.
- val builder = new ProcessBuilder(command: _*)
- val captureOutput = (line: String) => diagnosisBuffer.synchronized {
- diagnosisBuffer += line
-
- successLines.foreach { r =>
- if (line.contains(r)) {
- serverStarted.trySuccess(())
- }
- }
 Review comment:
   ... by parsing out the port here, instead of using the `line.contains` check.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]