sarutak commented on code in PR #52099: URL: https://github.com/apache/spark/pull/52099#discussion_r2332249440
########## dev/test-dependencies.sh: ########## @@ -34,7 +34,7 @@ HADOOP_MODULE_PROFILES="-Phive-thriftserver -Pkubernetes -Pyarn -Phive \ -Pspark-ganglia-lgpl -Pkinesis-asl -Phadoop-cloud -Pjvm-profiler" MVN="build/mvn" HADOOP_HIVE_PROFILES=( - hadoop-3-hive-2.3 + hadoop-3-hive-4.0 Review Comment: `4.1`? ########## pom.xml: ########## @@ -206,7 +207,7 @@ <jsr305.version>3.0.0</jsr305.version> <jaxb.version>2.2.11</jaxb.version> <libthrift.version>0.16.0</libthrift.version> - <antlr4.version>4.13.1</antlr4.version> + <antlr4.version>4.9.3</antlr4.version> Review Comment: Basically, we don't allow downgrading dependencies as @dongjoon-hyun mentioned. If there is dependency conflict between Spark and Hive mentioned [here](https://github.com/apache/spark/pull/52099#issuecomment-3238073241), we should discuss first how to resolve it first. Also, please don't make this kind of change without explanation. ########## repl/src/test/scala/org/apache/spark/repl/SparkShellSuite.scala: ########## @@ -95,6 +95,25 @@ class SparkShellSuite extends SparkFunSuite { } } + def handleException(cause: Throwable): Unit = lock.synchronized { + val message = Review Comment: Is this change related to Hive upgrade? ########## project/SparkBuild.scala: ########## @@ -1750,6 +1760,7 @@ object TestSettings { }.getOrElse(Nil): _*), // Show full stack trace and duration in test cases. (Test / testOptions) += Tests.Argument("-oDF"), + (Test / testOptions) += Tests.Argument(TestFrameworks.ScalaTest, "-fG", "scalatest.txt"), Review Comment: Why is this change needed for upgrading Hive? ########## sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala: ########## @@ -854,7 +851,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // Get the original table properties as defined by the user. table.copy( createVersion = version, - properties = table.properties.filterNot { case (key, _) => key.startsWith(SPARK_SQL_PREFIX) }) + properties = table.properties.filterNot { case (key, value) => + key.startsWith(SPARK_SQL_PREFIX) || + key == "bucketing_version" && value == "2" || Review Comment: Why do we need new conditions, and are there any corresponding tests? ########## sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala: ########## @@ -77,7 +77,7 @@ private[spark] object HiveUtils extends Logging { "<code>2.0.0</code> through <code>2.3.10</code>, " + "<code>3.0.0</code> through <code>3.1.3</code> and " + "<code>4.0.0</code> through <code>4.1.0</code>.") - .version("1.4.0") + .version("4.1.0") Review Comment: ditto. ########## sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala: ########## @@ -110,7 +110,7 @@ class HadoopTableReader( */ def makeRDDForTable( hiveTable: HiveTable, - deserializerClass: Class[_ <: Deserializer], + abstractSerDeClass: Class[_ <: AbstractSerDe], Review Comment: I don't think we need to change the name because `deserializer` represents its usage. ########## sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala: ########## @@ -226,13 +227,10 @@ class HadoopTableReader( case (key, value) => props.setProperty(key, value) } DeserializerLock.synchronized { - deserializer.initialize(hconf, props) + deserializer.initialize(hconf, props, partProps) Review Comment: Why do we need pass `partProps` as third parameter while `null` is passed to other `initialize`. ########## sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala: ########## @@ -22,6 +22,6 @@ private[client] trait HiveClientVersions { protected val versions = if (testVersions.nonEmpty) { testVersions.get.split(",").map(_.trim).filter(_.nonEmpty).toIndexedSeq } else { - IndexedSeq("2.0", "2.1", "2.2", "2.3", "3.0", "3.1", "4.0", "4.1") Review Comment: Why should we drop tests with old clients? Upgrading builtin Hive and supported metastore version should be discussed separately. ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala: ########## @@ -4722,8 +4722,7 @@ class AstBuilder extends DataTypeAstBuilder entry("field.delim", ctx.fieldsTerminatedBy) ++ entry("serialization.format", ctx.fieldsTerminatedBy) ++ entry("escape.delim", ctx.escapedBy) ++ - // The following typo is inherited from Hive... - entry("colelction.delim", ctx.collectionItemsTerminatedBy) ++ + entry("collection.delim", ctx.collectionItemsTerminatedBy) ++ Review Comment: This change seems to break compatibility so I think we need to note this change if this change is really needed. ########## pom.xml: ########## @@ -336,6 +337,7 @@ -Djdk.reflect.useDirectMethodHandle=false -Dio.netty.tryReflectionSetAccessible=true --enable-native-access=ALL-UNNAMED + -Dmvn.executable=${maven.multiModuleProjectDirectory}/build/mvn Review Comment: Why is this change needed? ########## dev/test-jars.txt: ########## @@ -10,7 +10,6 @@ sql/connect/common/src/test/resources/artifact-tests/smallJar.jar sql/core/src/test/resources/SPARK-33084.jar sql/core/src/test/resources/artifact-tests/udf_noA.jar sql/hive-thriftserver/src/test/resources/TestUDTF.jar -sql/hive/src/test/noclasspath/hive-test-udfs.jar Review Comment: Why this and related change are needed? If there is a reason, it should be explained in the PR description. ########## dev/test-dependencies.sh: ########## @@ -85,7 +85,7 @@ $MVN -q versions:set -DnewVersion=$TEMP_VERSION -DgenerateBackupPoms=false > /de # Generate manifests for each Hadoop profile: for HADOOP_HIVE_PROFILE in "${HADOOP_HIVE_PROFILES[@]}"; do - if [[ $HADOOP_HIVE_PROFILE == **hadoop-3-hive-2.3** ]]; then + if [[ $HADOOP_HIVE_PROFILE == **hadoop-3-hive-4.0** ]]; then Review Comment: ditto. ########## sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala: ########## @@ -62,7 +62,7 @@ private[spark] object HiveUtils extends Logging { " Note that, this a read-only conf and only used to report the built-in hive version." + " If you want a different metastore client for Spark to call, please refer to" + " spark.sql.hive.metastore.version.") - .version("1.1.1") + .version("4.1.0") Review Comment: This part should not be changed. ########## sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcSourceSuite.scala: ########## @@ -508,7 +508,14 @@ abstract class OrcSuite withAllNativeOrcReaders { checkAnswer( spark.read.orc(path), - Seq(Row(Date.valueOf("1001-01-01")), Row(Date.valueOf("1582-10-15")))) + Seq(Row(Date.valueOf("1001-01-01")), + if (spark.isInstanceOf[TestSparkSession]) { + // Spark rebases 1582-10-05 through 1582-10-15 to 1582-10-15 + Row(Date.valueOf("1582-10-15")) + } else { + // Hive rebases 1582-10-05 through 1582-10-15 by adding 10 days + Row(Date.valueOf("1582-10-20")) Review Comment: This part seems to break compatibility right? We should discuss if this kind of change is acceptable or not. ########## sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala: ########## @@ -1224,8 +1204,9 @@ private[hive] object HiveClientImpl extends Logging { p: CatalogTablePartition, ht: HiveTable): HivePartition = { val tpart = new org.apache.hadoop.hive.metastore.api.Partition + val spec = new CaseInsensitiveStringMap(p.spec.asJava).asScala.view val partValues = ht.getPartCols.asScala.map { hc => - p.spec.getOrElse(hc.getName, throw new IllegalArgumentException( + spec.getOrElse(hc.getName, throw new IllegalArgumentException( Review Comment: Why is this change needed. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DaysWritableV2.scala: ########## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import java.io.{DataInput, DataOutput, IOException} + +import org.apache.hadoop.hive.common.`type`.Date +import org.apache.hadoop.hive.serde2.io.DateWritableV2 + +import org.apache.spark.sql.catalyst.util.RebaseDateTime.{rebaseGregorianToJulianDays, rebaseJulianToGregorianDays} + +/** + * The class accepts/returns days in Gregorian calendar and rebase them + * via conversion to local date in Julian calendar for dates before 1582-10-15 + * in read/write for backward compatibility with Spark 2.4 and earlier versions. + * + * @param gregorianDays The number of days since the epoch 1970-01-01 in + * Gregorian calendar. + * @param julianDays The number of days since the epoch 1970-01-01 in + * Julian calendar. + */ +class DaysWritableV2 extends DateWritableV2 { Review Comment: This code seems used from nowhere. ########## sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala: ########## @@ -284,7 +288,7 @@ object HiveScriptIOSchema extends HiveInspectors { val properties = new Properties() properties.putAll(propsMap.asJava) - serde.initialize(null, properties) + serde.initialize(hadoopConf, properties, null) Review Comment: `serde.initialize(null, properties, null)`? ########## sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala: ########## @@ -311,7 +316,9 @@ private[orc] class OrcOutputWriter( // Hive ORC initializes its private `writer` field at the first write. // For empty write task, we need to create it manually to record our meta. val options = OrcFile.writerOptions(context.getConfiguration) - options.inspector(serializer.structOI) + .inspector(serializer.structOI) + // .setProlepticGregorian(true) + // .useUTCTimestamp(true) Review Comment: ditto. ########## sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala: ########## @@ -35,9 +35,8 @@ private[client] abstract class HiveVersionSuite(version: String) extends SparkFu hadoopConf.set("datanucleus.autoStartMechanismMode", "ignored") hadoopConf.set("hive.metastore.schema.verification", "false") // Since Hive 3.0, HIVE-19310 skipped `ensureDbInit` if `hive.in.test=false`. - if (version == "3.0" || version == "3.1" || version == "4.0") { + if (version == "3.0" || version == "3.1" || version == "4.1") { Review Comment: Why is `4.0` excluded? ########## sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala: ########## @@ -535,6 +520,7 @@ private[client] class Shim_v2_0 extends Shim with Logging { override def alterTable(hive: Hive, tableName: String, table: Table): Unit = { recordHiveCall() alterTableMethod.invoke(hive, tableName, table) + // hive.alterTable(tableName, table, ) Review Comment: ? ########## sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala: ########## @@ -133,19 +133,19 @@ class HadoopTableReader( val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter => val hconf = broadcastedHadoopConf.value.value - val deserializer = deserializerClass.getConstructor().newInstance() + val abstractSerDe = abstractSerDeClass.getConstructor().newInstance() Review Comment: ditto. ########## sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala: ########## @@ -291,6 +295,7 @@ private[orc] class OrcOutputWriter( } override def write(row: InternalRow): Unit = { + // getOrCreateInternalWriter() Review Comment: Why do we need to leave this comment? ########## sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveVersionSuite.scala: ########## @@ -35,9 +35,8 @@ private[client] abstract class HiveVersionSuite(version: String) extends SparkFu hadoopConf.set("datanucleus.autoStartMechanismMode", "ignored") hadoopConf.set("hive.metastore.schema.verification", "false") // Since Hive 3.0, HIVE-19310 skipped `ensureDbInit` if `hive.in.test=false`. - if (version == "3.0" || version == "3.1" || version == "4.0") { + if (version == "3.0" || version == "3.1" || version == "4.1") { hadoopConf.set("hive.in.test", "true") - hadoopConf.set("hive.query.reexecution.enabled", "false") Review Comment: Why is this change needed? ########## sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala: ########## @@ -211,8 +211,6 @@ private[client] sealed abstract class Shim { def listFunctions(hive: Hive, db: String, pattern: String): Seq[String] - def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit Review Comment: This kind of change seems to break compatibility. Did you test that old hive metastores work with this change? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org