[ASTERIXDB-2008][CLUS] Only add pending removal if node known [ASTERIXDB-2023][ING] Introduce Enums instead of using bytes
- user model changes: no - storage format changes: no - interface changes: no details: - Only nodes which are known to cluster manager are added to the list of nodes pending removal. Other nodes are ignored - Enums introduced: - ActiveEvent.Kind - ActivePartitionMessage.Event - Remove AdapterRuntimeManager - Remove AdapterExecutor Change-Id: I7044896559798426c04a3f46861bc5335b25d140 Reviewed-on: https://asterix-gerrit.ics.uci.edu/1921 Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Murtadha Hubail <mhub...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/07075667 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/07075667 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/07075667 Branch: refs/heads/master Commit: 07075667c39d1b44939dfb83389de2626b21d8b7 Parents: 3f66d09 Author: Abdullah Alamoudi <bamou...@gmail.com> Authored: Mon Aug 7 22:28:36 2017 -0700 Committer: Murtadha Hubail <mhub...@apache.org> Committed: Tue Aug 8 04:45:08 2017 -0700 ---------------------------------------------------------------------- asterixdb/asterix-active/pom.xml | 4 - .../apache/asterix/active/ActiveManager.java | 32 ++-- .../ActiveSourceOperatorNodePushable.java | 7 +- .../active/SingleThreadEventProcessor.java | 32 ++-- .../active/message/ActiveManagerMessage.java | 12 +- .../active/message/ActivePartitionMessage.java | 14 +- .../active/message/StatsRequestMessage.java | 4 +- .../AppRuntimeContextProviderForRecovery.java | 5 +- .../app/active/ActiveEntityEventsListener.java | 48 +++--- .../app/active/ActiveNotificationHandler.java | 77 ++++----- .../asterix/app/active/FeedEventsListener.java | 2 +- .../message/ExecuteStatementRequestMessage.java | 1 - .../asterix/app/nc/NCAppRuntimeContext.java | 9 +- .../asterix/app/result/ResultPrinter.java | 2 +- .../apache/asterix/utils/FeedOperations.java | 8 +- .../org/apache/asterix/test/active/Action.java | 10 +- .../asterix/test/active/ActiveStatsTest.java | 5 +- .../org/apache/asterix/test/active/Actor.java | 2 +- .../test/active/TestNodeControllerActor.java | 11 +- .../asterix/test/common/TestExecutor.java | 52 +++--- .../results/api/feed-stats/feed-stats.1.adm | 3 +- .../common/context/DatasetLifecycleManager.java | 5 +- .../asterix/common/exceptions/ErrorCode.java | 1 + .../asterix/common/transactions/Checkpoint.java | 4 +- .../IAppRuntimeContextProvider.java | 5 +- .../asterix/common/utils/StoragePathUtil.java | 2 +- .../main/resources/asx_errormsg/en.properties | 1 + .../adapter/factory/LookupAdapterFactory.java | 2 +- .../external/api/IDataFlowController.java | 12 +- .../external/api/IDataSourceAdapter.java | 4 +- .../AbstractFeedDataFlowController.java | 9 -- .../dataflow/FeedRecordDataFlowController.java | 160 ++++++++++++------- .../dataflow/FeedStreamDataFlowController.java | 22 ++- .../external/dataflow/FeedTupleForwarder.java | 2 +- .../dataflow/IndexingDataFlowController.java | 6 +- .../dataflow/RateControlledTupleForwarder.java | 3 +- .../external/dataset/adapter/FeedAdapter.java | 9 +- .../dataset/adapter/GenericAdapter.java | 1 - .../external/feed/runtime/AdapterExecutor.java | 98 ------------ .../feed/runtime/AdapterRuntimeManager.java | 144 ----------------- .../external/input/HDFSDataSourceFactory.java | 2 +- .../input/stream/LocalFSInputStream.java | 18 ++- .../factory/SocketClientInputStreamFactory.java | 2 +- .../factory/SocketServerInputStreamFactory.java | 2 +- .../factory/TwitterFirehoseStreamFactory.java | 2 +- .../external/library/java/JObjectAccessors.java | 23 +-- ...rnalIndexBulkModifyOperatorNodePushable.java | 2 +- .../ExternalLookupOperatorDescriptor.java | 7 +- .../operators/FeedIntakeOperatorDescriptor.java | 5 +- .../FeedIntakeOperatorNodePushable.java | 98 +++++++----- .../classad/BuiltinClassAdFunctions.java | 4 +- .../library/adapter/TestTypedAdapter.java | 7 - .../adapter/TestTypedAdapterFactory.java | 4 +- .../asterix/builders/AbstractListBuilder.java | 6 +- .../records/RecordAddFieldsDescriptor.java | 30 ++-- .../std/FlushDatasetOperatorDescriptor.java | 6 +- .../transaction/GlobalResourceIdFactory.java | 4 +- .../runtime/utils/ClusterStateManager.java | 5 +- .../LockThenSearchOperationCallback.java | 6 +- ...maryIndexInstantSearchOperationCallback.java | 6 +- ...imaryIndexModificationOperationCallback.java | 4 +- ...dexModificationOperationCallbackFactory.java | 2 +- ...dexModificationOperationCallbackFactory.java | 2 +- .../locking/TestRuntimeContextProvider.java | 6 +- .../AlgebricksAbsolutePartitionConstraint.java | 8 + .../operators/std/SplitOperatorDescriptor.java | 6 +- .../std/StringStreamingRuntimeFactory.java | 7 +- .../org/apache/hyracks/api/util/IoUtil.java | 37 ++++- .../hyracks/api/test/FrameWriterTestUtils.java | 5 + .../cc/dataset/DatasetDirectoryService.java | 6 + .../control/cc/executor/JobExecutor.java | 51 +++--- .../control/cc/work/TaskFailureWork.java | 4 + .../nc/dataset/DatasetPartitionWriter.java | 2 +- .../apache/hyracks/control/nc/io/IOManager.java | 4 +- .../control/nc/work/ApplicationMessageWork.java | 5 +- .../control/nc/work/NotifyTaskCompleteWork.java | 11 +- .../control/nc/work/NotifyTaskFailureWork.java | 3 +- .../common/comm/io/ArrayTupleBuilder.java | 4 +- .../AbstractReplicateOperatorDescriptor.java | 10 +- .../std/collectors/InputChannelFrameReader.java | 3 +- .../LocalityAwarePartitionDataWriter.java | 4 +- .../MToNBroadcastConnectorDescriptor.java | 6 +- .../file/FrameFileWriterOperatorDescriptor.java | 4 +- .../file/PlainFileWriterOperatorDescriptor.java | 6 +- ...eflectionBasedDeserializedMapperFactory.java | 6 +- .../org/apache/hyracks/hdfs/ContextFactory.java | 5 +- .../org/apache/hyracks/hdfs/ContextFactory.java | 5 +- .../hyracks/hdfs/dataflow/ConfFactory.java | 4 +- .../dataflow/HDFSWriteOperatorDescriptor.java | 3 +- .../hdfs/dataflow/InputSplitsFactory.java | 2 +- .../hyracks/hdfs2/dataflow/ConfFactory.java | 5 +- .../hdfs2/dataflow/FileSplitsFactory.java | 5 +- .../dataflow/HDFSWriteOperatorDescriptor.java | 3 +- .../org/apache/hyracks/ipc/impl/IPCSystem.java | 1 + .../IndexSearchOperatorNodePushable.java | 4 +- .../freepage/MutableArrayValueReference.java | 7 + .../lsm/btree/impls/LSMBTreeDiskComponent.java | 5 + .../impls/LSMBTreeWithBuddyDiskComponent.java | 4 + .../storage/am/lsm/common/impls/LSMHarness.java | 16 +- .../lsm/common/utils/ComponentMetadataUtil.java | 23 ++- .../impls/LSMInvertedIndexDiskComponent.java | 5 + .../ondisk/OnDiskInvertedIndex.java | 4 +- .../lsm/rtree/impls/LSMRTreeDiskComponent.java | 5 + 103 files changed, 665 insertions(+), 716 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-active/pom.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/pom.xml b/asterixdb/asterix-active/pom.xml index 3dd24b6..6568795 100644 --- a/asterixdb/asterix-active/pom.xml +++ b/asterixdb/asterix-active/pom.xml @@ -31,10 +31,6 @@ <version>${project.version}</version> </dependency> <dependency> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </dependency> - <dependency> <groupId>org.apache.hyracks</groupId> <artifactId>hyracks-api</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java index fcf2be9..c0717b9 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java @@ -23,14 +23,16 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.logging.Level; +import java.util.logging.Logger; import org.apache.asterix.active.message.ActiveManagerMessage; import org.apache.asterix.active.message.ActiveStatsResponse; import org.apache.asterix.active.message.StatsRequestMessage; -import org.apache.asterix.common.api.ThreadExecutor; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.common.memory.ConcurrentFramePool; @@ -38,21 +40,20 @@ import org.apache.hyracks.api.application.INCServiceContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.util.JavaSerializationUtils; import org.apache.hyracks.control.nc.NodeControllerService; -import org.apache.log4j.Logger; public class ActiveManager { private static final Logger LOGGER = Logger.getLogger(ActiveManager.class.getName()); private static final int SHUTDOWN_TIMEOUT_SECS = 60; - private final ThreadExecutor executor; + private final ExecutorService executor; private final ConcurrentMap<ActiveRuntimeId, IActiveRuntime> runtimes; private final ConcurrentFramePool activeFramePool; private final String nodeId; private final INCServiceContext serviceCtx; private volatile boolean shutdown; - public ActiveManager(ThreadExecutor executor, String nodeId, long activeMemoryBudget, int frameSize, + public ActiveManager(ExecutorService executor, String nodeId, long activeMemoryBudget, int frameSize, INCServiceContext serviceCtx) throws HyracksDataException { this.executor = executor; this.nodeId = nodeId; @@ -86,15 +87,16 @@ public class ActiveManager { } public void submit(ActiveManagerMessage message) throws HyracksDataException { + LOGGER.log(Level.INFO, "Message of type " + message.getKind() + " received in " + nodeId); switch (message.getKind()) { - case ActiveManagerMessage.STOP_ACTIVITY: + case STOP_ACTIVITY: stopRuntime(message); break; - case ActiveManagerMessage.REQUEST_STATS: + case REQUEST_STATS: requestStats((StatsRequestMessage) message); break; default: - LOGGER.warn("Unknown message type received: " + message.getKind()); + LOGGER.warning("Unknown message type received: " + message.getKind()); } } @@ -104,7 +106,7 @@ public class ActiveManager { IActiveRuntime runtime = runtimes.get(runtimeId); long reqId = message.getReqId(); if (runtime == null) { - LOGGER.warn("Request stats of a runtime that is not registered " + runtimeId); + LOGGER.warning("Request stats of a runtime that is not registered " + runtimeId); // Send a failure message ((NodeControllerService) serviceCtx.getControllerService()) .sendApplicationMessageToCC( @@ -124,7 +126,7 @@ public class ActiveManager { } public void shutdown() { - LOGGER.warn("Shutting down ActiveManager on node " + nodeId); + LOGGER.warning("Shutting down ActiveManager on node " + nodeId); Map<ActiveRuntimeId, Future<Void>> stopFutures = new HashMap<>(); shutdown = true; runtimes.forEach((runtimeId, runtime) -> stopFutures.put(runtimeId, executor.submit(() -> { @@ -136,29 +138,29 @@ public class ActiveManager { try { entry.getValue().get(SHUTDOWN_TIMEOUT_SECS, TimeUnit.SECONDS); } catch (InterruptedException e) { - LOGGER.warn("Interrupted waiting to stop runtime: " + entry.getKey()); + LOGGER.warning("Interrupted waiting to stop runtime: " + entry.getKey()); Thread.currentThread().interrupt(); } catch (ExecutionException e) { - LOGGER.warn("Exception while stopping runtime: " + entry.getKey(), e); + LOGGER.log(Level.WARNING, "Exception while stopping runtime: " + entry.getKey(), e); } catch (TimeoutException e) { - LOGGER.warn("Timed out waiting to stop runtime: " + entry.getKey(), e); + LOGGER.log(Level.WARNING, "Timed out waiting to stop runtime: " + entry.getKey(), e); } }); - LOGGER.warn("Shutdown ActiveManager on node " + nodeId + " complete"); + LOGGER.warning("Shutdown ActiveManager on node " + nodeId + " complete"); } private void stopRuntime(ActiveManagerMessage message) { ActiveRuntimeId runtimeId = (ActiveRuntimeId) message.getPayload(); IActiveRuntime runtime = runtimes.get(runtimeId); if (runtime == null) { - LOGGER.warn("Request to stop a runtime that is not registered " + runtimeId); + LOGGER.warning("Request to stop a runtime that is not registered " + runtimeId); } else { executor.execute(() -> { try { stopIfRunning(runtimeId, runtime); } catch (Exception e) { // TODO(till) Figure out a better way to handle failure to stop a runtime - LOGGER.warn("Failed to stop runtime: " + runtimeId, e); + LOGGER.log(Level.WARNING, "Failed to stop runtime: " + runtimeId, e); } }); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java index a7d7796..27ecb52 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java @@ -22,6 +22,7 @@ import java.util.logging.Level; import java.util.logging.Logger; import org.apache.asterix.active.message.ActivePartitionMessage; +import org.apache.asterix.active.message.ActivePartitionMessage.Event; import org.apache.asterix.common.api.INcApplicationContext; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; @@ -36,6 +37,7 @@ public abstract class ActiveSourceOperatorNodePushable extends AbstractUnaryOutp protected final IHyracksTaskContext ctx; protected final ActiveManager activeManager; /** A unique identifier for the runtime **/ + protected Thread taskThread; protected final ActiveRuntimeId runtimeId; private volatile boolean done = false; @@ -85,11 +87,12 @@ public abstract class ActiveSourceOperatorNodePushable extends AbstractUnaryOutp @Override public final void initialize() throws HyracksDataException { LOGGER.log(Level.INFO, "initialize() called on ActiveSourceOperatorNodePushable"); + taskThread = Thread.currentThread(); activeManager.registerRuntime(this); try { // notify cc that runtime has been registered ctx.sendApplicationMessageToCC(new ActivePartitionMessage(runtimeId, ctx.getJobletContext().getJobId(), - ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED, null), null); + Event.RUNTIME_REGISTERED, null), null); start(); } catch (InterruptedException e) { LOGGER.log(Level.INFO, "initialize() interrupted on ActiveSourceOperatorNodePushable", e); @@ -112,7 +115,7 @@ public abstract class ActiveSourceOperatorNodePushable extends AbstractUnaryOutp activeManager.deregisterRuntime(runtimeId); try { ctx.sendApplicationMessageToCC(new ActivePartitionMessage(runtimeId, ctx.getJobletContext().getJobId(), - ActivePartitionMessage.ACTIVE_RUNTIME_DEREGISTERED, null), null); + Event.RUNTIME_DEREGISTERED, null), null); } catch (Exception e) { LOGGER.log(Level.INFO, "deinitialize() failed on ActiveSourceOperatorNodePushable", e); throw HyracksDataException.create(e); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java index 0a36216..de6682d 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java @@ -18,15 +18,12 @@ */ package org.apache.asterix.active; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.hyracks.api.exceptions.HyracksDataException; public abstract class SingleThreadEventProcessor<T> implements Runnable { @@ -34,20 +31,20 @@ public abstract class SingleThreadEventProcessor<T> implements Runnable { private static final Logger LOGGER = Logger.getLogger(SingleThreadEventProcessor.class.getName()); private final String name; private final LinkedBlockingQueue<T> eventInbox; - private final ExecutorService executorService; - private final Future<?> future; + private volatile Thread executorThread; + private volatile boolean stopped = false; public SingleThreadEventProcessor(String threadName) { this.name = threadName; eventInbox = new LinkedBlockingQueue<>(); - executorService = Executors.newSingleThreadExecutor(r -> new Thread(r, threadName)); - future = executorService.submit(this); + executorThread = new Thread(this, threadName); + executorThread.start(); } @Override public final void run() { LOGGER.log(Level.INFO, "Started " + Thread.currentThread().getName()); - while (!Thread.currentThread().isInterrupted()) { + while (!stopped) { try { T event = eventInbox.take(); handle(event); @@ -69,10 +66,19 @@ public abstract class SingleThreadEventProcessor<T> implements Runnable { } public void stop() throws HyracksDataException, InterruptedException { - future.cancel(true); - executorService.shutdown(); - if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) { - throw HyracksDataException.create(ErrorCode.FAILED_TO_SHUTDOWN_EVENT_PROCESSOR, name); + stopped = true; + executorThread.interrupt(); + executorThread.join(1000); + int attempt = 0; + while (executorThread.isAlive()) { + attempt++; + LOGGER.log(Level.WARNING, + "Failed to stop event processor after " + attempt + " attempts. Interrupted exception swallowed?"); + if (attempt == 10) { + throw new RuntimeDataException(ErrorCode.FAILED_TO_SHUTDOWN_EVENT_PROCESSOR, name); + } + executorThread.interrupt(); + executorThread.join(1000); } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java index 9772698..bef418b 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java @@ -26,14 +26,16 @@ import org.apache.asterix.common.messaging.api.INcAddressedMessage; import org.apache.hyracks.api.exceptions.HyracksDataException; public class ActiveManagerMessage implements INcAddressedMessage { - public static final byte STOP_ACTIVITY = 0x00; - public static final byte REQUEST_STATS = 0x01; + public enum Kind { + STOP_ACTIVITY, + REQUEST_STATS + } private static final long serialVersionUID = 1L; - private final byte kind; + private final Kind kind; private final Serializable payload; - public ActiveManagerMessage(byte kind, Serializable payload) { + public ActiveManagerMessage(Kind kind, Serializable payload) { this.kind = kind; this.payload = payload; } @@ -42,7 +44,7 @@ public class ActiveManagerMessage implements INcAddressedMessage { return payload; } - public byte getKind() { + public Kind getKind() { return kind; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java index a47d5a5..9ace417 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java @@ -29,17 +29,19 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobId; public class ActivePartitionMessage implements ICcAddressedMessage { + public enum Event { + RUNTIME_REGISTERED, + RUNTIME_DEREGISTERED, + GENERIC_EVENT + } private static final long serialVersionUID = 1L; - public static final byte ACTIVE_RUNTIME_REGISTERED = 0x00; - public static final byte ACTIVE_RUNTIME_DEREGISTERED = 0x01; - public static final byte GENERIC_EVENT = 0x02; private final ActiveRuntimeId activeRuntimeId; private final JobId jobId; private final Serializable payload; - private final byte event; + private final Event event; - public ActivePartitionMessage(ActiveRuntimeId activeRuntimeId, JobId jobId, byte event, Serializable payload) { + public ActivePartitionMessage(ActiveRuntimeId activeRuntimeId, JobId jobId, Event event, Serializable payload) { this.activeRuntimeId = activeRuntimeId; this.jobId = jobId; this.event = event; @@ -58,7 +60,7 @@ public class ActivePartitionMessage implements ICcAddressedMessage { return payload; } - public byte getEvent() { + public Event getEvent() { return event; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java index 8fa5f19..d43f00e 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java @@ -24,8 +24,8 @@ public class StatsRequestMessage extends ActiveManagerMessage { private static final long serialVersionUID = 1L; private final long reqId; - public StatsRequestMessage(byte kind, Serializable payload, long reqId) { - super(kind, payload); + public StatsRequestMessage(Serializable payload, long reqId) { + super(Kind.REQUEST_STATS, payload); this.reqId = reqId; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AppRuntimeContextProviderForRecovery.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AppRuntimeContextProviderForRecovery.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AppRuntimeContextProviderForRecovery.java index 1fea840..18ef143 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AppRuntimeContextProviderForRecovery.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AppRuntimeContextProviderForRecovery.java @@ -18,10 +18,11 @@ */ package org.apache.asterix.api.common; +import java.util.concurrent.ExecutorService; + import org.apache.asterix.app.nc.NCAppRuntimeContext; import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.api.INcApplicationContext; -import org.apache.asterix.common.api.ThreadExecutor; import org.apache.asterix.common.transactions.IAppRuntimeContextProvider; import org.apache.asterix.common.transactions.ITransactionSubsystem; import org.apache.hyracks.api.io.IIOManager; @@ -84,7 +85,7 @@ public class AppRuntimeContextProviderForRecovery implements IAppRuntimeContextP } @Override - public ThreadExecutor getThreadExecutor() { + public ExecutorService getThreadExecutor() { return asterixAppRuntimeContext.getThreadExecutor(); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java index 995e372..acb1614 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java @@ -38,8 +38,8 @@ import org.apache.asterix.active.IActiveEntityEventSubscriber; import org.apache.asterix.active.IRetryPolicy; import org.apache.asterix.active.IRetryPolicyFactory; import org.apache.asterix.active.NoRetryPolicyFactory; -import org.apache.asterix.active.message.ActiveManagerMessage; import org.apache.asterix.active.message.ActivePartitionMessage; +import org.apache.asterix.active.message.ActivePartitionMessage.Event; import org.apache.asterix.active.message.StatsRequestMessage; import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; import org.apache.asterix.common.api.IMetadataLockManager; @@ -68,6 +68,7 @@ import org.apache.hyracks.api.job.JobStatus; public abstract class ActiveEntityEventsListener implements IActiveEntityController { private static final Logger LOGGER = Logger.getLogger(ActiveEntityEventsListener.class.getName()); + private static final Level level = Level.INFO; private static final ActiveEvent STATE_CHANGED = new ActiveEvent(null, Kind.STATE_CHANGED, null, null); private static final EnumSet<ActivityState> TRANSITION_STATES = EnumSet.of(ActivityState.RESUMING, ActivityState.STARTING, ActivityState.STOPPING, ActivityState.RECOVERING); @@ -130,7 +131,7 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl } protected synchronized void setState(ActivityState newState) { - LOGGER.log(Level.FINE, "State is being set to " + newState + " from " + state); + LOGGER.log(level, "State of " + getEntityId() + "is being set to " + newState + " from " + state); this.prevState = state; this.state = newState; if (newState == ActivityState.SUSPENDED) { @@ -142,7 +143,7 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl @Override public synchronized void notify(ActiveEvent event) { try { - LOGGER.fine("EventListener is notified."); + LOGGER.log(level, "EventListener is notified."); ActiveEvent.Kind eventKind = event.getEventKind(); switch (eventKind) { case JOB_CREATED: @@ -172,22 +173,24 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl } protected synchronized void handle(ActivePartitionMessage message) { - if (message.getEvent() == ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED) { + if (message.getEvent() == Event.RUNTIME_REGISTERED) { numRegistered++; if (numRegistered == locations.getLocations().length) { setState(ActivityState.RUNNING); } - } else if (message.getEvent() == ActivePartitionMessage.ACTIVE_RUNTIME_DEREGISTERED) { + } else if (message.getEvent() == Event.RUNTIME_DEREGISTERED) { numRegistered--; } } @SuppressWarnings("unchecked") protected void finish(ActiveEvent event) throws HyracksDataException { + LOGGER.log(level, "the job " + jobId + " finished"); jobId = null; Pair<JobStatus, List<Exception>> status = (Pair<JobStatus, List<Exception>>) event.getEventObject(); JobStatus jobStatus = status.getLeft(); List<Exception> exceptions = status.getRight(); + LOGGER.log(level, "The job finished with status: " + jobStatus); if (jobStatus.equals(JobStatus.FAILURE)) { jobFailure = exceptions.isEmpty() ? new RuntimeDataException(ErrorCode.UNREPORTED_TASK_FAILURE_EXCEPTION) : exceptions.get(0); @@ -271,10 +274,10 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl @SuppressWarnings("unchecked") @Override public void refreshStats(long timeout) throws HyracksDataException { - LOGGER.log(Level.FINE, "refreshStats called"); + LOGGER.log(level, "refreshStats called"); synchronized (this) { if (state != ActivityState.RUNNING || isFetchingStats) { - LOGGER.log(Level.FINE, + LOGGER.log(level, "returning immediately since state = " + state + " and fetchingStats = " + isFetchingStats); return; } else { @@ -287,8 +290,7 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl List<INcAddressedMessage> requests = new ArrayList<>(); List<String> ncs = Arrays.asList(locations.getLocations()); for (int i = 0; i < ncs.size(); i++) { - requests.add(new StatsRequestMessage(ActiveManagerMessage.REQUEST_STATS, - new ActiveRuntimeId(entityId, runtimeName, i), reqId)); + requests.add(new StatsRequestMessage(new ActiveRuntimeId(entityId, runtimeName, i), reqId)); } try { List<String> responses = (List<String>) messageBroker.sendSyncRequestToNCs(reqId, ncs, requests, timeout); @@ -348,32 +350,32 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl @Override public synchronized void recover() throws HyracksDataException { - LOGGER.log(Level.FINE, "Recover is called on " + entityId); + LOGGER.log(level, "Recover is called on " + entityId); if (recoveryTask != null) { - LOGGER.log(Level.FINE, "But recovery task for " + entityId + " is already there!! throwing an exception"); + LOGGER.log(level, "But recovery task for " + entityId + " is already there!! throwing an exception"); throw new RuntimeDataException(ErrorCode.DOUBLE_RECOVERY_ATTEMPTS); } if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) { - LOGGER.log(Level.FINE, "But it has no recovery policy, so it is set to permanent failure"); + LOGGER.log(level, "But it has no recovery policy, so it is set to permanent failure"); setState(ActivityState.PERMANENTLY_FAILED); } else { ExecutorService executor = appCtx.getServiceContext().getControllerService().getExecutor(); IRetryPolicy policy = retryPolicyFactory.create(this); cancelRecovery = false; setState(ActivityState.TEMPORARILY_FAILED); - LOGGER.log(Level.FINE, "Recovery task has been submitted"); + LOGGER.log(level, "Recovery task has been submitted"); recoveryTask = executor.submit(() -> doRecover(policy)); } } protected Void doRecover(IRetryPolicy policy) throws AlgebricksException, HyracksDataException, InterruptedException { - LOGGER.log(Level.FINE, "Actual Recovery task has started"); + LOGGER.log(level, "Actual Recovery task has started"); if (getState() != ActivityState.TEMPORARILY_FAILED) { - LOGGER.log(Level.FINE, "but its state is not temp failure and so we're just returning"); + LOGGER.log(level, "but its state is not temp failure and so we're just returning"); return null; } - LOGGER.log(Level.FINE, "calling the policy"); + LOGGER.log(level, "calling the policy"); while (policy.retry()) { synchronized (this) { if (cancelRecovery) { @@ -402,7 +404,7 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl doStart(metadataProvider); return null; } catch (Exception e) { - LOGGER.log(Level.WARNING, "Attempt to revive " + entityId + " failed", e); + LOGGER.log(level, "Attempt to revive " + entityId + " failed", e); setState(ActivityState.TEMPORARILY_FAILED); recoverFailure = e; } finally { @@ -515,10 +517,10 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl WaitForStateSubscriber subscriber; Future<Void> suspendTask; synchronized (this) { - LOGGER.log(Level.FINE, "suspending entity " + entityId); - LOGGER.log(Level.FINE, "Waiting for ongoing activities"); + LOGGER.log(level, "suspending entity " + entityId); + LOGGER.log(level, "Waiting for ongoing activities"); waitForNonTransitionState(); - LOGGER.log(Level.FINE, "Proceeding with suspension. Current state is " + state); + LOGGER.log(level, "Proceeding with suspension. Current state is " + state); if (state == ActivityState.STOPPED || state == ActivityState.PERMANENTLY_FAILED) { suspended = true; return; @@ -536,12 +538,12 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl EnumSet.of(ActivityState.SUSPENDED, ActivityState.TEMPORARILY_FAILED)); suspendTask = metadataProvider.getApplicationContext().getServiceContext().getControllerService() .getExecutor().submit(() -> doSuspend(metadataProvider)); - LOGGER.log(Level.FINE, "Suspension task has been submitted"); + LOGGER.log(level, "Suspension task has been submitted"); } try { - LOGGER.log(Level.FINE, "Waiting for suspension task to complete"); + LOGGER.log(level, "Waiting for suspension task to complete"); suspendTask.get(); - LOGGER.log(Level.FINE, "waiting for state to become SUSPENDED or TEMPORARILY_FAILED"); + LOGGER.log(level, "waiting for state to become SUSPENDED or TEMPORARILY_FAILED"); subscriber.sync(); } catch (Exception e) { synchronized (this) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java index c5e5dbb..d36d9b7 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java @@ -53,7 +53,7 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active implements IActiveNotificationHandler, IJobLifecycleListener { private static final Logger LOGGER = Logger.getLogger(ActiveNotificationHandler.class.getName()); - private static final boolean DEBUG = false; + private static final Level level = Level.INFO; public static final String ACTIVE_ENTITY_PROPERTY_NAME = "ActiveJob"; private final Map<EntityId, IActiveEntityEventsListener> entityEventListeners; private final Map<JobId, EntityId> jobId2EntityId; @@ -73,13 +73,13 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active EntityId entityId = jobId2EntityId.get(event.getJobId()); if (entityId != null) { IActiveEntityEventsListener listener = entityEventListeners.get(entityId); - LOGGER.log(Level.FINE, "Next event is of type " + event.getEventKind()); + LOGGER.log(level, "Next event is of type " + event.getEventKind()); if (event.getEventKind() == Kind.JOB_FINISHED) { - LOGGER.log(Level.FINE, "Removing the job"); + LOGGER.log(level, "Removing the job"); jobId2EntityId.remove(event.getJobId()); } if (listener != null) { - LOGGER.log(Level.FINE, "Notifying the listener"); + LOGGER.log(level, "Notifying the listener"); listener.notify(event); } } else { @@ -91,34 +91,30 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active @Override public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification) throws HyracksDataException { - LOGGER.log(Level.FINE, + LOGGER.log(level, "notifyJobCreation(JobId jobId, JobSpecification jobSpecification) was called with jobId = " + jobId); Object property = jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME); if (property == null || !(property instanceof EntityId)) { - LOGGER.log(Level.FINE, "Job is not of type active job. property found to be: " + property); + LOGGER.log(level, "Job is not of type active job. property found to be: " + property); return; } EntityId entityId = (EntityId) property; monitorJob(jobId, entityId); boolean found = jobId2EntityId.get(jobId) != null; - LOGGER.log(Level.FINE, "Job was found to be: " + (found ? "Active" : "Inactive")); + LOGGER.log(level, "Job was found to be: " + (found ? "Active" : "Inactive")); add(new ActiveEvent(jobId, Kind.JOB_CREATED, entityId, jobSpecification)); } private synchronized void monitorJob(JobId jobId, EntityId entityId) { - if (DEBUG) { - LOGGER.log(Level.WARNING, "monitorJob(JobId jobId, ActiveJob activeJob) called with job id: " + jobId); - boolean found = jobId2EntityId.get(jobId) != null; - LOGGER.log(Level.WARNING, "Job was found to be: " + (found ? "Active" : "Inactive")); - } + LOGGER.log(level, "monitorJob(JobId jobId, ActiveJob activeJob) called with job id: " + jobId); + boolean found = jobId2EntityId.get(jobId) != null; + LOGGER.log(level, "Job was found to be: " + (found ? "Active" : "Inactive")); if (entityEventListeners.containsKey(entityId)) { if (jobId2EntityId.containsKey(jobId)) { LOGGER.severe("Job is already being monitored for job: " + jobId); return; } - if (DEBUG) { - LOGGER.log(Level.WARNING, "monitoring started for job id: " + jobId); - } + LOGGER.log(level, "monitoring started for job id: " + jobId); } else { LOGGER.info("No listener was found for the entity: " + entityId); } @@ -140,9 +136,7 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active if (entityId != null) { add(new ActiveEvent(jobId, Kind.JOB_FINISHED, entityId, Pair.of(jobStatus, exceptions))); } else { - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("NO NEED TO NOTIFY JOB FINISH!"); - } + LOGGER.log(level, "NO NEED TO NOTIFY JOB FINISH!"); } } @@ -156,20 +150,16 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active @Override public IActiveEntityEventsListener getListener(EntityId entityId) { - if (DEBUG) { - LOGGER.log(Level.WARNING, "getActiveEntityListener(EntityId entityId) was called with entity " + entityId); - IActiveEntityEventsListener listener = entityEventListeners.get(entityId); - LOGGER.log(Level.WARNING, "Listener found: " + listener); - } + LOGGER.log(level, "getActiveEntityListener(EntityId entityId) was called with entity " + entityId); + IActiveEntityEventsListener listener = entityEventListeners.get(entityId); + LOGGER.log(level, "Listener found: " + listener); return entityEventListeners.get(entityId); } @Override public synchronized IActiveEntityEventsListener[] getEventListeners() { - if (DEBUG) { - LOGGER.log(Level.WARNING, "getEventListeners() was called"); - LOGGER.log(Level.WARNING, "returning " + entityEventListeners.size() + " Listeners"); - } + LOGGER.log(level, "getEventListeners() was called"); + LOGGER.log(level, "returning " + entityEventListeners.size() + " Listeners"); return entityEventListeners.values().toArray(new IActiveEntityEventsListener[entityEventListeners.size()]); } @@ -178,11 +168,8 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active if (suspended) { throw new RuntimeDataException(ErrorCode.ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED); } - if (DEBUG) { - LOGGER.log(Level.WARNING, - "registerListener(IActiveEntityEventsListener listener) was called for the entity " - + listener.getEntityId()); - } + LOGGER.log(level, "registerListener(IActiveEntityEventsListener listener) was called for the entity " + + listener.getEntityId()); if (entityEventListeners.containsKey(listener.getEntityId())) { throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_IS_ALREADY_REGISTERED, listener.getEntityId()); } @@ -194,7 +181,7 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active if (suspended) { throw new RuntimeDataException(ErrorCode.ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED); } - LOGGER.log(Level.FINE, "unregisterListener(IActiveEntityEventsListener listener) was called for the entity " + LOGGER.log(level, "unregisterListener(IActiveEntityEventsListener listener) was called for the entity " + listener.getEntityId()); IActiveEntityEventsListener registeredListener = entityEventListeners.remove(listener.getEntityId()); if (registeredListener == null) { @@ -221,16 +208,16 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active @Override public synchronized void recover() throws HyracksDataException { - LOGGER.log(Level.FINE, "Starting active recovery"); + LOGGER.log(level, "Starting active recovery"); for (IActiveEntityEventsListener listener : entityEventListeners.values()) { synchronized (listener) { - LOGGER.log(Level.FINE, "Entity " + listener.getEntityId() + " is " + listener.getStats()); + LOGGER.log(level, "Entity " + listener.getEntityId() + " is " + listener.getStats()); if (listener.getState() == ActivityState.PERMANENTLY_FAILED && listener instanceof IActiveEntityController) { - LOGGER.log(Level.FINE, "Recovering"); + LOGGER.log(level, "Recovering"); ((IActiveEntityController) listener).recover(); } else { - LOGGER.log(Level.FINE, "Only notifying"); + LOGGER.log(level, "Only notifying"); listener.notifyAll(); } } @@ -243,7 +230,7 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active if (suspended) { throw new RuntimeDataException(ErrorCode.ACTIVE_EVENT_HANDLER_ALREADY_SUSPENDED); } - LOGGER.log(Level.FINE, "Suspending active events handler"); + LOGGER.log(level, "Suspending active events handler"); suspended = true; } IMetadataLockManager lockManager = mdProvider.getApplicationContext().getMetadataLockManager(); @@ -253,27 +240,27 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active // exclusive lock all the datasets String dataverseName = listener.getEntityId().getDataverse(); String entityName = listener.getEntityId().getEntityName(); - LOGGER.log(Level.FINE, "Suspending " + listener.getEntityId()); - LOGGER.log(Level.FINE, "Acquiring locks"); + LOGGER.log(level, "Suspending " + listener.getEntityId()); + LOGGER.log(level, "Acquiring locks"); lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(), dataverseName + '.' + entityName); List<Dataset> datasets = ((ActiveEntityEventsListener) listener).getDatasets(); for (Dataset dataset : datasets) { lockManager.acquireDatasetExclusiveModificationLock(mdProvider.getLocks(), DatasetUtil.getFullyQualifiedName(dataset)); } - LOGGER.log(Level.FINE, "locks acquired"); + LOGGER.log(level, "locks acquired"); ((ActiveEntityEventsListener) listener).suspend(mdProvider); - LOGGER.log(Level.FINE, listener.getEntityId() + " suspended"); + LOGGER.log(level, listener.getEntityId() + " suspended"); } } public void resume(MetadataProvider mdProvider) throws AsterixException, HyracksDataException, InterruptedException { - LOGGER.log(Level.FINE, "Resuming active events handler"); + LOGGER.log(level, "Resuming active events handler"); for (IActiveEntityEventsListener listener : entityEventListeners.values()) { - LOGGER.log(Level.FINE, "Resuming " + listener.getEntityId()); + LOGGER.log(level, "Resuming " + listener.getEntityId()); ((ActiveEntityEventsListener) listener).resume(mdProvider); - LOGGER.log(Level.FINE, listener.getEntityId() + " resumed"); + LOGGER.log(level, listener.getEntityId() + " resumed"); } synchronized (this) { suspended = false; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java index 45c79a0..124e56e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/FeedEventsListener.java @@ -114,7 +114,7 @@ public class FeedEventsListener extends ActiveEntityEventsListener { @Override protected Void doStop(MetadataProvider metadataProvider) throws HyracksDataException, AlgebricksException { IActiveEntityEventSubscriber eventSubscriber = - new WaitForStateSubscriber(this, Collections.singleton(ActivityState.STOPPED)); + new WaitForStateSubscriber(this, EnumSet.of(ActivityState.STOPPED, ActivityState.PERMANENTLY_FAILED)); try { // Construct ActiveMessage for (int i = 0; i < getLocations().getLocations().length; i++) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java index 9faa9e9..e7919fa 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java @@ -137,7 +137,6 @@ public final class ExecuteStatementRequestMessage implements ICcAddressedMessage GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, "Unexpected exception", e); responseMsg.setError(new Exception(e.toString())); } - try { messageBroker.sendApplicationMessageToNC(responseMsg, requestNodeId); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java index 29bc95e..7647881 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -24,6 +24,8 @@ import java.rmi.server.UnicastRemoteObject; import java.util.ArrayList; import java.util.List; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.logging.Level; import java.util.logging.Logger; @@ -31,7 +33,6 @@ import org.apache.asterix.active.ActiveManager; import org.apache.asterix.api.common.AppRuntimeContextProviderForRecovery; import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.api.INcApplicationContext; -import org.apache.asterix.common.api.ThreadExecutor; import org.apache.asterix.common.cluster.ClusterPartition; import org.apache.asterix.common.config.ActiveProperties; import org.apache.asterix.common.config.AsterixExtension; @@ -113,7 +114,7 @@ public class NCAppRuntimeContext implements INcApplicationContext { private ReplicationProperties replicationProperties; private MessagingProperties messagingProperties; private final NodeProperties nodeProperties; - private ThreadExecutor threadExecutor; + private ExecutorService threadExecutor; private IDatasetLifecycleManager datasetLifecycleManager; private IBufferCache bufferCache; private ITransactionSubsystem txnSubsystem; @@ -164,7 +165,7 @@ public class NCAppRuntimeContext implements INcApplicationContext { @Override public void initialize(boolean initialRun) throws IOException, ACIDException { ioManager = getServiceContext().getIoManager(); - threadExecutor = new ThreadExecutor(getServiceContext().getThreadFactory()); + threadExecutor = Executors.newCachedThreadPool(getServiceContext().getThreadFactory()); ICacheMemoryAllocator allocator = new HeapBufferAllocator(); IPageCleanerPolicy pcp = new DelayPageCleanerPolicy(600000); IPageReplacementStrategy prs = new ClockPageReplacementStrategy(allocator, @@ -383,7 +384,7 @@ public class NCAppRuntimeContext implements INcApplicationContext { } @Override - public ThreadExecutor getThreadExecutor() { + public ExecutorService getThreadExecutor() { return threadExecutor; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java index 452d13e..56975d1 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java @@ -107,7 +107,7 @@ public class ResultPrinter { } app.append("\r\n"); } catch (IOException e) { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java index 09c4983..9fc9940 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java @@ -31,6 +31,7 @@ import java.util.TreeSet; import org.apache.asterix.active.ActiveRuntimeId; import org.apache.asterix.active.EntityId; import org.apache.asterix.active.message.ActiveManagerMessage; +import org.apache.asterix.active.message.ActiveManagerMessage.Kind; import org.apache.asterix.app.translator.DefaultStatementExecutorFactory; import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.dataflow.ICcApplicationContext; @@ -272,8 +273,9 @@ public class FeedOperations { } // make connections between operators - for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor, Integer>, - Pair<IOperatorDescriptor, Integer>>> entry : subJob.getConnectorOperatorMap().entrySet()) { + for (Entry<ConnectorDescriptorId, + Pair<Pair<IOperatorDescriptor, Integer>, Pair<IOperatorDescriptor, Integer>>> entry + : subJob.getConnectorOperatorMap().entrySet()) { ConnectorDescriptorId newId = connectorIdMapping.get(entry.getKey()); IConnectorDescriptor connDesc = jobSpec.getConnectorMap().get(newId); Pair<IOperatorDescriptor, Integer> leftOp = entry.getValue().getLeft(); @@ -381,7 +383,7 @@ public class FeedOperations { public static void SendStopMessageToNode(ICcApplicationContext appCtx, EntityId feedId, String intakeNodeLocation, Integer partition) throws Exception { - ActiveManagerMessage stopFeedMessage = new ActiveManagerMessage(ActiveManagerMessage.STOP_ACTIVITY, + ActiveManagerMessage stopFeedMessage = new ActiveManagerMessage(Kind.STOP_ACTIVITY, new ActiveRuntimeId(feedId, FeedIntakeOperatorNodePushable.class.getSimpleName(), partition)); SendActiveMessage(appCtx, stopFeedMessage, intakeNodeLocation); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java index 71cb038..74c4364 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java @@ -21,7 +21,7 @@ package org.apache.asterix.test.active; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.hyracks.api.exceptions.HyracksDataException; -abstract class Action { +public abstract class Action { boolean done = false; HyracksDataException failure; @@ -39,21 +39,21 @@ abstract class Action { protected abstract void doExecute(MetadataProvider mdProvider) throws Exception; - boolean hasFailed() { + public boolean hasFailed() { return failure != null; } - HyracksDataException getFailure() { + public HyracksDataException getFailure() { return failure; } - synchronized void sync() throws InterruptedException { + public synchronized void sync() throws InterruptedException { while (!done) { wait(); } } - boolean isDone() { + public boolean isDone() { return done; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java index f8baa0e..e1fdb69 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java @@ -30,6 +30,7 @@ import org.apache.asterix.active.EntityId; import org.apache.asterix.active.IActiveRuntime; import org.apache.asterix.active.NoRetryPolicyFactory; import org.apache.asterix.active.message.ActivePartitionMessage; +import org.apache.asterix.active.message.ActivePartitionMessage.Event; import org.apache.asterix.algebra.base.ILangExtension.Language; import org.apache.asterix.app.active.ActiveEntityEventsListener; import org.apache.asterix.app.active.ActiveNotificationHandler; @@ -126,8 +127,8 @@ public class ActiveStatsTest { requestedStats = eventsListener.getStats(); Assert.assertTrue(requestedStats.contains("N/A")); // Fake partition message and notify eventListener - ActivePartitionMessage partitionMessage = new ActivePartitionMessage(activeRuntimeId, jobId, - ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED, null); + ActivePartitionMessage partitionMessage = + new ActivePartitionMessage(activeRuntimeId, jobId, Event.RUNTIME_REGISTERED, null); partitionMessage.handle(appCtx); start.sync(); if (start.hasFailed()) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java index 3f68651..8d21b55 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java @@ -21,7 +21,7 @@ package org.apache.asterix.test.active; import org.apache.asterix.active.SingleThreadEventProcessor; import org.apache.asterix.metadata.declared.MetadataProvider; -class Actor extends SingleThreadEventProcessor<Action> { +public class Actor extends SingleThreadEventProcessor<Action> { private final MetadataProvider actorMdProvider; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java index e7e21b6..99499a3 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java @@ -23,6 +23,7 @@ import org.apache.asterix.active.ActiveEvent.Kind; import org.apache.asterix.active.ActiveRuntimeId; import org.apache.asterix.active.EntityId; import org.apache.asterix.active.message.ActivePartitionMessage; +import org.apache.asterix.active.message.ActivePartitionMessage.Event; import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.hyracks.api.job.JobId; @@ -41,9 +42,8 @@ public class TestNodeControllerActor extends Actor { Action registration = new Action() { @Override protected void doExecute(MetadataProvider actorMdProvider) throws Exception { - ActiveEvent event = new ActiveEvent(jobId, Kind.PARTITION_EVENT, entityId, - new ActivePartitionMessage(new ActiveRuntimeId(entityId, id, partition), jobId, - ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED, null)); + ActiveEvent event = new ActiveEvent(jobId, Kind.PARTITION_EVENT, entityId, new ActivePartitionMessage( + new ActiveRuntimeId(entityId, id, partition), jobId, Event.RUNTIME_REGISTERED, null)); clusterController.activeEvent(event); } }; @@ -55,9 +55,8 @@ public class TestNodeControllerActor extends Actor { Action registration = new Action() { @Override protected void doExecute(MetadataProvider actorMdProvider) throws Exception { - ActiveEvent event = new ActiveEvent(jobId, Kind.PARTITION_EVENT, entityId, - new ActivePartitionMessage(new ActiveRuntimeId(entityId, id, partition), jobId, - ActivePartitionMessage.ACTIVE_RUNTIME_DEREGISTERED, null)); + ActiveEvent event = new ActiveEvent(jobId, Kind.PARTITION_EVENT, entityId, new ActivePartitionMessage( + new ActiveRuntimeId(entityId, id, partition), jobId, Event.RUNTIME_DEREGISTERED, null)); clusterController.activeEvent(event); } }; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java index 1c29e86..a96adc0 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java @@ -103,13 +103,13 @@ public class TestExecutor { // see // https://stackoverflow.com/questions/417142/what-is-the-maximum-length-of-a-url-in-different-browsers/417184 private static final long MAX_URL_LENGTH = 2000l; - private static final Pattern JAVA_BLOCK_COMMENT_PATTERN = Pattern.compile("/\\*.*\\*/", - Pattern.MULTILINE | Pattern.DOTALL); + private static final Pattern JAVA_BLOCK_COMMENT_PATTERN = + Pattern.compile("/\\*.*\\*/", Pattern.MULTILINE | Pattern.DOTALL); private static final Pattern JAVA_LINE_COMMENT_PATTERN = Pattern.compile("^//.*$", Pattern.MULTILINE); private static final Pattern SHELL_LINE_COMMENT_PATTERN = Pattern.compile("^#.*$", Pattern.MULTILINE); private static final Pattern REGEX_LINES_PATTERN = Pattern.compile("^(-)?/(.*)/([im]*)$"); - private static final Pattern POLL_TIMEOUT_PATTERN = Pattern.compile("polltimeoutsecs=(\\d+)(\\D|$)", - Pattern.MULTILINE); + private static final Pattern POLL_TIMEOUT_PATTERN = + Pattern.compile("polltimeoutsecs=(\\d+)(\\D|$)", Pattern.MULTILINE); private static final Pattern POLL_DELAY_PATTERN = Pattern.compile("polldelaysecs=(\\d+)(\\D|$)", Pattern.MULTILINE); private static final Pattern HANDLE_VARIABLE_PATTERN = Pattern.compile("handlevariable=(\\w+)"); private static final Pattern VARIABLE_REF_PATTERN = Pattern.compile("\\$(\\w+)"); @@ -168,10 +168,10 @@ public class TestExecutor { public void runScriptAndCompareWithResult(File scriptFile, PrintWriter print, File expectedFile, File actualFile, ComparisonEnum compare) throws Exception { System.err.println("Expected results file: " + expectedFile.toString()); - BufferedReader readerExpected = new BufferedReader( - new InputStreamReader(new FileInputStream(expectedFile), "UTF-8")); - BufferedReader readerActual = new BufferedReader( - new InputStreamReader(new FileInputStream(actualFile), "UTF-8")); + BufferedReader readerExpected = + new BufferedReader(new InputStreamReader(new FileInputStream(expectedFile), "UTF-8")); + BufferedReader readerActual = + new BufferedReader(new InputStreamReader(new FileInputStream(actualFile), "UTF-8")); boolean regex = false; try { if (ComparisonEnum.BINARY.equals(compare)) { @@ -354,10 +354,10 @@ public class TestExecutor { public void runScriptAndCompareWithResultRegex(File scriptFile, File expectedFile, File actualFile) throws Exception { String lineExpected, lineActual; - try (BufferedReader readerExpected = new BufferedReader( - new InputStreamReader(new FileInputStream(expectedFile), "UTF-8")); - BufferedReader readerActual = new BufferedReader( - new InputStreamReader(new FileInputStream(actualFile), "UTF-8"))) { + try (BufferedReader readerExpected = + new BufferedReader(new InputStreamReader(new FileInputStream(expectedFile), "UTF-8")); + BufferedReader readerActual = + new BufferedReader(new InputStreamReader(new FileInputStream(actualFile), "UTF-8"))) { StringBuilder actual = new StringBuilder(); while ((lineActual = readerActual.readLine()) != null) { actual.append(lineActual).append('\n'); @@ -534,7 +534,7 @@ public class TestExecutor { return executeQueryService(str, fmt, uri, params, jsonEncoded, responseCodeValidator, false); } - protected InputStream executeQueryService(String str, OutputFormat fmt, URI uri, List<Parameter> params, + public InputStream executeQueryService(String str, OutputFormat fmt, URI uri, List<Parameter> params, boolean jsonEncoded, Predicate<Integer> responseCodeValidator, boolean cancellable) throws Exception { final List<Parameter> newParams = upsertParam(params, "format", fmt.mimeType()); HttpUriRequest method = jsonEncoded ? constructPostMethodJson(str, uri, "statement", newParams) @@ -697,8 +697,8 @@ public class TestExecutor { // Insert and Delete statements are executed here public void executeUpdate(String str, URI uri) throws Exception { // Create a method instance. - HttpUriRequest request = RequestBuilder.post(uri).setEntity(new StringEntity(str, StandardCharsets.UTF_8)) - .build(); + HttpUriRequest request = + RequestBuilder.post(uri).setEntity(new StringEntity(str, StandardCharsets.UTF_8)).build(); // Execute the method. executeAndCheckHttpRequest(request); @@ -708,10 +708,10 @@ public class TestExecutor { public InputStream executeAnyAQLAsync(String statement, boolean defer, OutputFormat fmt, URI uri, Map<String, Object> variableCtx) throws Exception { // Create a method instance. - HttpUriRequest request = RequestBuilder.post(uri) - .addParameter("mode", defer ? "asynchronous-deferred" : "asynchronous") - .setEntity(new StringEntity(statement, StandardCharsets.UTF_8)).setHeader("Accept", fmt.mimeType()) - .build(); + HttpUriRequest request = + RequestBuilder.post(uri).addParameter("mode", defer ? "asynchronous-deferred" : "asynchronous") + .setEntity(new StringEntity(statement, StandardCharsets.UTF_8)) + .setHeader("Accept", fmt.mimeType()).build(); String handleVar = getHandleVariable(statement); @@ -737,8 +737,8 @@ public class TestExecutor { // create function statement public void executeDDL(String str, URI uri) throws Exception { // Create a method instance. - HttpUriRequest request = RequestBuilder.post(uri).setEntity(new StringEntity(str, StandardCharsets.UTF_8)) - .build(); + HttpUriRequest request = + RequestBuilder.post(uri).setEntity(new StringEntity(str, StandardCharsets.UTF_8)).build(); // Execute the method. executeAndCheckHttpRequest(request); @@ -748,8 +748,8 @@ public class TestExecutor { // and returns the contents as a string // This string is later passed to REST API for execution. public String readTestFile(File testFile) throws Exception { - BufferedReader reader = new BufferedReader( - new InputStreamReader(new FileInputStream(testFile), StandardCharsets.UTF_8)); + BufferedReader reader = + new BufferedReader(new InputStreamReader(new FileInputStream(testFile), StandardCharsets.UTF_8)); String line; StringBuilder stringBuilder = new StringBuilder(); String ls = System.getProperty("line.separator"); @@ -804,8 +804,8 @@ public class TestExecutor { private static String getProcessOutput(Process p) throws Exception { ByteArrayOutputStream baos = new ByteArrayOutputStream(); - Future<Integer> future = Executors.newSingleThreadExecutor() - .submit(() -> IOUtils.copy(p.getInputStream(), new OutputStream() { + Future<Integer> future = + Executors.newSingleThreadExecutor().submit(() -> IOUtils.copy(p.getInputStream(), new OutputStream() { @Override public void write(int b) throws IOException { baos.write(b); @@ -1357,7 +1357,7 @@ public class TestExecutor { if (failedGroup != null) { failedGroup.getTestCase().add(testCaseCtx.getTestCase()); } - throw new Exception("Test \"" + testFile + "\" FAILED!"); + throw new Exception("Test \"" + testFile + "\" FAILED!", e); } } finally { if (numOfFiles == testFileCtxs.size()) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-app/src/test/resources/runtimets/results/api/feed-stats/feed-stats.1.adm ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/feed-stats/feed-stats.1.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/feed-stats/feed-stats.1.adm index d0b0ea0..0fcdf15 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/feed-stats/feed-stats.1.adm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/feed-stats/feed-stats.1.adm @@ -4,8 +4,7 @@ "adapter-stats" : { "incoming-records-count" : 13, "failed-at-parser-records-count" : 3 - }, - "executor-restart-times" : 0 + } } ] } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java index 3da58e9..8abbeab 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java @@ -438,14 +438,15 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC try { dsInfo.wait(); } catch (InterruptedException e) { - throw new HyracksDataException(e); + Thread.currentThread().interrupt(); + throw HyracksDataException.create(e); } } } try { flushDatasetOpenIndexes(dsInfo, false); } catch (Exception e) { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } for (IndexInfo iInfo : dsInfo.getIndexes().values()) { if (iInfo.isOpen()) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java index 94fe951..5aed5f2 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java @@ -228,6 +228,7 @@ public class ErrorCode { public static final int ACTIVE_EVENT_HANDLER_ALREADY_SUSPENDED = 3107; public static final int FEED_STOPPED_WHILE_WAITING_FOR_A_NEW_RECORD = 3108; public static final int METADATA_DROP_FUCTION_IN_USE = 3109; + public static final int FEED_FAILED_WHILE_GETTING_A_NEW_RECORD = 3110; // Lifecycle management errors public static final int DUPLICATE_PARTITION_ID = 4000; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java index a74898e..a4c41df 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/Checkpoint.java @@ -120,7 +120,7 @@ public class Checkpoint implements Comparable<Checkpoint> { try { return new ObjectMapper().writeValueAsString(this); } catch (JsonProcessingException e) { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } } @@ -128,7 +128,7 @@ public class Checkpoint implements Comparable<Checkpoint> { try { return new ObjectMapper().readValue(json, Checkpoint.class); } catch (IOException e) { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAppRuntimeContextProvider.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAppRuntimeContextProvider.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAppRuntimeContextProvider.java index 49f5457..229fb6d 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAppRuntimeContextProvider.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAppRuntimeContextProvider.java @@ -18,9 +18,10 @@ */ package org.apache.asterix.common.transactions; +import java.util.concurrent.ExecutorService; + import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.api.INcApplicationContext; -import org.apache.asterix.common.api.ThreadExecutor; import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; @@ -29,7 +30,7 @@ import org.apache.hyracks.storage.common.buffercache.IBufferCache; public interface IAppRuntimeContextProvider { - ThreadExecutor getThreadExecutor(); + ExecutorService getThreadExecutor(); IBufferCache getBufferCache(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java index 2e98abd..824e30b 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StoragePathUtil.java @@ -126,7 +126,7 @@ public class StoragePathUtil { } return file; } catch (Exception e) { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties index 4cc06a6..a7a1990 100644 --- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties +++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties @@ -217,6 +217,7 @@ 3107 = Active Notification Handler is already suspended 3108 = Feed stopped while waiting for a new record 3109 = Function %1$s is being used. It cannot be dropped. +3110 = Feed failed while reading a new record # Lifecycle management errors 4000 = Partition id %1$d for node %2$s already in use by node %3$s http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java index a31b46d..b51416a 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/LookupAdapterFactory.java @@ -74,7 +74,7 @@ public class LookupAdapterFactory<T> implements Serializable { return new LookupAdapter<>(dataParser, reader, inRecDesc, ridReader, retainInput, retainMissing, isMissingWriterFactory, ctx, writer); } catch (Exception e) { - throw new HyracksDataException(e); + throw HyracksDataException.create(e); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java index def0bf1..7412338 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataFlowController.java @@ -18,27 +18,29 @@ */ package org.apache.asterix.external.api; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.exceptions.HyracksDataException; +@FunctionalInterface public interface IDataFlowController { - //TODO: Refactor this interface. Remove writer from start() signature public void start(IFrameWriter writer) throws HyracksDataException, InterruptedException; public default boolean pause() throws HyracksDataException { - throw new HyracksDataException("Method not implemented"); + throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED); } public default boolean resume() throws HyracksDataException { - throw new HyracksDataException("Method not implemented"); + throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED); } public default void flush() throws HyracksDataException { - throw new HyracksDataException("Method not implemented"); + throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED); } public default boolean stop() throws HyracksDataException { - throw new HyracksDataException("Method not implemented"); + throw new RuntimeDataException(ErrorCode.OPERATION_NOT_SUPPORTED); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java index e62672d..472cdae 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataSourceAdapter.java @@ -18,8 +18,6 @@ */ package org.apache.asterix.external.api; -import java.io.Serializable; - import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -30,7 +28,7 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; * adapter(pull or push). */ @FunctionalInterface -public interface IDataSourceAdapter extends Serializable { +public interface IDataSourceAdapter { public enum AdapterType { INTERNAL, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/07075667/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java index c87fe2d..53fa137 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java @@ -57,14 +57,5 @@ public abstract class AbstractFeedDataFlowController implements IDataFlowControl tupleForwarder.flush(); } - @Override - public abstract boolean stop() throws HyracksDataException; - - public abstract boolean handleException(Throwable th) throws HyracksDataException; - public abstract String getStats(); - - public void fail() throws HyracksDataException { - tupleForwarder.fail(); - } }