This is an automated email from the ASF dual-hosted git repository. dkuzmenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 11f9fb8d3ab HIVE-27646: Iceberg: Retry the query when concurrent write fails due to conflicting changes (Simhadri Govindappa, reviewed by Denys Kuzmenko, @suenalaba) 11f9fb8d3ab is described below commit 11f9fb8d3abfcf03205b94dd88da14fd66ff40dc Author: Simhadri Govindappa <simhadri...@gmail.com> AuthorDate: Fri Sep 22 16:01:50 2023 +0530 HIVE-27646: Iceberg: Retry the query when concurrent write fails due to conflicting changes (Simhadri Govindappa, reviewed by Denys Kuzmenko, @suenalaba) Closes #4629 --- .../java/org/apache/hadoop/hive/conf/HiveConf.java | 5 +- .../HiveIcebergStorageHandlerWithEngineBase.java | 6 + .../apache/iceberg/mr/hive/TestHiveIcebergV2.java | 8 ++ .../iceberg/mr/hive/TestOptimisticRetry.java | 148 +++++++++++++++++++++ .../org/apache/hadoop/hive/ql/DriverFactory.java | 28 +--- .../ql/reexec/ReExecuteOnWriteConflictPlugin.java | 72 ++++++++++ .../hive/ql/reexec/ReExecutionStrategyType.java | 44 ++++++ 7 files changed, 288 insertions(+), 23 deletions(-) diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index d4d6429c46d..5e9c8425ddf 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -5605,12 +5605,13 @@ public class HiveConf extends Configuration { HIVE_QUERY_REEXECUTION_ENABLED("hive.query.reexecution.enabled", true, "Enable query reexecutions"), HIVE_QUERY_REEXECUTION_STRATEGIES("hive.query.reexecution.strategies", - "overlay,reoptimize,reexecute_lost_am,dagsubmit,recompile_without_cbo", + "overlay,reoptimize,reexecute_lost_am,dagsubmit,recompile_without_cbo,write_conflict", "comma separated list of plugin can be used:\n" + " overlay: hiveconf subtree 'reexec.overlay' is used as an overlay in case of an execution errors out\n" + " reoptimize: collects operator statistics during execution and recompile
the query after a failure\n" + " recompile_without_cbo: recompiles query after a CBO failure\n" - + " reexecute_lost_am: reexecutes query if it failed due to tez am node gets decommissioned"), + + " reexecute_lost_am: reexecutes query if it failed due to tez am node gets decommissioned\n" + + " write_conflict: retries the query if it failed due to a write conflict"), HIVE_QUERY_REEXECUTION_STATS_PERSISTENCE("hive.query.reexecution.stats.persist.scope", "metastore", new StringSet("query", "hiveserver", "metastore"), "Sets the persistence scope of runtime statistics\n" diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerWithEngineBase.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerWithEngineBase.java index 8653c0db02f..8d2e94e72f7 100644 --- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerWithEngineBase.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerWithEngineBase.java @@ -62,6 +62,12 @@ public abstract class HiveIcebergStorageHandlerWithEngineBase { protected static final String[] EXECUTION_ENGINES = new String[] {"tez"}; + public static final String RETRY_STRATEGIES = + "overlay,reoptimize,reexecute_lost_am,dagsubmit,recompile_without_cbo,write_conflict"; + + public static final String RETRY_STRATEGIES_WITHOUT_WRITE_CONFLICT = + "overlay,reoptimize,reexecute_lost_am," + "dagsubmit,recompile_without_cbo"; + protected static final Schema ORDER_SCHEMA = new Schema( required(1, "order_id", Types.LongType.get()), required(2, "customer_id", Types.LongType.get()), diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergV2.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergV2.java index 3f474cfe79a..47c5e8251d7 100644 ---
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergV2.java +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergV2.java @@ -667,6 +667,8 @@ public class TestHiveIcebergV2 extends HiveIcebergStorageHandlerWithEngineBase { init(shell, testTables, temp, executionEngine); HiveConf.setBoolVar(shell.getHiveConf(), HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorized); HiveConf.setVar(shell.getHiveConf(), HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none"); + HiveConf.setVar(shell.getHiveConf(), HiveConf.ConfVars.HIVE_QUERY_REEXECUTION_STRATEGIES, + RETRY_STRATEGIES_WITHOUT_WRITE_CONFLICT); shell.executeStatement(sql); shell.closeSession(); }); @@ -689,6 +691,8 @@ public class TestHiveIcebergV2 extends HiveIcebergStorageHandlerWithEngineBase { init(shell, testTables, temp, executionEngine); HiveConf.setBoolVar(shell.getHiveConf(), HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorized); HiveConf.setVar(shell.getHiveConf(), HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none"); + HiveConf.setVar(shell.getHiveConf(), HiveConf.ConfVars.HIVE_QUERY_REEXECUTION_STRATEGIES, + RETRY_STRATEGIES_WITHOUT_WRITE_CONFLICT); shell.executeStatement(sql); shell.closeSession(); }); @@ -721,6 +725,8 @@ public class TestHiveIcebergV2 extends HiveIcebergStorageHandlerWithEngineBase { init(shell, testTables, temp, executionEngine); HiveConf.setBoolVar(shell.getHiveConf(), HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorized); HiveConf.setVar(shell.getHiveConf(), HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none"); + HiveConf.setVar(shell.getHiveConf(), HiveConf.ConfVars.HIVE_QUERY_REEXECUTION_STRATEGIES, + RETRY_STRATEGIES_WITHOUT_WRITE_CONFLICT); shell.executeStatement(sql[i]); shell.closeSession(); }); @@ -753,6 +759,8 @@ public class TestHiveIcebergV2 extends HiveIcebergStorageHandlerWithEngineBase { init(shell, testTables, temp, executionEngine); HiveConf.setBoolVar(shell.getHiveConf(),
HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorized); HiveConf.setVar(shell.getHiveConf(), HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none"); + HiveConf.setVar(shell.getHiveConf(), HiveConf.ConfVars.HIVE_QUERY_REEXECUTION_STRATEGIES, + RETRY_STRATEGIES_WITHOUT_WRITE_CONFLICT); shell.executeStatement(sql); shell.closeSession(); }); diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestOptimisticRetry.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestOptimisticRetry.java new file mode 100644 index 00000000000..305364085b5 --- /dev/null +++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestOptimisticRetry.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + + + +package org.apache.iceberg.mr.hive; + +import java.util.List; +import java.util.concurrent.Executors; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.relocated.com.google.common.base.Throwables; +import org.apache.iceberg.util.Tasks; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Test; + +import static org.apache.iceberg.mr.hive.HiveIcebergStorageHandlerTestUtils.init; + +public class TestOptimisticRetry extends HiveIcebergStorageHandlerWithEngineBase { + + @Test + public void testConcurrentOverlappingUpdates() { + + Assume.assumeTrue(fileFormat == FileFormat.PARQUET && isVectorized && + testTableType == TestTables.TestTableType.HIVE_CATALOG); + + testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, + PartitionSpec.unpartitioned(), fileFormat, HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2, 2); + String sql = "UPDATE customers SET last_name='Changed' WHERE customer_id=3 or first_name='Joanna'"; + + try { + Tasks.range(2) + .executeWith(Executors.newFixedThreadPool(2)) + .run(i -> { + init(shell, testTables, temp, executionEngine); + HiveConf.setBoolVar(shell.getHiveConf(), HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorized); + HiveConf.setVar(shell.getHiveConf(), HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none"); + HiveConf.setVar(shell.getHiveConf(), HiveConf.ConfVars.HIVE_QUERY_REEXECUTION_STRATEGIES, + RETRY_STRATEGIES); + shell.executeStatement(sql); + shell.closeSession(); + }); + } catch (Throwable ex) { + // If retry succeeds then it should not throw a ValidationException.
+ Throwable cause = Throwables.getRootCause(ex); + if (cause instanceof ValidationException && cause.getMessage().matches("^Found.*conflicting.*files(.*)")) { + Assert.fail(); + } + } + + List<Object[]> res = shell.executeStatement("SELECT * FROM customers WHERE last_name='Changed'"); + Assert.assertEquals(5, res.size()); + + } + + + @Test + public void testNonOverlappingConcurrent2Updates() { + + Assume.assumeTrue(fileFormat == FileFormat.PARQUET && isVectorized && + testTableType == TestTables.TestTableType.HIVE_CATALOG); + + testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, + PartitionSpec.unpartitioned(), fileFormat, HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2, 2); + String[] sql = new String[]{"UPDATE customers SET last_name='Changed' WHERE customer_id=3 or first_name='Joanna'", + "UPDATE customers SET last_name='Changed2' WHERE customer_id=2 and first_name='Jake'"}; + + try { + Tasks.range(2) + .executeWith(Executors.newFixedThreadPool(2)) + .run(i -> { + init(shell, testTables, temp, executionEngine); + HiveConf.setBoolVar(shell.getHiveConf(), HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorized); + HiveConf.setVar(shell.getHiveConf(), HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none"); + HiveConf.setVar(shell.getHiveConf(), HiveConf.ConfVars.HIVE_QUERY_REEXECUTION_STRATEGIES, + RETRY_STRATEGIES); + shell.executeStatement(sql[i]); + shell.closeSession(); + }); + } catch (Throwable ex) { + // If retry succeeds then it should not throw a ValidationException.
+ Throwable cause = Throwables.getRootCause(ex); + if (cause instanceof ValidationException && cause.getMessage().matches("^Found.*conflicting.*files(.*)")) { + Assert.fail(); + } + } + + List<Object[]> res = shell.executeStatement("SELECT * FROM customers WHERE last_name='Changed'"); + Assert.assertEquals(5, res.size()); + + res = shell.executeStatement("SELECT * FROM customers WHERE last_name='Changed2'"); + Assert.assertEquals(1, res.size()); + } + + @Test + public void testConcurrent2MergeInserts() { + Assume.assumeTrue(fileFormat == FileFormat.PARQUET && isVectorized && + testTableType == TestTables.TestTableType.HIVE_CATALOG); + + testTables.createTable(shell, "source", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, + PartitionSpec.unpartitioned(), fileFormat, HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_1); + testTables.createTable(shell, "target", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, + PartitionSpec.unpartitioned(), fileFormat, HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 2); + + String sql = "MERGE INTO target t USING source s on t.customer_id = s.customer_id WHEN Not MATCHED THEN " + + "INSERT values (s.customer_id, s.first_name, s.last_name)"; + try { + Tasks.range(2) + .executeWith(Executors.newFixedThreadPool(2)) + .run(i -> { + init(shell, testTables, temp, executionEngine); + HiveConf.setBoolVar(shell.getHiveConf(), HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorized); + HiveConf.setVar(shell.getHiveConf(), HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none"); + HiveConf.setVar(shell.getHiveConf(), HiveConf.ConfVars.HIVE_QUERY_REEXECUTION_STRATEGIES, + RETRY_STRATEGIES); + shell.executeStatement(sql); + shell.closeSession(); + }); + } catch (Throwable ex) { + // If retry succeeds then it should not throw a ValidationException.
+ Throwable cause = Throwables.getRootCause(ex); + if (cause instanceof ValidationException && cause.getMessage().matches("^Found.*conflicting.*files(.*)")) { + Assert.fail(); + } + } + List<Object[]> res = shell.executeStatement("SELECT * FROM target"); + Assert.assertEquals(6, res.size()); + } + +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/DriverFactory.java index 3cedff97f66..25357da6566 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/DriverFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverFactory.java @@ -25,11 +25,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.reexec.IReExecutionPlugin; import org.apache.hadoop.hive.ql.reexec.ReExecDriver; -import org.apache.hadoop.hive.ql.reexec.ReExecuteLostAMQueryPlugin; -import org.apache.hadoop.hive.ql.reexec.ReCompileWithoutCBOPlugin; -import org.apache.hadoop.hive.ql.reexec.ReExecutionOverlayPlugin; -import org.apache.hadoop.hive.ql.reexec.ReExecutionDagSubmitPlugin; -import org.apache.hadoop.hive.ql.reexec.ReOptimizePlugin; +import org.apache.hadoop.hive.ql.reexec.ReExecutionStrategyType; import com.google.common.base.Strings; @@ -66,23 +62,13 @@ public final class DriverFactory { } private static IReExecutionPlugin buildReExecPlugin(String name) throws RuntimeException { - if ("overlay".equals(name)) { - return new ReExecutionOverlayPlugin(); + try {
+ Class<? extends IReExecutionPlugin> pluginType = ReExecutionStrategyType.getPluginClassByName(name); + return pluginType.getDeclaredConstructor().newInstance(); + } catch (IllegalArgumentException | ReflectiveOperationException e) { // IllegalArgumentException: name is not a ReExecutionStrategyType constant + throw new RuntimeException( + "Unknown re-execution plugin: " + name + " (" + ConfVars.HIVE_QUERY_REEXECUTION_STRATEGIES.varname + ")", e); } - if ("reoptimize".equals(name)) { - return new ReOptimizePlugin(); - } - if ("reexecute_lost_am".equals(name)) { - return new ReExecuteLostAMQueryPlugin(); - } - if ("recompile_without_cbo".equals(name)) { - return new ReCompileWithoutCBOPlugin(); - } - if (name.equals("dagsubmit")) { - return new ReExecutionDagSubmitPlugin(); - } - throw new RuntimeException( - "Unknown re-execution plugin: " + name + " (" + ConfVars.HIVE_QUERY_REEXECUTION_STRATEGIES.varname + ")"); } public static QueryState getNewQueryState(HiveConf conf) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecuteOnWriteConflictPlugin.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecuteOnWriteConflictPlugin.java new file mode 100644 index 00000000000..78c24eee915 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecuteOnWriteConflictPlugin.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.reexec; + +import com.google.common.base.Throwables; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext; +import org.apache.hadoop.hive.ql.hooks.HookContext; +import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.regex.Pattern; + +public class ReExecuteOnWriteConflictPlugin implements IReExecutionPlugin { + private static final Logger LOG = LoggerFactory.getLogger(ReExecuteOnWriteConflictPlugin.class); + private boolean retryPossible; // per-plugin-instance state: a static flag would leak one query's write conflict into every later query + + private static final Pattern WRITE_CONFLICT_ERROR_PATTERN = Pattern.compile("^Found.*conflicting.*files(.*)"); + private static final String VALIDATION_EXCEPTION = "org.apache.iceberg.exceptions.ValidationException"; + + private final class LocalHook implements ExecuteWithHookContext { // non-static so it can update the owning plugin's retryPossible + @Override + public void run(HookContext hookContext) throws Exception { + if (hookContext.getHookType() == HookContext.HookType.ON_FAILURE_HOOK) { + Throwable exception = hookContext.getException(); + + if (exception != null && exception.getMessage() != null) { + Throwable cause = Throwables.getRootCause(exception); + + if (cause.getClass().getName().equals(VALIDATION_EXCEPTION) && + cause.getMessage() != null && WRITE_CONFLICT_ERROR_PATTERN.matcher(cause.getMessage()).matches()) { + retryPossible = true; + LOG.info("Retrying query due to write conflict."); + } + LOG.info("Got exception message: {} retryPossible: {}", exception.getMessage(), retryPossible); + } + } + } + } + + @Override + public void initialize(Driver driver) { + driver.getHookRunner().addOnFailureHook(new LocalHook()); + } + + @Override + public boolean shouldReExecute(int executionNum) { + return retryPossible; + } + + @Override + public boolean shouldReExecuteAfterCompile(int executionNum,
PlanMapper oldPlanMapper, PlanMapper newPlanMapper) { + return retryPossible; + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionStrategyType.java b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionStrategyType.java new file mode 100644 index 00000000000..9f50acc71ed --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionStrategyType.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.reexec; + +import java.util.Locale; + +public enum ReExecutionStrategyType { + OVERLAY(ReExecutionOverlayPlugin.class), + REOPTIMIZE(ReOptimizePlugin.class), + REEXECUTE_LOST_AM(ReExecuteLostAMQueryPlugin.class), + RECOMPILE_WITHOUT_CBO(ReCompileWithoutCBOPlugin.class), + DAGSUBMIT(ReExecutionDagSubmitPlugin.class), + WRITE_CONFLICT(ReExecuteOnWriteConflictPlugin.class); + + ReExecutionStrategyType(Class<? extends IReExecutionPlugin> pluginClass) { + this.pluginClass = pluginClass; + } + + private final Class<? extends IReExecutionPlugin> pluginClass; + + public Class<? extends IReExecutionPlugin> getPluginClass() { + return pluginClass; + } + + public static Class<?
extends IReExecutionPlugin> getPluginClassByName(String strategy) { + return ReExecutionStrategyType.valueOf(strategy.toUpperCase(Locale.ROOT)).getPluginClass(); // Locale.ROOT: a Turkish default locale would turn 'i' into dotted 'İ' and break valueOf + } +}