This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push: new 0ab0371 [NO ISSUE] Minor refactoring, fixes, utility functions 0ab0371 is described below commit 0ab037163e46c868debb283e10a6ccba7db47567 Author: Michael Blow <mb...@apache.org> AuthorDate: Thu Nov 5 15:18:01 2020 -0500 [NO ISSUE] Minor refactoring, fixes, utility functions Change-Id: I1843705a5a934bb4814ea7e3239d970f47c298f3 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/8763 Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Till Westmann <ti...@apache.org> --- .../org/apache/asterix/active/ActivityState.java | 2 +- .../app/active/ActiveEntityEventsListener.java | 15 +++++++------ .../app/active/ActiveNotificationHandler.java | 3 ++- .../asterix/app/active/FeedEventsListener.java | 3 ++- .../apache/asterix/test/active/TestUserActor.java | 9 ++++---- .../metadata/api/IActiveEntityController.java | 12 ++++++----- .../hyracks/api/exceptions/HyracksException.java | 6 ------ .../api/exceptions/IFormattedException.java | 25 ++++++++++++++++++++++ .../org/apache/hyracks/api/util/InvokeUtil.java | 3 ++- .../common/comm/io/FrameTupleAccessor.java | 2 +- .../org/apache/hyracks/util/ThrowingConsumer.java | 6 ++++++ 11 files changed, 59 insertions(+), 27 deletions(-) diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java index 1f3daac..26e7fd5 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActivityState.java @@ -52,7 +52,7 @@ public enum ActivityState { */ SUSPENDING, /** - * The activitiy has been suspended successfully. Next state must be resuming + * The activity has been suspended successfully. Next state must be resuming */ SUSPENDED, /** diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java index 276a04b..0f5dec7 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java @@ -23,6 +23,7 @@ import static org.apache.asterix.common.exceptions.ErrorCode.ACTIVE_ENTITY_NOT_R import java.util.ArrayList; import java.util.Arrays; import java.util.EnumSet; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Set; @@ -85,7 +86,7 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl protected final MetadataProvider metadataProvider; protected final IHyracksClientConnection hcc; protected final EntityId entityId; - private final List<Dataset> datasets; + private final Set<Dataset> datasets; protected final ActiveEvent statsUpdatedEvent; protected final String runtimeName; protected final IRetryPolicyFactory retryPolicyFactory; @@ -118,7 +119,7 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl this.metadataProvider = MetadataProvider.create(appCtx, null); this.hcc = hcc; this.entityId = entityId; - this.datasets = datasets; + this.datasets = new HashSet<>(datasets); this.retryPolicyFactory = retryPolicyFactory; this.state = ActivityState.STOPPED; this.statsTimestamp = -1; @@ -251,19 +252,19 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl } @Override - public synchronized void remove(Dataset dataset) throws HyracksDataException { + public synchronized boolean remove(Dataset dataset) throws HyracksDataException { if (isActive()) { throw new RuntimeDataException(ErrorCode.CANNOT_REMOVE_DATASET_FROM_ACTIVE_ENTITY, entityId, state); } - getDatasets().remove(dataset); + return getDatasets().remove(dataset); } @Override - public synchronized void add(Dataset dataset) throws HyracksDataException { + public synchronized boolean add(Dataset dataset) throws HyracksDataException { if (isActive()) { throw new RuntimeDataException(ErrorCode.CANNOT_ADD_DATASET_TO_ACTIVE_ENTITY, entityId, state); } - getDatasets().add(dataset); + return getDatasets().add(dataset); } public JobId getJobId() { @@ -676,7 +677,7 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl } @Override - public List<Dataset> getDatasets() { + public Set<Dataset> getDatasets() { return datasets; } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java index 0a7cad6..8b74e07 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.asterix.active.ActiveEvent; import org.apache.asterix.active.ActiveEvent.Kind; @@ -287,7 +288,7 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active } LOGGER.log(level, "Acquiring locks"); lockManager.acquireActiveEntityWriteLock(metadataProvider.getLocks(), dataverseName, entityName); - List<Dataset> datasets = ((ActiveEntityEventsListener) listener).getDatasets(); + Set<Dataset> datasets = ((ActiveEntityEventsListener) listener).getDatasets(); for (Dataset dataset : datasets) { if (targetDataset != null && targetDataset.equals(dataset)) { // DDL operation already acquired the proper lock for the operation diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java index fe4cdc5..a18cd45 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java @@ -65,10 +65,11 @@ public class FeedEventsListener extends ActiveEntityEventsListener { } @Override - public synchronized void remove(Dataset dataset) throws HyracksDataException { + public synchronized boolean remove(Dataset dataset) throws HyracksDataException { super.remove(dataset); feedConnections.removeIf(o -> o.getDataverseName().equals(dataset.getDataverseName()) && o.getDatasetName().equals(dataset.getDatasetName())); + return false; } public synchronized void addFeedConnection(FeedConnection feedConnection) { diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestUserActor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestUserActor.java index 1e2a795..bc2c110 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestUserActor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestUserActor.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.test.active; +import java.util.Collection; import java.util.List; import java.util.concurrent.Semaphore; @@ -54,7 +55,7 @@ public class TestUserActor extends Actor { String entityName = actionListener.getEntityId().getEntityName(); try { lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), dataverseName, entityName); - List<Dataset> datasets = actionListener.getDatasets(); + Collection<Dataset> datasets = actionListener.getDatasets(); for (Dataset dataset : datasets) { lockUtil.modifyDatasetBegin(lockManager, mdProvider.getLocks(), dataset.getDataverseName(), dataset.getDatasetName()); @@ -77,7 +78,7 @@ public class TestUserActor extends Actor { String entityName = actionListener.getEntityId().getEntityName(); try { lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), dataverseName, entityName); - List<Dataset> datasets = actionListener.getDatasets(); + Collection<Dataset> datasets = actionListener.getDatasets(); for (Dataset dataset : datasets) { lockUtil.modifyDatasetBegin(lockManager, mdProvider.getLocks(), dataset.getDataverseName(), dataset.getDatasetName()); @@ -98,7 +99,7 @@ public class TestUserActor extends Actor { protected void doExecute(MetadataProvider mdProvider) throws Exception { DataverseName dataverseName = actionListener.getEntityId().getDataverseName(); String entityName = actionListener.getEntityId().getEntityName(); - List<Dataset> datasets = actionListener.getDatasets(); + Collection<Dataset> datasets = actionListener.getDatasets(); try { lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), dataverseName, entityName); for (Dataset dataset : datasets) { @@ -125,7 +126,7 @@ public class TestUserActor extends Actor { String entityName = actionListener.getEntityId().getEntityName(); try { lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), dataverseName, entityName); - List<Dataset> datasets = actionListener.getDatasets(); + Collection<Dataset> datasets = actionListener.getDatasets(); for (Dataset dataset : datasets) { lockManager.upgradeDatasetLockToWrite(mdProvider.getLocks(), dataset.getDataverseName(), dataset.getDatasetName()); diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IActiveEntityController.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IActiveEntityController.java index baface2..c355b43 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IActiveEntityController.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IActiveEntityController.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.metadata.api; -import java.util.List; +import java.util.Set; import org.apache.asterix.active.IActiveEntityEventsListener; import org.apache.asterix.metadata.declared.MetadataProvider; @@ -76,30 +76,32 @@ public interface IActiveEntityController extends IActiveEntityEventsListener { * * @param dataset * the dataset to add + * @return <code>true</code> if the active entity did not already contain the dataset * @throws HyracksDataException * if the entity is active */ - void add(Dataset dataset) throws HyracksDataException; + boolean add(Dataset dataset) throws HyracksDataException; /** * Remove dataset from the list of associated datasets * * @param dataset * the dataset to add + * @return <code>true</code> if the active entity contained the dataset * @throws HyracksDataException * if the entity is active */ - void remove(Dataset dataset) throws HyracksDataException; + boolean remove(Dataset dataset) throws HyracksDataException; /** * @return the list of associated datasets */ - List<Dataset> getDatasets(); + Set<Dataset> getDatasets(); /** * replace the dataset object with the passed updated object * - * @param target + * @param dataset */ void replace(Dataset dataset); diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java index 46a9b66..7b9f632 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java @@ -20,7 +20,6 @@ package org.apache.hyracks.api.exceptions; import java.io.IOException; import java.io.Serializable; -import java.util.Objects; import org.apache.hyracks.api.util.ErrorMessageUtil; @@ -154,11 +153,6 @@ public class HyracksException extends IOException implements IFormattedException return msgCache; } - public boolean matches(String component, int errorCode) { - Objects.requireNonNull(component, "component"); - return component.equals(this.component) && errorCode == this.errorCode; - } - @Override public String toString() { return getLocalizedMessage(); diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/IFormattedException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/IFormattedException.java index 0f873e1..f8b017b 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/IFormattedException.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/IFormattedException.java @@ -18,6 +18,8 @@ */ package org.apache.hyracks.api.exceptions; +import java.util.Objects; + public interface IFormattedException { /** @@ -40,4 +42,27 @@ public interface IFormattedException { * @return the exception message */ String getMessage(); + + /** + * Tests for matching component & errorCode against this exception + * + * @param component the component to match + * @param errorCode the errorCode to match + * @return <code>true</code> if this {@link IFormattedException} instance matches the supplied parameters + */ + default boolean matches(String component, int errorCode) { + Objects.requireNonNull(component, "component"); + return component.equals(getComponent()) && errorCode == getErrorCode(); + } + + /** + * Tests for matching component & errorCode against supplied throwable + * + * @param component the component to match + * @param errorCode the errorCode to match + * @return <code>true</code> if the supplied {@link Throwable} matches the supplied parameters + */ + static boolean matches(Throwable th, String component, int errorCode) { + return th instanceof IFormattedException && ((IFormattedException) th).matches(component, errorCode); + } } \ No newline at end of file diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java index 5d52ed9..6ecb677 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java @@ -270,7 +270,7 @@ public class InvokeUtil { IDelay delay, IFailedAttemptCallback onFailure) throws HyracksDataException { Throwable failure; int attempt = 0; - while (true) { + while (!Thread.currentThread().isInterrupted()) { attempt++; try { return action.compute(); @@ -291,6 +291,7 @@ public class InvokeUtil { } } } + throw HyracksDataException.create(new InterruptedException()); } @FunctionalInterface diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java index cefada7..1cfc14c 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/comm/io/FrameTupleAccessor.java @@ -70,7 +70,7 @@ public class FrameTupleAccessor implements IFrameTupleAccessor { @Override public int getTupleStartOffset(int tupleIndex) { int offset = tupleIndex == 0 ? FrameConstants.TUPLE_START_OFFSET - : IntSerDeUtils.getInt(buffer.array(), tupleCountOffset - 4 * tupleIndex); + : IntSerDeUtils.getInt(buffer.array(), tupleCountOffset - FrameConstants.SIZE_LEN * tupleIndex); return start + offset; } diff --git a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingConsumer.java b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingConsumer.java index be9874a..4f73680 100644 --- a/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingConsumer.java +++ b/hyracks-fullstack/hyracks/hyracks-util/src/main/java/org/apache/hyracks/util/ThrowingConsumer.java @@ -40,4 +40,10 @@ public interface ThrowingConsumer<V> { }; } + static <R> ThrowingFunction<R, Void> asFunction(ThrowingConsumer<R> consumer) { + return input -> { + consumer.process(input); + return null; + }; + } }