This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 11f9fb8d3ab HIVE-27646: Iceberg: Retry the query when concurrent write 
fails due to conflicting changes (Simhadri Govindappa, reviewed by  Denys 
Kuzmenko, @suenalaba)
11f9fb8d3ab is described below

commit 11f9fb8d3abfcf03205b94dd88da14fd66ff40dc
Author: Simhadri Govindappa <simhadri...@gmail.com>
AuthorDate: Fri Sep 22 16:01:50 2023 +0530

    HIVE-27646: Iceberg: Retry the query when concurrent write fails due to 
conflicting changes (Simhadri Govindappa, reviewed by  Denys Kuzmenko, 
@suenalaba)
    
    Closes #4629
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |   5 +-
 .../HiveIcebergStorageHandlerWithEngineBase.java   |   6 +
 .../apache/iceberg/mr/hive/TestHiveIcebergV2.java  |   8 ++
 .../iceberg/mr/hive/TestOptimisticRetry.java       | 148 +++++++++++++++++++++
 .../org/apache/hadoop/hive/ql/DriverFactory.java   |  28 +---
 .../ql/reexec/ReExecuteOnWriteConflictPlugin.java  |  72 ++++++++++
 .../hive/ql/reexec/ReExecutionStrategyType.java    |  44 ++++++
 7 files changed, 288 insertions(+), 23 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index d4d6429c46d..5e9c8425ddf 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -5605,12 +5605,13 @@ public class HiveConf extends Configuration {
     HIVE_QUERY_REEXECUTION_ENABLED("hive.query.reexecution.enabled", true,
         "Enable query reexecutions"),
     HIVE_QUERY_REEXECUTION_STRATEGIES("hive.query.reexecution.strategies",
-        "overlay,reoptimize,reexecute_lost_am,dagsubmit,recompile_without_cbo",
+        
"overlay,reoptimize,reexecute_lost_am,dagsubmit,recompile_without_cbo,write_conflict",
         "comma separated list of plugin can be used:\n"
             + "  overlay: hiveconf subtree 'reexec.overlay' is used as an 
overlay in case of an execution errors out\n"
             + "  reoptimize: collects operator statistics during execution and 
recompile the query after a failure\n"
             + "  recompile_without_cbo: recompiles query after a CBO failure\n"
-            + "  reexecute_lost_am: reexecutes query if it failed due to tez 
am node gets decommissioned"),
+            + "  reexecute_lost_am: reexecutes query if it failed due to tez 
am node gets decommissioned\n "
+            + "  write_conflict: retries the query once if the query failed 
due to write_conflict"),
     
HIVE_QUERY_REEXECUTION_STATS_PERSISTENCE("hive.query.reexecution.stats.persist.scope",
 "metastore",
         new StringSet("query", "hiveserver", "metastore"),
         "Sets the persistence scope of runtime statistics\n"
diff --git 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerWithEngineBase.java
 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerWithEngineBase.java
index 8653c0db02f..8d2e94e72f7 100644
--- 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerWithEngineBase.java
+++ 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerWithEngineBase.java
@@ -62,6 +62,12 @@ public abstract class 
HiveIcebergStorageHandlerWithEngineBase {
 
   protected static final String[] EXECUTION_ENGINES = new String[] {"tez"};
 
+  public static final String RETRY_STRATEGIES =
+      
"overlay,reoptimize,reexecute_lost_am,dagsubmit,recompile_without_cbo,write_conflict";
+
+  public static final String RETRY_STRATEGIES_WITHOUT_WRITE_CONFLICT =
+      "overlay,reoptimize,reexecute_lost_am," + 
"dagsubmit,recompile_without_cbo";
+
   protected static final Schema ORDER_SCHEMA = new Schema(
           required(1, "order_id", Types.LongType.get()),
           required(2, "customer_id", Types.LongType.get()),
diff --git 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergV2.java
 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergV2.java
index 3f474cfe79a..47c5e8251d7 100644
--- 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergV2.java
+++ 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergV2.java
@@ -667,6 +667,8 @@ public class TestHiveIcebergV2 extends 
HiveIcebergStorageHandlerWithEngineBase {
           init(shell, testTables, temp, executionEngine);
           HiveConf.setBoolVar(shell.getHiveConf(), 
HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorized);
           HiveConf.setVar(shell.getHiveConf(), 
HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none");
+          HiveConf.setVar(shell.getHiveConf(), 
HiveConf.ConfVars.HIVE_QUERY_REEXECUTION_STRATEGIES,
+              RETRY_STRATEGIES_WITHOUT_WRITE_CONFLICT);
           shell.executeStatement(sql);
           shell.closeSession();
         });
@@ -689,6 +691,8 @@ public class TestHiveIcebergV2 extends 
HiveIcebergStorageHandlerWithEngineBase {
             init(shell, testTables, temp, executionEngine);
             HiveConf.setBoolVar(shell.getHiveConf(), 
HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorized);
             HiveConf.setVar(shell.getHiveConf(), 
HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none");
+            HiveConf.setVar(shell.getHiveConf(), 
HiveConf.ConfVars.HIVE_QUERY_REEXECUTION_STRATEGIES,
+                RETRY_STRATEGIES_WITHOUT_WRITE_CONFLICT);
             shell.executeStatement(sql);
             shell.closeSession();
           });
@@ -721,6 +725,8 @@ public class TestHiveIcebergV2 extends 
HiveIcebergStorageHandlerWithEngineBase {
             init(shell, testTables, temp, executionEngine);
             HiveConf.setBoolVar(shell.getHiveConf(), 
HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorized);
             HiveConf.setVar(shell.getHiveConf(), 
HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none");
+            HiveConf.setVar(shell.getHiveConf(), 
HiveConf.ConfVars.HIVE_QUERY_REEXECUTION_STRATEGIES,
+                RETRY_STRATEGIES_WITHOUT_WRITE_CONFLICT);
             shell.executeStatement(sql[i]);
             shell.closeSession();
           });
@@ -753,6 +759,8 @@ public class TestHiveIcebergV2 extends 
HiveIcebergStorageHandlerWithEngineBase {
             init(shell, testTables, temp, executionEngine);
             HiveConf.setBoolVar(shell.getHiveConf(), 
HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorized);
             HiveConf.setVar(shell.getHiveConf(), 
HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none");
+            HiveConf.setVar(shell.getHiveConf(), 
HiveConf.ConfVars.HIVE_QUERY_REEXECUTION_STRATEGIES,
+                RETRY_STRATEGIES_WITHOUT_WRITE_CONFLICT);
             shell.executeStatement(sql);
             shell.closeSession();
           });
diff --git 
a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestOptimisticRetry.java
 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestOptimisticRetry.java
new file mode 100644
index 00000000000..305364085b5
--- /dev/null
+++ 
b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestOptimisticRetry.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.List;
+import java.util.concurrent.Executors;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.base.Throwables;
+import org.apache.iceberg.util.Tasks;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+
+import static 
org.apache.iceberg.mr.hive.HiveIcebergStorageHandlerTestUtils.init;
+
+public class TestOptimisticRetry extends 
HiveIcebergStorageHandlerWithEngineBase {
+
+  @Test
+  public void testConcurrentOverlappingUpdates() {
+
+    Assume.assumeTrue(fileFormat == FileFormat.PARQUET && isVectorized &&
+        testTableType == TestTables.TestTableType.HIVE_CATALOG);
+
+    testTables.createTable(shell, "customers", 
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        PartitionSpec.unpartitioned(), fileFormat, 
HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2, 2);
+    String sql = "UPDATE customers SET last_name='Changed' WHERE customer_id=3 
or first_name='Joanna'";
+
+    try {
+      Tasks.range(2)
+          .executeWith(Executors.newFixedThreadPool(2))
+          .run(i -> {
+            init(shell, testTables, temp, executionEngine);
+            HiveConf.setBoolVar(shell.getHiveConf(), 
HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorized);
+            HiveConf.setVar(shell.getHiveConf(), 
HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none");
+            HiveConf.setVar(shell.getHiveConf(), 
HiveConf.ConfVars.HIVE_QUERY_REEXECUTION_STRATEGIES,
+                RETRY_STRATEGIES);
+            shell.executeStatement(sql);
+            shell.closeSession();
+          });
+    } catch (Throwable ex) {
+      // If retry succeeds then it should not throw an ValidationException.
+      Throwable cause = Throwables.getRootCause(ex);
+      if (cause instanceof ValidationException && 
cause.getMessage().matches("^Found.*conflicting.*files(.*)")) {
+        Assert.fail();
+      }
+    }
+
+    List<Object[]> res = shell.executeStatement("SELECT * FROM customers WHERE 
last_name='Changed'");
+    Assert.assertEquals(5, res.size());
+
+  }
+
+
+  @Test
+  public void testNonOverlappingConcurrent2Updates() {
+
+    Assume.assumeTrue(fileFormat == FileFormat.PARQUET && isVectorized &&
+        testTableType == TestTables.TestTableType.HIVE_CATALOG);
+
+    testTables.createTable(shell, "customers", 
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        PartitionSpec.unpartitioned(), fileFormat, 
HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2, 2);
+    String[] sql = new String[]{"UPDATE customers SET last_name='Changed' 
WHERE customer_id=3 or first_name='Joanna'",
+        "UPDATE customers SET last_name='Changed2' WHERE customer_id=2 and 
first_name='Jake'"};
+
+    try {
+      Tasks.range(2)
+          .executeWith(Executors.newFixedThreadPool(2))
+          .run(i -> {
+            init(shell, testTables, temp, executionEngine);
+            HiveConf.setBoolVar(shell.getHiveConf(), 
HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorized);
+            HiveConf.setVar(shell.getHiveConf(), 
HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none");
+            HiveConf.setVar(shell.getHiveConf(), 
HiveConf.ConfVars.HIVE_QUERY_REEXECUTION_STRATEGIES,
+                RETRY_STRATEGIES);
+            shell.executeStatement(sql[i]);
+            shell.closeSession();
+          });
+    } catch (Throwable ex) {
+      // If retry succeeds then it should not throw an ValidationException.
+      Throwable cause = Throwables.getRootCause(ex);
+      if (cause instanceof ValidationException && 
cause.getMessage().matches("^Found.*conflicting.*files(.*)")) {
+        Assert.fail();
+      }
+    }
+
+    List<Object[]> res = shell.executeStatement("SELECT * FROM customers WHERE 
last_name='Changed'");
+    Assert.assertEquals(5, res.size());
+
+    res = shell.executeStatement("SELECT * FROM customers WHERE 
last_name='Changed2'");
+    Assert.assertEquals(1, res.size());
+  }
+
+  @Test
+  public void testConcurrent2MergeInserts() {
+    Assume.assumeTrue(fileFormat == FileFormat.PARQUET && isVectorized &&
+        testTableType == TestTables.TestTableType.HIVE_CATALOG);
+
+    testTables.createTable(shell, "source", 
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        PartitionSpec.unpartitioned(), fileFormat, 
HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_1);
+    testTables.createTable(shell, "target", 
HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
+        PartitionSpec.unpartitioned(), fileFormat, 
HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS, 2);
+
+    String sql = "MERGE INTO target t USING source s on t.customer_id = 
s.customer_id WHEN Not MATCHED THEN " +
+        "INSERT values (s.customer_id, s.first_name, s.last_name)";
+    try {
+      Tasks.range(2)
+          .executeWith(Executors.newFixedThreadPool(2))
+          .run(i -> {
+            init(shell, testTables, temp, executionEngine);
+            HiveConf.setBoolVar(shell.getHiveConf(), 
HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, isVectorized);
+            HiveConf.setVar(shell.getHiveConf(), 
HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none");
+            HiveConf.setVar(shell.getHiveConf(), 
HiveConf.ConfVars.HIVE_QUERY_REEXECUTION_STRATEGIES,
+                RETRY_STRATEGIES);
+            shell.executeStatement(sql);
+            shell.closeSession();
+          });
+    } catch (Throwable ex) {
+      // If retry succeeds then it should not throw an ValidationException.
+      Throwable cause = Throwables.getRootCause(ex);
+      if (cause instanceof ValidationException && 
cause.getMessage().matches("^Found.*conflicting.*files(.*)")) {
+        Assert.fail();
+      }
+    }
+    List<Object[]> res = shell.executeStatement("SELECT * FROM target");
+    Assert.assertEquals(6, res.size());
+  }
+
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/DriverFactory.java 
b/ql/src/java/org/apache/hadoop/hive/ql/DriverFactory.java
index 3cedff97f66..25357da6566 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/DriverFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/DriverFactory.java
@@ -25,11 +25,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.reexec.IReExecutionPlugin;
 import org.apache.hadoop.hive.ql.reexec.ReExecDriver;
-import org.apache.hadoop.hive.ql.reexec.ReExecuteLostAMQueryPlugin;
-import org.apache.hadoop.hive.ql.reexec.ReCompileWithoutCBOPlugin;
-import org.apache.hadoop.hive.ql.reexec.ReExecutionOverlayPlugin;
-import org.apache.hadoop.hive.ql.reexec.ReExecutionDagSubmitPlugin;
-import org.apache.hadoop.hive.ql.reexec.ReOptimizePlugin;
+import org.apache.hadoop.hive.ql.reexec.ReExecutionStrategyType;
 
 import com.google.common.base.Strings;
 
@@ -66,23 +62,13 @@ public final class DriverFactory {
   }
 
   private static IReExecutionPlugin buildReExecPlugin(String name) throws 
RuntimeException {
-    if ("overlay".equals(name)) {
-      return new ReExecutionOverlayPlugin();
+    Class<? extends IReExecutionPlugin> pluginType = 
ReExecutionStrategyType.getPluginClassByName(name);
+    try {
+      return pluginType.newInstance();
+    } catch (InstantiationException | IllegalAccessException e) {
+      throw new RuntimeException(
+          "Unknown re-execution plugin: " + name + " (" + 
ConfVars.HIVE_QUERY_REEXECUTION_STRATEGIES.varname + ")");
     }
-    if ("reoptimize".equals(name)) {
-      return new ReOptimizePlugin();
-    }
-    if ("reexecute_lost_am".equals(name)) {
-      return new ReExecuteLostAMQueryPlugin();
-    }
-    if ("recompile_without_cbo".equals(name)) {
-      return new ReCompileWithoutCBOPlugin();
-    }
-    if (name.equals("dagsubmit")) {
-      return new ReExecutionDagSubmitPlugin();
-    }
-    throw new RuntimeException(
-        "Unknown re-execution plugin: " + name + " (" + 
ConfVars.HIVE_QUERY_REEXECUTION_STRATEGIES.varname + ")");
   }
 
   public static QueryState getNewQueryState(HiveConf conf) {
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecuteOnWriteConflictPlugin.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecuteOnWriteConflictPlugin.java
new file mode 100644
index 00000000000..78c24eee915
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecuteOnWriteConflictPlugin.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.reexec;
+
+import com.google.common.base.Throwables;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
+import org.apache.hadoop.hive.ql.hooks.HookContext;
+import org.apache.hadoop.hive.ql.plan.mapper.PlanMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.regex.Pattern;
+
+public class ReExecuteOnWriteConflictPlugin implements IReExecutionPlugin {
+  private static final Logger LOG = 
LoggerFactory.getLogger(ReExecuteOnWriteConflictPlugin.class);
+  private static boolean retryPossible;
+
+  private static final Pattern writeConflictErrorPattern = 
Pattern.compile("^Found.*conflicting.*files(.*)");
+  private static final String validationException = 
"org.apache.iceberg.exceptions.ValidationException";
+
+  private static final class LocalHook implements ExecuteWithHookContext {
+    @Override
+    public void run(HookContext hookContext) throws Exception {
+      if (hookContext.getHookType() == HookContext.HookType.ON_FAILURE_HOOK) {
+        Throwable exception = hookContext.getException();
+        
+        if (exception != null && exception.getMessage() != null) {
+          Throwable cause = Throwables.getRootCause(exception);
+
+          if (cause.getClass().getName().equals(validationException) &&
+              cause.getMessage().matches(writeConflictErrorPattern.pattern())) 
{
+            retryPossible = true;
+            LOG.info("Retrying query due to write conflict.");
+          }
+          LOG.info("Got exception message: {} retryPossible: {}", 
exception.getMessage(), retryPossible);
+        }
+      }
+    }
+  }
+
+  @Override
+  public void initialize(Driver driver) {
+    driver.getHookRunner().addOnFailureHook(new 
ReExecuteOnWriteConflictPlugin.LocalHook());
+  }
+
+  @Override
+  public boolean shouldReExecute(int executionNum) {
+    return retryPossible;
+  }
+
+  @Override
+  public boolean shouldReExecuteAfterCompile(int executionNum, PlanMapper 
oldPlanMapper, PlanMapper newPlanMapper) {
+    return retryPossible;
+  }
+}
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionStrategyType.java 
b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionStrategyType.java
new file mode 100644
index 00000000000..9f50acc71ed
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/reexec/ReExecutionStrategyType.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.reexec;
+
+import java.util.Optional;
+
+public enum ReExecutionStrategyType {
+  OVERLAY(ReExecutionOverlayPlugin.class),
+  REOPTIMIZE(ReOptimizePlugin.class),
+  REEXECUTE_LOST_AM(ReExecuteLostAMQueryPlugin.class),
+  RECOMPILE_WITHOUT_CBO(ReCompileWithoutCBOPlugin.class),
+  DAGSUBMIT(ReExecutionDagSubmitPlugin.class),
+  WRITE_CONFLICT(ReExecuteOnWriteConflictPlugin.class);
+
+  ReExecutionStrategyType(Class<? extends IReExecutionPlugin> pluginClass) {
+    this.pluginClass = pluginClass;
+  }
+
+  private final Class<? extends IReExecutionPlugin> pluginClass;
+
+  public Class<? extends IReExecutionPlugin> getPluginClass() {
+    return pluginClass;
+  }
+
+  public static Class<? extends IReExecutionPlugin> 
getPluginClassByName(String strategy) {
+    return 
ReExecutionStrategyType.valueOf(strategy.toUpperCase()).getPluginClass();
+  }
+}

Reply via email to