DRILL-5287: Provide option to skip updates of ephemeral state changes in Zookeeper
close #758 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/7ebb985e Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/7ebb985e Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/7ebb985e Branch: refs/heads/master Commit: 7ebb985edc823692673a42276b4e2a80fd1f256c Parents: 2b5a6f0 Author: Padma Penumarthy <ppenuma...@yahoo.com> Authored: Tue Feb 21 13:20:57 2017 -0800 Committer: Jinfeng Ni <j...@apache.org> Committed: Thu Mar 2 10:50:24 2017 -0800 ---------------------------------------------------------------------- .../org/apache/drill/exec/ExecConstants.java | 4 ++++ .../server/options/SystemOptionManager.java | 3 ++- .../drill/exec/work/foreman/QueryManager.java | 20 +++++++++++++++----- 3 files changed, 21 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/7ebb985e/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 4f0f4d9..da3a312 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -441,4 +441,8 @@ public interface ExecConstants { String USE_DYNAMIC_UDFS_KEY = "exec.udf.use_dynamic"; BooleanValidator USE_DYNAMIC_UDFS = new BooleanValidator(USE_DYNAMIC_UDFS_KEY, true); + + String QUERY_TRANSIENT_STATE_UPDATE_KEY = "exec.query.progress.update"; + BooleanValidator QUERY_TRANSIENT_STATE_UPDATE = new BooleanValidator(QUERY_TRANSIENT_STATE_UPDATE_KEY, true); + } http://git-wip-us.apache.org/repos/asf/drill/blob/7ebb985e/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index 4a846c0..fa73e06 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -166,7 +166,8 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea ExecConstants.EXTERNAL_SORT_DISABLE_MANAGED_OPTION, ExecConstants.ENABLE_QUERY_PROFILE_VALIDATOR, ExecConstants.QUERY_PROFILE_DEBUG_VALIDATOR, - ExecConstants.USE_DYNAMIC_UDFS + ExecConstants.USE_DYNAMIC_UDFS, + ExecConstants.QUERY_TRANSIENT_STATE_UPDATE }; final Map<String, OptionValidator> tmp = new HashMap<>(); for (final OptionValidator validator : validators) { http://git-wip-us.apache.org/repos/asf/drill/blob/7ebb985e/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java index c3bde6e..7305025 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.exceptions.UserRemoteException; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.coord.store.TransientStore; import org.apache.drill.exec.coord.store.TransientStoreConfig; @@ -109,6 +110,9 @@ public class QueryManager implements AutoCloseable { // How many fragments have finished their execution. private final AtomicInteger finishedFragments = new AtomicInteger(0); + // Is the query saved in transient store + private boolean inTransientStore; + public QueryManager(final QueryId queryId, final RunQuery runQuery, final PersistentStoreProvider storeProvider, final ClusterCoordinator coordinator, final Foreman foreman) { this.queryId = queryId; @@ -282,13 +286,21 @@ public class QueryManager implements AutoCloseable { } } - QueryState updateEphemeralState(final QueryState queryState) { - switch (queryState) { + void updateEphemeralState(final QueryState queryState) { + // If query is already in zk transient store, ignore the transient state update option. + // Else, they will not be removed from transient store upon completion. + if (!inTransientStore && + !foreman.getQueryContext().getOptions().getOption(ExecConstants.QUERY_TRANSIENT_STATE_UPDATE)) { + return; + } + + switch (queryState) { case ENQUEUED: case STARTING: case RUNNING: case CANCELLATION_REQUESTED: transientProfiles.put(stringQueryId, getQueryInfo()); // store as ephemeral query profile. + inTransientStore = true; break; case COMPLETED: @@ -296,17 +308,15 @@ public class QueryManager implements AutoCloseable { case FAILED: try { transientProfiles.remove(stringQueryId); + inTransientStore = false; } catch(final Exception e) { logger.warn("Failure while trying to delete the estore profile for this query.", e); } - break; default: throw new IllegalStateException("unrecognized queryState " + queryState); } - - return queryState; } void writeFinalProfile(UserException ex) {