[GitHub] spark pull request #15673: [SPARK-17992][SQL] Return all partitions from Hiv...
Github user kmanamcheri commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15673#discussion_r221107390

    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala ---
    @@ -586,17 +587,31 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
               getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
             } else {
               logDebug(s"Hive metastore filter is '$filter'.")
    +          val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL
    +          val tryDirectSql =
    +            hive.getConf.getBoolean(tryDirectSqlConfVar.varname, tryDirectSqlConfVar.defaultBoolVal)
               try {
    +            // Hive may throw an exception when calling this method in some circumstances, such as
    +            // when filtering on a non-string partition column when the hive config key
    +            // hive.metastore.try.direct.sql is false
                 getPartitionsByFilterMethod.invoke(hive, table, filter)
                   .asInstanceOf[JArrayList[Partition]]
               } catch {
    -            case e: InvocationTargetException =>
    -              // SPARK-18167 retry to investigate the flaky test. This should be reverted before
    -              // the release is cut.
    -              val retry = Try(getPartitionsByFilterMethod.invoke(hive, table, filter))
    -              logError("getPartitionsByFilter failed, retry success = " + retry.isSuccess)
    -              logError("all partitions: " + getAllPartitions(hive, table))
    -              throw e
    +            case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
    +                !tryDirectSql =>
    +              logWarning("Caught Hive MetaException attempting to get partition metadata by " +
    +                "filter from Hive. Falling back to fetching all partition metadata, which will " +
    +                "degrade performance. Modifying your Hive metastore configuration to set " +
    +                s"${tryDirectSqlConfVar.varname} to true may resolve this problem.", ex)
    +              // HiveShim clients are expected to handle a superset of the requested partitions
    +              getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
    +            case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
    +                tryDirectSql =>
    +              throw new RuntimeException("Caught Hive MetaException attempting to get partition " +
    --- End diff --

    @mallman Your assumption is incorrect. If direct SQL fails in Hive, Hive will retry with ORM. In this case, I am able to reproduce an issue with Postgres where direct SQL fails, and when Hive retries with ORM, Spark fails! Hive has fallback behavior for direct SQL. Filed SPARK-25561.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
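To make the objection above concrete, here is a minimal toy model of the behavior the comment describes: with direct SQL enabled, the metastore tries direct SQL first and retries through ORM on failure, so a caller can still see an exception even though direct SQL is on. This is only a sketch under stated assumptions. `DirectSqlFallbackSketch`, `ToyMetaException`, and the `directSql` / `orm` callbacks are hypothetical names for illustration and are not Hive or Spark APIs.

```scala
import scala.util.{Failure, Success, Try}

object DirectSqlFallbackSketch {
  // Hypothetical stand-in for Hive's MetaException (assumption, not the real class).
  final class ToyMetaException(msg: String) extends Exception(msg)

  // Toy model: when direct SQL is enabled it is attempted first, and a failure
  // there triggers a retry through the ORM path. Whatever the ORM path does,
  // including throwing, is what the caller ends up seeing.
  def getPartitionsByFilter(
      tryDirectSql: Boolean,
      directSql: () => Seq[String],
      orm: () => Seq[String]): Seq[String] = {
    if (tryDirectSql) {
      Try(directSql()) match {
        case Success(parts) => parts
        case Failure(_) => orm() // retry with ORM; an exception here propagates
      }
    } else {
      orm()
    }
  }
}
```

In this model, a caller that treats "direct SQL enabled" as a guarantee of success would be surprised whenever both paths fail, which is the situation reported for the Postgres-backed metastore.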
[GitHub] spark pull request #15673: [SPARK-17992][SQL] Return all partitions from Hiv...
Github user rezasafi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15673#discussion_r216111838

    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala ---
    @@ -586,17 +587,31 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
               getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
             } else {
               logDebug(s"Hive metastore filter is '$filter'.")
    +          val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL
    +          val tryDirectSql =
    +            hive.getConf.getBoolean(tryDirectSqlConfVar.varname, tryDirectSqlConfVar.defaultBoolVal)
               try {
    +            // Hive may throw an exception when calling this method in some circumstances, such as
    +            // when filtering on a non-string partition column when the hive config key
    +            // hive.metastore.try.direct.sql is false
                 getPartitionsByFilterMethod.invoke(hive, table, filter)
                   .asInstanceOf[JArrayList[Partition]]
               } catch {
    -            case e: InvocationTargetException =>
    -              // SPARK-18167 retry to investigate the flaky test. This should be reverted before
    -              // the release is cut.
    -              val retry = Try(getPartitionsByFilterMethod.invoke(hive, table, filter))
    -              logError("getPartitionsByFilter failed, retry success = " + retry.isSuccess)
    -              logError("all partitions: " + getAllPartitions(hive, table))
    -              throw e
    +            case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
    +                !tryDirectSql =>
    +              logWarning("Caught Hive MetaException attempting to get partition metadata by " +
    +                "filter from Hive. Falling back to fetching all partition metadata, which will " +
    +                "degrade performance. Modifying your Hive metastore configuration to set " +
    +                s"${tryDirectSqlConfVar.varname} to true may resolve this problem.", ex)
    +              // HiveShim clients are expected to handle a superset of the requested partitions
    +              getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
    +            case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
    +                tryDirectSql =>
    +              throw new RuntimeException("Caught Hive MetaException attempting to get partition " +
    --- End diff --

    Thank you very much for the explanation @mallman. I appreciate it.
[GitHub] spark pull request #15673: [SPARK-17992][SQL] Return all partitions from Hiv...
Github user mallman commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15673#discussion_r216037341

    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala ---
    @@ -586,17 +587,31 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
               getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
             } else {
               logDebug(s"Hive metastore filter is '$filter'.")
    +          val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL
    +          val tryDirectSql =
    +            hive.getConf.getBoolean(tryDirectSqlConfVar.varname, tryDirectSqlConfVar.defaultBoolVal)
               try {
    +            // Hive may throw an exception when calling this method in some circumstances, such as
    +            // when filtering on a non-string partition column when the hive config key
    +            // hive.metastore.try.direct.sql is false
                 getPartitionsByFilterMethod.invoke(hive, table, filter)
                   .asInstanceOf[JArrayList[Partition]]
               } catch {
    -            case e: InvocationTargetException =>
    -              // SPARK-18167 retry to investigate the flaky test. This should be reverted before
    -              // the release is cut.
    -              val retry = Try(getPartitionsByFilterMethod.invoke(hive, table, filter))
    -              logError("getPartitionsByFilter failed, retry success = " + retry.isSuccess)
    -              logError("all partitions: " + getAllPartitions(hive, table))
    -              throw e
    +            case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
    +                !tryDirectSql =>
    +              logWarning("Caught Hive MetaException attempting to get partition metadata by " +
    +                "filter from Hive. Falling back to fetching all partition metadata, which will " +
    +                "degrade performance. Modifying your Hive metastore configuration to set " +
    +                s"${tryDirectSqlConfVar.varname} to true may resolve this problem.", ex)
    +              // HiveShim clients are expected to handle a superset of the requested partitions
    +              getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
    +            case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
    +                tryDirectSql =>
    +              throw new RuntimeException("Caught Hive MetaException attempting to get partition " +
    --- End diff --

    Hi @rezasafi, I believe the reasoning is that if the user has disabled direct SQL, we will still try to fetch the partitions for the requested partition predicate. However, since we don't expect that call to succeed, we just log a warning and fall back to the legacy behavior of fetching all partitions. On the other hand, if the user has enabled direct SQL, then we expect the call to Hive to succeed. If it fails, we consider that an error and throw an exception. I hope that helps clarify things.
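The two branches explained above can be sketched without any Hive dependency. This is an illustrative reduction of the quoted diff, not the actual `HiveShim` code: `PartitionFetchSketch`, the local `MetaException` class, and the `fetchByFilter` / `fetchAll` callbacks are hypothetical stand-ins for the reflective `getPartitionsByFilterMethod` and `getAllPartitionsMethod` calls, and partitions are modeled as plain strings.

```scala
import java.lang.reflect.InvocationTargetException

// Hypothetical stand-in for org.apache.hadoop.hive.metastore.api.MetaException.
class MetaException(msg: String) extends Exception(msg)

object PartitionFetchSketch {
  def getPartitions(
      tryDirectSql: Boolean,
      fetchByFilter: () => Seq[String],
      fetchAll: () => Seq[String]): Seq[String] = {
    try {
      fetchByFilter()
    } catch {
      case ex: InvocationTargetException
          if ex.getCause.isInstanceOf[MetaException] && !tryDirectSql =>
        // Direct SQL is disabled, so this failure is expected:
        // warn and fall back to fetching every partition.
        Console.err.println(s"WARN: falling back to all partitions: ${ex.getCause.getMessage}")
        fetchAll()
      case ex: InvocationTargetException
          if ex.getCause.isInstanceOf[MetaException] && tryDirectSql =>
        // Direct SQL is enabled, so this failure is treated as an error.
        throw new RuntimeException("Caught MetaException fetching partitions by filter", ex)
    }
  }
}
```

Any other exception, or an `InvocationTargetException` with a different cause, matches neither case and propagates unchanged, as it would in the diff.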
[GitHub] spark pull request #15673: [SPARK-17992][SQL] Return all partitions from Hiv...
Github user rezasafi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15673#discussion_r211370552

    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala ---
    @@ -586,17 +587,31 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
               getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
             } else {
               logDebug(s"Hive metastore filter is '$filter'.")
    +          val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL
    +          val tryDirectSql =
    +            hive.getConf.getBoolean(tryDirectSqlConfVar.varname, tryDirectSqlConfVar.defaultBoolVal)
               try {
    +            // Hive may throw an exception when calling this method in some circumstances, such as
    +            // when filtering on a non-string partition column when the hive config key
    +            // hive.metastore.try.direct.sql is false
                 getPartitionsByFilterMethod.invoke(hive, table, filter)
                   .asInstanceOf[JArrayList[Partition]]
               } catch {
    -            case e: InvocationTargetException =>
    -              // SPARK-18167 retry to investigate the flaky test. This should be reverted before
    -              // the release is cut.
    -              val retry = Try(getPartitionsByFilterMethod.invoke(hive, table, filter))
    -              logError("getPartitionsByFilter failed, retry success = " + retry.isSuccess)
    -              logError("all partitions: " + getAllPartitions(hive, table))
    -              throw e
    +            case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
    +                !tryDirectSql =>
    +              logWarning("Caught Hive MetaException attempting to get partition metadata by " +
    +                "filter from Hive. Falling back to fetching all partition metadata, which will " +
    +                "degrade performance. Modifying your Hive metastore configuration to set " +
    +                s"${tryDirectSqlConfVar.varname} to true may resolve this problem.", ex)
    +              // HiveShim clients are expected to handle a superset of the requested partitions
    +              getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
    +            case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
    +                tryDirectSql =>
    +              throw new RuntimeException("Caught Hive MetaException attempting to get partition " +
    --- End diff --

    @mallman Sorry to disturb you here, but why is only a warning logged when direct SQL isn't set? And why is a runtime exception raised when direct SQL is set, instead of just a warning as in the case without direct SQL?
[GitHub] spark pull request #15673: [SPARK-17992][SQL] Return all partitions from Hiv...
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/15673

---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
[GitHub] spark pull request #15673: [SPARK-17992][SQL] Return all partitions from Hiv...
Github user mallman commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15673#discussion_r85865327

    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala ---
    @@ -585,7 +586,31 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
               getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
             } else {
               logDebug(s"Hive metastore filter is '$filter'.")
    -          getPartitionsByFilterMethod.invoke(hive, table, filter).asInstanceOf[JArrayList[Partition]]
    +          val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL
    +          val tryDirectSql =
    +            hive.getConf.getBoolean(tryDirectSqlConfVar.varname, tryDirectSqlConfVar.defaultBoolVal)
    +          try {
    +            // Hive may throw an exception when calling this method in some circumstances, such as
    +            // when filtering on a non-string partition column when the hive config key
    +            // hive.metastore.try.direct.sql is false
    +            getPartitionsByFilterMethod.invoke(hive, table, filter)
    +              .asInstanceOf[JArrayList[Partition]]
    +          } catch {
    +            case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
    +                !tryDirectSql =>
    +              logWarning("Caught Hive MetaException attempting to get partition metadata by " +
    +                "filter from Hive. Falling back to fetching all partition metadata, which will " +
    +                "degrade performance. Consider modifying your Hive metastore configuration to " +
    +                s"set ${tryDirectSqlConfVar.varname} to true.", ex)
    +              // HiveShim clients are expected to handle a superset of the requested partitions
    +              getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
    +            case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
    +                tryDirectSql =>
    +              throw new RuntimeException("Caught Hive MetaException attempting to get partition " +
    +                "metadata by filter from Hive. Set the Spark configuration setting " +
    --- End diff --

    I made some revisions. LMK what you think.
[GitHub] spark pull request #15673: [SPARK-17992][SQL] Return all partitions from Hiv...
Github user mallman commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15673#discussion_r85864458

    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala ---
    @@ -585,7 +586,31 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
               getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
             } else {
               logDebug(s"Hive metastore filter is '$filter'.")
    -          getPartitionsByFilterMethod.invoke(hive, table, filter).asInstanceOf[JArrayList[Partition]]
    +          val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL
    +          val tryDirectSql =
    +            hive.getConf.getBoolean(tryDirectSqlConfVar.varname, tryDirectSqlConfVar.defaultBoolVal)
    +          try {
    +            // Hive may throw an exception when calling this method in some circumstances, such as
    +            // when filtering on a non-string partition column when the hive config key
    +            // hive.metastore.try.direct.sql is false
    +            getPartitionsByFilterMethod.invoke(hive, table, filter)
    +              .asInstanceOf[JArrayList[Partition]]
    +          } catch {
    +            case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
    +                !tryDirectSql =>
    +              logWarning("Caught Hive MetaException attempting to get partition metadata by " +
    +                "filter from Hive. Falling back to fetching all partition metadata, which will " +
    +                "degrade performance. Consider modifying your Hive metastore configuration to " +
    +                s"set ${tryDirectSqlConfVar.varname} to true.", ex)
    +              // HiveShim clients are expected to handle a superset of the requested partitions
    +              getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
    +            case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
    +                tryDirectSql =>
    +              throw new RuntimeException("Caught Hive MetaException attempting to get partition " +
    +                "metadata by filter from Hive. Set the Spark configuration setting " +
    --- End diff --

    Good point.
[GitHub] spark pull request #15673: [SPARK-17992][SQL] Return all partitions from Hiv...
Github user ericl commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15673#discussion_r85856641

    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala ---
    @@ -585,7 +586,31 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
               getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
             } else {
               logDebug(s"Hive metastore filter is '$filter'.")
    -          getPartitionsByFilterMethod.invoke(hive, table, filter).asInstanceOf[JArrayList[Partition]]
    +          val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL
    +          val tryDirectSql =
    +            hive.getConf.getBoolean(tryDirectSqlConfVar.varname, tryDirectSqlConfVar.defaultBoolVal)
    +          try {
    +            // Hive may throw an exception when calling this method in some circumstances, such as
    +            // when filtering on a non-string partition column when the hive config key
    +            // hive.metastore.try.direct.sql is false
    +            getPartitionsByFilterMethod.invoke(hive, table, filter)
    +              .asInstanceOf[JArrayList[Partition]]
    +          } catch {
    +            case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
    +                !tryDirectSql =>
    +              logWarning("Caught Hive MetaException attempting to get partition metadata by " +
    +                "filter from Hive. Falling back to fetching all partition metadata, which will " +
    +                "degrade performance. Consider modifying your Hive metastore configuration to " +
    +                s"set ${tryDirectSqlConfVar.varname} to true.", ex)
    +              // HiveShim clients are expected to handle a superset of the requested partitions
    +              getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
    +            case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
    +                tryDirectSql =>
    +              throw new RuntimeException("Caught Hive MetaException attempting to get partition " +
    +                "metadata by filter from Hive. Set the Spark configuration setting " +
    --- End diff --

    You probably want to word it to suggest disabling partition management as a workaround only.
[GitHub] spark pull request #15673: [SPARK-17992][SQL] Return all partitions from Hiv...
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15673#discussion_r85633937

    --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala ---
    @@ -585,7 +585,18 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
               getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
             } else {
               logDebug(s"Hive metastore filter is '$filter'.")
    -          getPartitionsByFilterMethod.invoke(hive, table, filter).asInstanceOf[JArrayList[Partition]]
    +          try {
    +            // Hive may throw an exception when calling this method in some circumstances, such as
    +            // when filtering on a non-string partition column when the hive config key
    +            // hive.metastore.try.direct.sql is false
    +            getPartitionsByFilterMethod.invoke(hive, table, filter)
    +              .asInstanceOf[JArrayList[Partition]]
    +          } catch {
    +            case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] =>
    +              logWarning("Caught MetaException attempting to get partitions by filter from Hive", ex)
    --- End diff --

    Change the msg to say we are falling back to fetching all partitions' metadata?
[GitHub] spark pull request #15673: [SPARK-17992][SQL] Return all partitions from Hiv...
GitHub user mallman opened a pull request:

    https://github.com/apache/spark/pull/15673

    [SPARK-17992][SQL] Return all partitions from HiveShim when Hive throws a metastore exception when attempting to fetch partitions by filter

    (Link to Jira issue: https://issues.apache.org/jira/browse/SPARK-17992)

    ## What changes were proposed in this pull request?

    We recently added table partition pruning for partitioned Hive tables converted to using `TableFileCatalog`. When the Hive configuration option `hive.metastore.try.direct.sql` is set to `false`, Hive will throw an exception for unsupported filter expressions. For example, attempting to filter on an integer partition column will throw an `org.apache.hadoop.hive.metastore.api.MetaException`.

    I discovered this behavior because VideoAmp uses the CDH version of Hive with a PostgreSQL metastore DB. In this configuration, CDH sets `hive.metastore.try.direct.sql` to `false` by default, and queries that filter on a non-string partition column will fail.

    Rather than throw an exception in query planning, this patch catches this exception, logs a warning, and returns all table partitions instead. Clients of this method are already expected to handle the possibility that the filters will not be honored.

    ## How was this patch tested?

    A unit test was added.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/VideoAmp/spark-public spark-17992-catch_hive_partition_filter_exception

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/15673.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #15673

commit c62beda38fe16fb6fe391e125605cf6df90d1e57
Author: Michael Allman
Date:   2016-10-28T17:35:39Z

    [SPARK-17992][SQL] Return all partitions from HiveShim when Hive throws a metastore exception when attempting to fetch partitions by filter
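The description relies on callers tolerating a superset of the requested partitions. A minimal sketch of why that makes the fallback safe follows, assuming a simplified partition record with one integer column. `SupersetPruningSketch`, `Part`, and `listPartitions` are hypothetical names for illustration, not Spark or Hive APIs.

```scala
object SupersetPruningSketch {
  // Hypothetical, simplified partition record with a single integer partition column.
  final case class Part(ds: Int)

  // listPartitions stands in for the shim call: it may honor the predicate,
  // or ignore it and return every partition of the table.
  def prune(
      listPartitions: (Part => Boolean) => Seq[Part],
      pred: Part => Boolean): Seq[Part] = {
    // Re-applying the predicate on the caller side yields the same result
    // whether or not the metastore honored the filter.
    listPartitions(pred).filter(pred)
  }
}
```

Under this assumption the fallback changes only how much partition metadata is fetched, not which partitions the query ends up reading.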