abdullah alamoudi has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/1921
Change subject: [ASTERIXDB-2008][CLUS] Only add pending removal if node known
......................................................................
[ASTERIXDB-2008][CLUS] Only add pending removal if node known
[ASTERIXDB-2023][ING] Introduce Enums instead of using bytes
- user model changes: no
- storage format changes: no
- interface changes: no
details:
- Only nodes which are known to cluster manager are added
to the list of nodes pending removal. Other nodes are ignored
- Enums introduced:
- ActiveEvent.Kind
- ActivePartitionMessage.Event
Change-Id: I7044896559798426c04a3f46861bc5335b25d140
---
M asterixdb/asterix-active/pom.xml
M
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
M
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
M
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java
M
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
M
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
M
asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
M asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java
M
asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
M
hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java
M
hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
M
hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponent.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
M
hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java
33 files changed, 255 insertions(+), 172 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/21/1921/1
diff --git a/asterixdb/asterix-active/pom.xml b/asterixdb/asterix-active/pom.xml
index 3dd24b6..6568795 100644
--- a/asterixdb/asterix-active/pom.xml
+++ b/asterixdb/asterix-active/pom.xml
@@ -31,10 +31,6 @@
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </dependency>
- <dependency>
<groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-api</artifactId>
</dependency>
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index fcf2be9..1dbacf5 100644
---
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -26,6 +26,8 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import org.apache.asterix.active.message.ActiveManagerMessage;
import org.apache.asterix.active.message.ActiveStatsResponse;
@@ -38,7 +40,6 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.util.JavaSerializationUtils;
import org.apache.hyracks.control.nc.NodeControllerService;
-import org.apache.log4j.Logger;
public class ActiveManager {
@@ -86,15 +87,16 @@
}
public void submit(ActiveManagerMessage message) throws
HyracksDataException {
+ LOGGER.log(Level.INFO, "Message of type " + message.getKind() + "
received in " + nodeId);
switch (message.getKind()) {
- case ActiveManagerMessage.STOP_ACTIVITY:
+ case STOP_ACTIVITY:
stopRuntime(message);
break;
- case ActiveManagerMessage.REQUEST_STATS:
+ case REQUEST_STATS:
requestStats((StatsRequestMessage) message);
break;
default:
- LOGGER.warn("Unknown message type received: " +
message.getKind());
+ LOGGER.warning("Unknown message type received: " +
message.getKind());
}
}
@@ -104,7 +106,7 @@
IActiveRuntime runtime = runtimes.get(runtimeId);
long reqId = message.getReqId();
if (runtime == null) {
- LOGGER.warn("Request stats of a runtime that is not registered
" + runtimeId);
+ LOGGER.warning("Request stats of a runtime that is not
registered " + runtimeId);
// Send a failure message
((NodeControllerService) serviceCtx.getControllerService())
.sendApplicationMessageToCC(
@@ -124,7 +126,7 @@
}
public void shutdown() {
- LOGGER.warn("Shutting down ActiveManager on node " + nodeId);
+ LOGGER.warning("Shutting down ActiveManager on node " + nodeId);
Map<ActiveRuntimeId, Future<Void>> stopFutures = new HashMap<>();
shutdown = true;
runtimes.forEach((runtimeId, runtime) -> stopFutures.put(runtimeId,
executor.submit(() -> {
@@ -136,29 +138,29 @@
try {
entry.getValue().get(SHUTDOWN_TIMEOUT_SECS, TimeUnit.SECONDS);
} catch (InterruptedException e) {
- LOGGER.warn("Interrupted waiting to stop runtime: " +
entry.getKey());
+ LOGGER.warning("Interrupted waiting to stop runtime: " +
entry.getKey());
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
- LOGGER.warn("Exception while stopping runtime: " +
entry.getKey(), e);
+ LOGGER.log(Level.WARNING, "Exception while stopping runtime: "
+ entry.getKey(), e);
} catch (TimeoutException e) {
- LOGGER.warn("Timed out waiting to stop runtime: " +
entry.getKey(), e);
+ LOGGER.log(Level.WARNING, "Timed out waiting to stop runtime:
" + entry.getKey(), e);
}
});
- LOGGER.warn("Shutdown ActiveManager on node " + nodeId + " complete");
+ LOGGER.warning("Shutdown ActiveManager on node " + nodeId + "
complete");
}
private void stopRuntime(ActiveManagerMessage message) {
ActiveRuntimeId runtimeId = (ActiveRuntimeId) message.getPayload();
IActiveRuntime runtime = runtimes.get(runtimeId);
if (runtime == null) {
- LOGGER.warn("Request to stop a runtime that is not registered " +
runtimeId);
+ LOGGER.warning("Request to stop a runtime that is not registered "
+ runtimeId);
} else {
executor.execute(() -> {
try {
stopIfRunning(runtimeId, runtime);
} catch (Exception e) {
// TODO(till) Figure out a better way to handle failure to
stop a runtime
- LOGGER.warn("Failed to stop runtime: " + runtimeId, e);
+ LOGGER.log(Level.WARNING, "Failed to stop runtime: " +
runtimeId, e);
}
});
}
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
index a7d7796..adef590 100644
---
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
+++
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java
@@ -22,6 +22,7 @@
import java.util.logging.Logger;
import org.apache.asterix.active.message.ActivePartitionMessage;
+import org.apache.asterix.active.message.ActivePartitionMessage.Event;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
@@ -89,7 +90,7 @@
try {
// notify cc that runtime has been registered
ctx.sendApplicationMessageToCC(new
ActivePartitionMessage(runtimeId, ctx.getJobletContext().getJobId(),
- ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED, null),
null);
+ Event.RUNTIME_REGISTERED, null), null);
start();
} catch (InterruptedException e) {
LOGGER.log(Level.INFO, "initialize() interrupted on
ActiveSourceOperatorNodePushable", e);
@@ -112,7 +113,7 @@
activeManager.deregisterRuntime(runtimeId);
try {
ctx.sendApplicationMessageToCC(new
ActivePartitionMessage(runtimeId, ctx.getJobletContext().getJobId(),
- ActivePartitionMessage.ACTIVE_RUNTIME_DEREGISTERED, null),
null);
+ Event.RUNTIME_DEREGISTERED, null), null);
} catch (Exception e) {
LOGGER.log(Level.INFO, "deinitialize() failed on
ActiveSourceOperatorNodePushable", e);
throw HyracksDataException.create(e);
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java
index 0a36216..6ce696a 100644
---
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java
+++
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java
@@ -27,6 +27,7 @@
import java.util.logging.Logger;
import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public abstract class SingleThreadEventProcessor<T> implements Runnable {
@@ -71,8 +72,8 @@
public void stop() throws HyracksDataException, InterruptedException {
future.cancel(true);
executorService.shutdown();
- if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
- throw
HyracksDataException.create(ErrorCode.FAILED_TO_SHUTDOWN_EVENT_PROCESSOR, name);
+ if (!executorService.awaitTermination(60, TimeUnit.MINUTES)) {
+ throw new
RuntimeDataException(ErrorCode.FAILED_TO_SHUTDOWN_EVENT_PROCESSOR, name);
}
}
}
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
index 9772698..bef418b 100644
---
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
+++
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java
@@ -26,14 +26,16 @@
import org.apache.hyracks.api.exceptions.HyracksDataException;
public class ActiveManagerMessage implements INcAddressedMessage {
- public static final byte STOP_ACTIVITY = 0x00;
- public static final byte REQUEST_STATS = 0x01;
+ public enum Kind {
+ STOP_ACTIVITY,
+ REQUEST_STATS
+ }
private static final long serialVersionUID = 1L;
- private final byte kind;
+ private final Kind kind;
private final Serializable payload;
- public ActiveManagerMessage(byte kind, Serializable payload) {
+ public ActiveManagerMessage(Kind kind, Serializable payload) {
this.kind = kind;
this.payload = payload;
}
@@ -42,7 +44,7 @@
return payload;
}
- public byte getKind() {
+ public Kind getKind() {
return kind;
}
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
index a47d5a5..9ace417 100644
---
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
+++
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java
@@ -29,17 +29,19 @@
import org.apache.hyracks.api.job.JobId;
public class ActivePartitionMessage implements ICcAddressedMessage {
+ public enum Event {
+ RUNTIME_REGISTERED,
+ RUNTIME_DEREGISTERED,
+ GENERIC_EVENT
+ }
private static final long serialVersionUID = 1L;
- public static final byte ACTIVE_RUNTIME_REGISTERED = 0x00;
- public static final byte ACTIVE_RUNTIME_DEREGISTERED = 0x01;
- public static final byte GENERIC_EVENT = 0x02;
private final ActiveRuntimeId activeRuntimeId;
private final JobId jobId;
private final Serializable payload;
- private final byte event;
+ private final Event event;
- public ActivePartitionMessage(ActiveRuntimeId activeRuntimeId, JobId
jobId, byte event, Serializable payload) {
+ public ActivePartitionMessage(ActiveRuntimeId activeRuntimeId, JobId
jobId, Event event, Serializable payload) {
this.activeRuntimeId = activeRuntimeId;
this.jobId = jobId;
this.event = event;
@@ -58,7 +60,7 @@
return payload;
}
- public byte getEvent() {
+ public Event getEvent() {
return event;
}
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
index 8fa5f19..d43f00e 100644
---
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
+++
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/StatsRequestMessage.java
@@ -24,8 +24,8 @@
private static final long serialVersionUID = 1L;
private final long reqId;
- public StatsRequestMessage(byte kind, Serializable payload, long reqId) {
- super(kind, payload);
+ public StatsRequestMessage(Serializable payload, long reqId) {
+ super(Kind.REQUEST_STATS, payload);
this.reqId = reqId;
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 995e372..493cc0a 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -38,8 +38,8 @@
import org.apache.asterix.active.IRetryPolicy;
import org.apache.asterix.active.IRetryPolicyFactory;
import org.apache.asterix.active.NoRetryPolicyFactory;
-import org.apache.asterix.active.message.ActiveManagerMessage;
import org.apache.asterix.active.message.ActivePartitionMessage;
+import org.apache.asterix.active.message.ActivePartitionMessage.Event;
import org.apache.asterix.active.message.StatsRequestMessage;
import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
import org.apache.asterix.common.api.IMetadataLockManager;
@@ -68,6 +68,7 @@
public abstract class ActiveEntityEventsListener implements
IActiveEntityController {
private static final Logger LOGGER =
Logger.getLogger(ActiveEntityEventsListener.class.getName());
+ private static final Level level = Level.WARNING;
private static final ActiveEvent STATE_CHANGED = new ActiveEvent(null,
Kind.STATE_CHANGED, null, null);
private static final EnumSet<ActivityState> TRANSITION_STATES =
EnumSet.of(ActivityState.RESUMING,
ActivityState.STARTING, ActivityState.STOPPING,
ActivityState.RECOVERING);
@@ -130,7 +131,7 @@
}
protected synchronized void setState(ActivityState newState) {
- LOGGER.log(Level.FINE, "State is being set to " + newState + " from "
+ state);
+ LOGGER.log(level, "State of " + getEntityId() + "is being set to " +
newState + " from " + state);
this.prevState = state;
this.state = newState;
if (newState == ActivityState.SUSPENDED) {
@@ -142,7 +143,7 @@
@Override
public synchronized void notify(ActiveEvent event) {
try {
- LOGGER.fine("EventListener is notified.");
+ LOGGER.log(level, "EventListener is notified.");
ActiveEvent.Kind eventKind = event.getEventKind();
switch (eventKind) {
case JOB_CREATED:
@@ -172,22 +173,24 @@
}
protected synchronized void handle(ActivePartitionMessage message) {
- if (message.getEvent() ==
ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED) {
+ if (message.getEvent() == Event.RUNTIME_REGISTERED) {
numRegistered++;
if (numRegistered == locations.getLocations().length) {
setState(ActivityState.RUNNING);
}
- } else if (message.getEvent() ==
ActivePartitionMessage.ACTIVE_RUNTIME_DEREGISTERED) {
+ } else if (message.getEvent() == Event.RUNTIME_DEREGISTERED) {
numRegistered--;
}
}
@SuppressWarnings("unchecked")
protected void finish(ActiveEvent event) throws HyracksDataException {
+ LOGGER.log(level, "the job " + jobId + " finished");
jobId = null;
Pair<JobStatus, List<Exception>> status = (Pair<JobStatus,
List<Exception>>) event.getEventObject();
JobStatus jobStatus = status.getLeft();
List<Exception> exceptions = status.getRight();
+ LOGGER.log(level, "The job finished with status: " + jobStatus);
if (jobStatus.equals(JobStatus.FAILURE)) {
jobFailure = exceptions.isEmpty() ? new
RuntimeDataException(ErrorCode.UNREPORTED_TASK_FAILURE_EXCEPTION)
: exceptions.get(0);
@@ -271,10 +274,10 @@
@SuppressWarnings("unchecked")
@Override
public void refreshStats(long timeout) throws HyracksDataException {
- LOGGER.log(Level.FINE, "refreshStats called");
+ LOGGER.log(level, "refreshStats called");
synchronized (this) {
if (state != ActivityState.RUNNING || isFetchingStats) {
- LOGGER.log(Level.FINE,
+ LOGGER.log(level,
"returning immediately since state = " + state + " and
fetchingStats = " + isFetchingStats);
return;
} else {
@@ -287,8 +290,7 @@
List<INcAddressedMessage> requests = new ArrayList<>();
List<String> ncs = Arrays.asList(locations.getLocations());
for (int i = 0; i < ncs.size(); i++) {
- requests.add(new
StatsRequestMessage(ActiveManagerMessage.REQUEST_STATS,
- new ActiveRuntimeId(entityId, runtimeName, i), reqId));
+ requests.add(new StatsRequestMessage(new ActiveRuntimeId(entityId,
runtimeName, i), reqId));
}
try {
List<String> responses = (List<String>)
messageBroker.sendSyncRequestToNCs(reqId, ncs, requests, timeout);
@@ -348,32 +350,32 @@
@Override
public synchronized void recover() throws HyracksDataException {
- LOGGER.log(Level.FINE, "Recover is called on " + entityId);
+ LOGGER.log(level, "Recover is called on " + entityId);
if (recoveryTask != null) {
- LOGGER.log(Level.FINE, "But recovery task for " + entityId + " is
already there!! throwing an exception");
+ LOGGER.log(level, "But recovery task for " + entityId + " is
already there!! throwing an exception");
throw new RuntimeDataException(ErrorCode.DOUBLE_RECOVERY_ATTEMPTS);
}
if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) {
- LOGGER.log(Level.FINE, "But it has no recovery policy, so it is
set to permanent failure");
+ LOGGER.log(level, "But it has no recovery policy, so it is set to
permanent failure");
setState(ActivityState.PERMANENTLY_FAILED);
} else {
ExecutorService executor =
appCtx.getServiceContext().getControllerService().getExecutor();
IRetryPolicy policy = retryPolicyFactory.create(this);
cancelRecovery = false;
setState(ActivityState.TEMPORARILY_FAILED);
- LOGGER.log(Level.FINE, "Recovery task has been submitted");
+ LOGGER.log(level, "Recovery task has been submitted");
recoveryTask = executor.submit(() -> doRecover(policy));
}
}
protected Void doRecover(IRetryPolicy policy)
throws AlgebricksException, HyracksDataException,
InterruptedException {
- LOGGER.log(Level.FINE, "Actual Recovery task has started");
+ LOGGER.log(level, "Actual Recovery task has started");
if (getState() != ActivityState.TEMPORARILY_FAILED) {
- LOGGER.log(Level.FINE, "but its state is not temp failure and so
we're just returning");
+ LOGGER.log(level, "but its state is not temp failure and so we're
just returning");
return null;
}
- LOGGER.log(Level.FINE, "calling the policy");
+ LOGGER.log(level, "calling the policy");
while (policy.retry()) {
synchronized (this) {
if (cancelRecovery) {
@@ -402,7 +404,7 @@
doStart(metadataProvider);
return null;
} catch (Exception e) {
- LOGGER.log(Level.WARNING, "Attempt to revive " + entityId
+ " failed", e);
+ LOGGER.log(level, "Attempt to revive " + entityId + "
failed", e);
setState(ActivityState.TEMPORARILY_FAILED);
recoverFailure = e;
} finally {
@@ -515,10 +517,10 @@
WaitForStateSubscriber subscriber;
Future<Void> suspendTask;
synchronized (this) {
- LOGGER.log(Level.FINE, "suspending entity " + entityId);
- LOGGER.log(Level.FINE, "Waiting for ongoing activities");
+ LOGGER.log(level, "suspending entity " + entityId);
+ LOGGER.log(level, "Waiting for ongoing activities");
waitForNonTransitionState();
- LOGGER.log(Level.FINE, "Proceeding with suspension. Current state
is " + state);
+ LOGGER.log(level, "Proceeding with suspension. Current state is "
+ state);
if (state == ActivityState.STOPPED || state ==
ActivityState.PERMANENTLY_FAILED) {
suspended = true;
return;
@@ -536,12 +538,12 @@
EnumSet.of(ActivityState.SUSPENDED,
ActivityState.TEMPORARILY_FAILED));
suspendTask =
metadataProvider.getApplicationContext().getServiceContext().getControllerService()
.getExecutor().submit(() -> doSuspend(metadataProvider));
- LOGGER.log(Level.FINE, "Suspension task has been submitted");
+ LOGGER.log(level, "Suspension task has been submitted");
}
try {
- LOGGER.log(Level.FINE, "Waiting for suspension task to complete");
+ LOGGER.log(level, "Waiting for suspension task to complete");
suspendTask.get();
- LOGGER.log(Level.FINE, "waiting for state to become SUSPENDED or
TEMPORARILY_FAILED");
+ LOGGER.log(level, "waiting for state to become SUSPENDED or
TEMPORARILY_FAILED");
subscriber.sync();
} catch (Exception e) {
synchronized (this) {
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index c5e5dbb..5b576ac 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -53,7 +53,7 @@
implements IActiveNotificationHandler, IJobLifecycleListener {
private static final Logger LOGGER =
Logger.getLogger(ActiveNotificationHandler.class.getName());
- private static final boolean DEBUG = false;
+ private static final Level level = Level.WARNING;
public static final String ACTIVE_ENTITY_PROPERTY_NAME = "ActiveJob";
private final Map<EntityId, IActiveEntityEventsListener>
entityEventListeners;
private final Map<JobId, EntityId> jobId2EntityId;
@@ -73,13 +73,13 @@
EntityId entityId = jobId2EntityId.get(event.getJobId());
if (entityId != null) {
IActiveEntityEventsListener listener =
entityEventListeners.get(entityId);
- LOGGER.log(Level.FINE, "Next event is of type " +
event.getEventKind());
+ LOGGER.log(level, "Next event is of type " + event.getEventKind());
if (event.getEventKind() == Kind.JOB_FINISHED) {
- LOGGER.log(Level.FINE, "Removing the job");
+ LOGGER.log(level, "Removing the job");
jobId2EntityId.remove(event.getJobId());
}
if (listener != null) {
- LOGGER.log(Level.FINE, "Notifying the listener");
+ LOGGER.log(level, "Notifying the listener");
listener.notify(event);
}
} else {
@@ -91,34 +91,30 @@
@Override
public void notifyJobCreation(JobId jobId, JobSpecification
jobSpecification) throws HyracksDataException {
- LOGGER.log(Level.FINE,
+ LOGGER.log(level,
"notifyJobCreation(JobId jobId, JobSpecification
jobSpecification) was called with jobId = " + jobId);
Object property =
jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME);
if (property == null || !(property instanceof EntityId)) {
- LOGGER.log(Level.FINE, "Job is not of type active job. property
found to be: " + property);
+ LOGGER.log(level, "Job is not of type active job. property found
to be: " + property);
return;
}
EntityId entityId = (EntityId) property;
monitorJob(jobId, entityId);
boolean found = jobId2EntityId.get(jobId) != null;
- LOGGER.log(Level.FINE, "Job was found to be: " + (found ? "Active" :
"Inactive"));
+ LOGGER.log(level, "Job was found to be: " + (found ? "Active" :
"Inactive"));
add(new ActiveEvent(jobId, Kind.JOB_CREATED, entityId,
jobSpecification));
}
private synchronized void monitorJob(JobId jobId, EntityId entityId) {
- if (DEBUG) {
- LOGGER.log(Level.WARNING, "monitorJob(JobId jobId, ActiveJob
activeJob) called with job id: " + jobId);
- boolean found = jobId2EntityId.get(jobId) != null;
- LOGGER.log(Level.WARNING, "Job was found to be: " + (found ?
"Active" : "Inactive"));
- }
+ LOGGER.log(level, "monitorJob(JobId jobId, ActiveJob activeJob) called
with job id: " + jobId);
+ boolean found = jobId2EntityId.get(jobId) != null;
+ LOGGER.log(level, "Job was found to be: " + (found ? "Active" :
"Inactive"));
if (entityEventListeners.containsKey(entityId)) {
if (jobId2EntityId.containsKey(jobId)) {
LOGGER.severe("Job is already being monitored for job: " +
jobId);
return;
}
- if (DEBUG) {
- LOGGER.log(Level.WARNING, "monitoring started for job id: " +
jobId);
- }
+ LOGGER.log(level, "monitoring started for job id: " + jobId);
} else {
LOGGER.info("No listener was found for the entity: " + entityId);
}
@@ -140,9 +136,7 @@
if (entityId != null) {
add(new ActiveEvent(jobId, Kind.JOB_FINISHED, entityId,
Pair.of(jobStatus, exceptions)));
} else {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.info("NO NEED TO NOTIFY JOB FINISH!");
- }
+ LOGGER.log(level, "NO NEED TO NOTIFY JOB FINISH!");
}
}
@@ -156,20 +150,16 @@
@Override
public IActiveEntityEventsListener getListener(EntityId entityId) {
- if (DEBUG) {
- LOGGER.log(Level.WARNING, "getActiveEntityListener(EntityId
entityId) was called with entity " + entityId);
- IActiveEntityEventsListener listener =
entityEventListeners.get(entityId);
- LOGGER.log(Level.WARNING, "Listener found: " + listener);
- }
+ LOGGER.log(level, "getActiveEntityListener(EntityId entityId) was
called with entity " + entityId);
+ IActiveEntityEventsListener listener =
entityEventListeners.get(entityId);
+ LOGGER.log(level, "Listener found: " + listener);
return entityEventListeners.get(entityId);
}
@Override
public synchronized IActiveEntityEventsListener[] getEventListeners() {
- if (DEBUG) {
- LOGGER.log(Level.WARNING, "getEventListeners() was called");
- LOGGER.log(Level.WARNING, "returning " +
entityEventListeners.size() + " Listeners");
- }
+ LOGGER.log(level, "getEventListeners() was called");
+ LOGGER.log(level, "returning " + entityEventListeners.size() + "
Listeners");
return entityEventListeners.values().toArray(new
IActiveEntityEventsListener[entityEventListeners.size()]);
}
@@ -178,11 +168,8 @@
if (suspended) {
throw new
RuntimeDataException(ErrorCode.ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED);
}
- if (DEBUG) {
- LOGGER.log(Level.WARNING,
- "registerListener(IActiveEntityEventsListener listener)
was called for the entity "
- + listener.getEntityId());
- }
+ LOGGER.log(level, "registerListener(IActiveEntityEventsListener
listener) was called for the entity "
+ + listener.getEntityId());
if (entityEventListeners.containsKey(listener.getEntityId())) {
throw new
RuntimeDataException(ErrorCode.ACTIVE_ENTITY_IS_ALREADY_REGISTERED,
listener.getEntityId());
}
@@ -194,7 +181,7 @@
if (suspended) {
throw new
RuntimeDataException(ErrorCode.ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED);
}
- LOGGER.log(Level.FINE, "unregisterListener(IActiveEntityEventsListener
listener) was called for the entity "
+ LOGGER.log(level, "unregisterListener(IActiveEntityEventsListener
listener) was called for the entity "
+ listener.getEntityId());
IActiveEntityEventsListener registeredListener =
entityEventListeners.remove(listener.getEntityId());
if (registeredListener == null) {
@@ -221,16 +208,16 @@
@Override
public synchronized void recover() throws HyracksDataException {
- LOGGER.log(Level.FINE, "Starting active recovery");
+ LOGGER.log(level, "Starting active recovery");
for (IActiveEntityEventsListener listener :
entityEventListeners.values()) {
synchronized (listener) {
- LOGGER.log(Level.FINE, "Entity " + listener.getEntityId() + "
is " + listener.getStats());
+ LOGGER.log(level, "Entity " + listener.getEntityId() + " is "
+ listener.getStats());
if (listener.getState() == ActivityState.PERMANENTLY_FAILED
&& listener instanceof IActiveEntityController) {
- LOGGER.log(Level.FINE, "Recovering");
+ LOGGER.log(level, "Recovering");
((IActiveEntityController) listener).recover();
} else {
- LOGGER.log(Level.FINE, "Only notifying");
+ LOGGER.log(level, "Only notifying");
listener.notifyAll();
}
}
@@ -243,7 +230,7 @@
if (suspended) {
throw new
RuntimeDataException(ErrorCode.ACTIVE_EVENT_HANDLER_ALREADY_SUSPENDED);
}
- LOGGER.log(Level.FINE, "Suspending active events handler");
+ LOGGER.log(level, "Suspending active events handler");
suspended = true;
}
IMetadataLockManager lockManager =
mdProvider.getApplicationContext().getMetadataLockManager();
@@ -253,27 +240,27 @@
// exclusive lock all the datasets
String dataverseName = listener.getEntityId().getDataverse();
String entityName = listener.getEntityId().getEntityName();
- LOGGER.log(Level.FINE, "Suspending " + listener.getEntityId());
- LOGGER.log(Level.FINE, "Acquiring locks");
+ LOGGER.log(level, "Suspending " + listener.getEntityId());
+ LOGGER.log(level, "Acquiring locks");
lockManager.acquireActiveEntityWriteLock(mdProvider.getLocks(),
dataverseName + '.' + entityName);
List<Dataset> datasets = ((ActiveEntityEventsListener)
listener).getDatasets();
for (Dataset dataset : datasets) {
lockManager.acquireDatasetExclusiveModificationLock(mdProvider.getLocks(),
DatasetUtil.getFullyQualifiedName(dataset));
}
- LOGGER.log(Level.FINE, "locks acquired");
+ LOGGER.log(level, "locks acquired");
((ActiveEntityEventsListener) listener).suspend(mdProvider);
- LOGGER.log(Level.FINE, listener.getEntityId() + " suspended");
+ LOGGER.log(level, listener.getEntityId() + " suspended");
}
}
public void resume(MetadataProvider mdProvider)
throws AsterixException, HyracksDataException,
InterruptedException {
- LOGGER.log(Level.FINE, "Resuming active events handler");
+ LOGGER.log(level, "Resuming active events handler");
for (IActiveEntityEventsListener listener :
entityEventListeners.values()) {
- LOGGER.log(Level.FINE, "Resuming " + listener.getEntityId());
+ LOGGER.log(level, "Resuming " + listener.getEntityId());
((ActiveEntityEventsListener) listener).resume(mdProvider);
- LOGGER.log(Level.FINE, listener.getEntityId() + " resumed");
+ LOGGER.log(level, listener.getEntityId() + " resumed");
}
synchronized (this) {
suspended = false;
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index 09c4983..5503940 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -31,6 +31,7 @@
import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.active.message.ActiveManagerMessage;
+import org.apache.asterix.active.message.ActiveManagerMessage.Kind;
import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
@@ -272,8 +273,8 @@
}
// make connections between operators
- for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor,
Integer>,
- Pair<IOperatorDescriptor, Integer>>> entry :
subJob.getConnectorOperatorMap().entrySet()) {
+ for (Entry<ConnectorDescriptorId, Pair<Pair<IOperatorDescriptor,
Integer>, Pair<IOperatorDescriptor, Integer>>> entry : subJob
+ .getConnectorOperatorMap().entrySet()) {
ConnectorDescriptorId newId =
connectorIdMapping.get(entry.getKey());
IConnectorDescriptor connDesc =
jobSpec.getConnectorMap().get(newId);
Pair<IOperatorDescriptor, Integer> leftOp =
entry.getValue().getLeft();
@@ -381,7 +382,7 @@
public static void SendStopMessageToNode(ICcApplicationContext appCtx,
EntityId feedId, String intakeNodeLocation,
Integer partition) throws Exception {
- ActiveManagerMessage stopFeedMessage = new
ActiveManagerMessage(ActiveManagerMessage.STOP_ACTIVITY,
+ ActiveManagerMessage stopFeedMessage = new
ActiveManagerMessage(Kind.STOP_ACTIVITY,
new ActiveRuntimeId(feedId,
FeedIntakeOperatorNodePushable.class.getSimpleName(), partition));
SendActiveMessage(appCtx, stopFeedMessage, intakeNodeLocation);
}
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java
index 71cb038..74c4364 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Action.java
@@ -21,7 +21,7 @@
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-abstract class Action {
+public abstract class Action {
boolean done = false;
HyracksDataException failure;
@@ -39,21 +39,21 @@
protected abstract void doExecute(MetadataProvider mdProvider) throws
Exception;
- boolean hasFailed() {
+ public boolean hasFailed() {
return failure != null;
}
- HyracksDataException getFailure() {
+ public HyracksDataException getFailure() {
return failure;
}
- synchronized void sync() throws InterruptedException {
+ public synchronized void sync() throws InterruptedException {
while (!done) {
wait();
}
}
- boolean isDone() {
+ public boolean isDone() {
return done;
}
}
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
index f8baa0e..e1fdb69 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java
@@ -30,6 +30,7 @@
import org.apache.asterix.active.IActiveRuntime;
import org.apache.asterix.active.NoRetryPolicyFactory;
import org.apache.asterix.active.message.ActivePartitionMessage;
+import org.apache.asterix.active.message.ActivePartitionMessage.Event;
import org.apache.asterix.algebra.base.ILangExtension.Language;
import org.apache.asterix.app.active.ActiveEntityEventsListener;
import org.apache.asterix.app.active.ActiveNotificationHandler;
@@ -126,8 +127,8 @@
requestedStats = eventsListener.getStats();
Assert.assertTrue(requestedStats.contains("N/A"));
// Fake partition message and notify eventListener
- ActivePartitionMessage partitionMessage = new
ActivePartitionMessage(activeRuntimeId, jobId,
- ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED, null);
+ ActivePartitionMessage partitionMessage =
+ new ActivePartitionMessage(activeRuntimeId, jobId,
Event.RUNTIME_REGISTERED, null);
partitionMessage.handle(appCtx);
start.sync();
if (start.hasFailed()) {
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
index 3f68651..8d21b55 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
@@ -21,7 +21,7 @@
import org.apache.asterix.active.SingleThreadEventProcessor;
import org.apache.asterix.metadata.declared.MetadataProvider;
-class Actor extends SingleThreadEventProcessor<Action> {
+public class Actor extends SingleThreadEventProcessor<Action> {
private final MetadataProvider actorMdProvider;
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java
index e7e21b6..99499a3 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java
@@ -23,6 +23,7 @@
import org.apache.asterix.active.ActiveRuntimeId;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.active.message.ActivePartitionMessage;
+import org.apache.asterix.active.message.ActivePartitionMessage.Event;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.hyracks.api.job.JobId;
@@ -41,9 +42,8 @@
Action registration = new Action() {
@Override
protected void doExecute(MetadataProvider actorMdProvider) throws
Exception {
- ActiveEvent event = new ActiveEvent(jobId,
Kind.PARTITION_EVENT, entityId,
- new ActivePartitionMessage(new
ActiveRuntimeId(entityId, id, partition), jobId,
-
ActivePartitionMessage.ACTIVE_RUNTIME_REGISTERED, null));
+ ActiveEvent event = new ActiveEvent(jobId,
Kind.PARTITION_EVENT, entityId, new ActivePartitionMessage(
+ new ActiveRuntimeId(entityId, id, partition), jobId,
Event.RUNTIME_REGISTERED, null));
clusterController.activeEvent(event);
}
};
@@ -55,9 +55,8 @@
Action registration = new Action() {
@Override
protected void doExecute(MetadataProvider actorMdProvider) throws
Exception {
- ActiveEvent event = new ActiveEvent(jobId,
Kind.PARTITION_EVENT, entityId,
- new ActivePartitionMessage(new
ActiveRuntimeId(entityId, id, partition), jobId,
-
ActivePartitionMessage.ACTIVE_RUNTIME_DEREGISTERED, null));
+ ActiveEvent event = new ActiveEvent(jobId,
Kind.PARTITION_EVENT, entityId, new ActivePartitionMessage(
+ new ActiveRuntimeId(entityId, id, partition), jobId,
Event.RUNTIME_DEREGISTERED, null));
clusterController.activeEvent(event);
}
};
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index 10b528f..b383317 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -534,7 +534,7 @@
return executeQueryService(str, fmt, uri, params, jsonEncoded,
responseCodeValidator, false);
}
- protected InputStream executeQueryService(String str, OutputFormat fmt,
URI uri,
+ public InputStream executeQueryService(String str, OutputFormat fmt, URI
uri,
List<CompilationUnit.Parameter> params, boolean jsonEncoded,
Predicate<Integer> responseCodeValidator,
boolean cancellable) throws Exception {
final List<CompilationUnit.Parameter> newParams = upsertParam(params,
"format", fmt.mimeType());
@@ -1326,7 +1326,7 @@
if (failedGroup != null) {
failedGroup.getTestCase().add(testCaseCtx.getTestCase());
}
- throw new Exception("Test \"" + testFile + "\"
FAILED!");
+ throw new Exception("Test \"" + testFile + "\"
FAILED!", e);
}
} finally {
if (numOfFiles == testFileCtxs.size()) {
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index 5b9b96f..d709845 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -193,6 +193,7 @@
try {
waitForSignal();
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
throw HyracksDataException.create(e);
}
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
index 1b5eeac..f423404 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/feed/runtime/AdapterRuntimeManager.java
@@ -74,29 +74,31 @@
public void stop() throws HyracksDataException, InterruptedException {
synchronized (adapterExecutor) {
- try {
- if (started) {
- try {
- ctx.getExecutorService().submit(() -> {
- if (feedAdapter.stop()) {
- execution.get();
- }
- return null;
- }).get(30, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- LOGGER.log(Level.WARNING, "Interrupted while trying to
stop an adapter runtime", e);
- throw e;
- } catch (Exception e) {
- LOGGER.log(Level.WARNING, "Exception while trying to
stop an adapter runtime", e);
- throw HyracksDataException.create(e);
- } finally {
- execution.cancel(true);
+ if (!done) {
+ try {
+ if (started) {
+ try {
+ ctx.getExecutorService().submit(() -> {
+ if (feedAdapter.stop()) {
+ execution.get();
+ }
+ return null;
+ }).get(30, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ LOGGER.log(Level.WARNING, "Interrupted while
trying to stop an adapter runtime", e);
+ throw e;
+ } catch (Exception e) {
+ LOGGER.log(Level.WARNING, "Exception while trying
to stop an adapter runtime", e);
+ throw HyracksDataException.create(e);
+ } finally {
+ execution.cancel(true);
+ }
+ } else {
+ LOGGER.log(Level.WARNING, "Adapter executor was
stopped before it starts");
}
- } else {
- LOGGER.log(Level.WARNING, "Adapter executor was stopped
before it starts");
+ } finally {
+ done = true;
}
- } finally {
- done = true;
}
}
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index 4717a7b..3b23d67 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -389,10 +389,11 @@
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Registering intention to remove node id " + nodeId);
}
- if (!activeNcConfiguration.containsKey(nodeId)) {
+ if (activeNcConfiguration.containsKey(nodeId)) {
+ pendingRemoval.add(nodeId);
+ } else {
LOGGER.warning("Cannot register unknown node " + nodeId + " for
pending removal");
}
- pendingRemoval.add(nodeId);
}
public synchronized boolean cancelRemovePending(String nodeId) {
diff --git
a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java
b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java
index 8394057..99d9f3d 100644
---
a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java
+++
b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/constraints/AlgebricksAbsolutePartitionConstraint.java
@@ -18,6 +18,8 @@
*/
package org.apache.hyracks.algebricks.common.constraints;
+import java.util.Arrays;
+
public class AlgebricksAbsolutePartitionConstraint extends
AlgebricksPartitionConstraint {
private final String[] locations;
@@ -33,4 +35,10 @@
public String[] getLocations() {
return locations;
}
+
+ @Override
+ public String toString() {
+ return Arrays.deepToString(locations);
+ }
+
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
index c9cc71e..2c1ce37 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/test/java/org/apache/hyracks/api/test/FrameWriterTestUtils.java
@@ -19,6 +19,7 @@
package org.apache.hyracks.api.test;
import java.util.Collection;
+import java.util.Collections;
public class FrameWriterTestUtils {
public static final String EXCEPTION_MESSAGE = "IFrameWriter Exception in
the call to the method ";
@@ -32,6 +33,10 @@
Close
}
+ public static TestFrameWriter create() {
+ return create(Collections.emptyList(), Collections.emptyList(), false);
+ }
+
public static TestFrameWriter create(Collection<FrameWriterOperation>
exceptionThrowingOperations,
Collection<FrameWriterOperation> errorThrowingOperations, boolean
deepCopyInputFrames) {
CountAnswer openAnswer =
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
index 2685f60..7a9306c 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java
@@ -151,12 +151,18 @@
@Override
public synchronized void reportJobFailure(JobId jobId, List<Exception>
exceptions) {
+ LOGGER.log(Level.INFO, "job " + jobId + " failed and is being reported
to " + getClass().getSimpleName(),
+ exceptions.get(0));
DatasetJobRecord djr = getDatasetJobRecord(jobId);
+ LOGGER.log(Level.INFO, "Dataset job record is " + djr);
if (djr != null) {
+ LOGGER.log(Level.INFO, "Setting exceptions in Dataset job record");
djr.fail(exceptions);
}
final JobResultInfo jobResultInfo = jobResultLocations.get(jobId);
+ LOGGER.log(Level.INFO, "Job result info is " + jobResultInfo);
if (jobResultInfo != null) {
+ LOGGER.log(Level.INFO, "Setting exceptions in Job result info");
jobResultInfo.setException(exceptions.isEmpty() ? null :
exceptions.get(0));
}
notifyAll();
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index f18a917..dbbaf9f 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -66,7 +66,6 @@
import org.apache.hyracks.control.common.job.PartitionState;
import org.apache.hyracks.control.common.job.TaskAttemptDescriptor;
-
public class JobExecutor {
private static final Logger LOGGER =
Logger.getLogger(JobExecutor.class.getName());
@@ -190,11 +189,11 @@
private void startRunnableActivityClusters() throws HyracksException {
Set<TaskCluster> taskClusterRoots = new HashSet<>();
- findRunnableTaskClusterRoots(taskClusterRoots,
jobRun.getActivityClusterGraph().getActivityClusterMap()
- .values());
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("Runnable TC roots: " + taskClusterRoots + ",
inProgressTaskClusters: "
- + inProgressTaskClusters);
+ findRunnableTaskClusterRoots(taskClusterRoots,
+
jobRun.getActivityClusterGraph().getActivityClusterMap().values());
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.log(Level.INFO,
+ "Runnable TC roots: " + taskClusterRoots + ",
inProgressTaskClusters: " + inProgressTaskClusters);
}
if (taskClusterRoots.isEmpty() && inProgressTaskClusters.isEmpty()) {
ccs.getWorkQueue()
@@ -344,8 +343,8 @@
for (int i = 0; i < tasks.length; ++i) {
Task ts = tasks[i];
TaskId tid = ts.getTaskId();
- TaskAttempt taskAttempt = new TaskAttempt(tcAttempt, new
TaskAttemptId(new TaskId(tid.getActivityId(),
- tid.getPartition()), attempts), ts);
+ TaskAttempt taskAttempt = new TaskAttempt(tcAttempt,
+ new TaskAttemptId(new TaskId(tid.getActivityId(),
tid.getPartition()), attempts), ts);
taskAttempt.setStatus(TaskAttempt.TaskStatus.INITIALIZED, null);
locationMap.put(tid,
new
PartitionLocationExpression(tid.getActivityId().getOperatorDescriptorId(),
tid.getPartition()));
@@ -496,8 +495,8 @@
final DeploymentId deploymentId = jobRun.getDeploymentId();
final JobId jobId = jobRun.getJobId();
final ActivityClusterGraph acg = jobRun.getActivityClusterGraph();
- final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies =
new HashMap<>(
- jobRun.getConnectorPolicyMap());
+ final Map<ConnectorDescriptorId, IConnectorPolicy> connectorPolicies =
+ new HashMap<>(jobRun.getConnectorPolicyMap());
INodeManager nodeManager = ccs.getNodeManager();
try {
byte[] acgBytes = predistributed ? null :
JavaSerializationUtils.serialize(acg);
@@ -555,14 +554,14 @@
}
}
final JobId jobId = jobRun.getJobId();
- LOGGER.fine("Abort map for job: " + jobId + ": " +
abortTaskAttemptMap);
+ LOGGER.info("Abort map for job: " + jobId + ": " +
abortTaskAttemptMap);
INodeManager nodeManager = ccs.getNodeManager();
for (Map.Entry<String, List<TaskAttemptId>> entry :
abortTaskAttemptMap.entrySet()) {
final NodeControllerState node =
nodeManager.getNodeControllerState(entry.getKey());
final List<TaskAttemptId> abortTaskAttempts = entry.getValue();
if (node != null) {
- if (LOGGER.isLoggable(Level.FINE)) {
- LOGGER.fine("Aborting: " + abortTaskAttempts + " at " +
entry.getKey());
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.info("Aborting: " + abortTaskAttempts + " at " +
entry.getKey());
}
try {
node.getNodeController().abortTasks(jobId,
abortTaskAttempts);
@@ -582,6 +581,7 @@
}
private void abortDoomedTaskClusters() throws HyracksException {
+ LOGGER.log(Level.INFO, "aborting doomed task clusters");
Set<TaskCluster> doomedTaskClusters = new HashSet<>();
for (TaskCluster tc : inProgressTaskClusters) {
// Start search at TCs that produce no outputs (sinks)
@@ -590,6 +590,7 @@
}
}
+ LOGGER.log(Level.INFO, "number of doomed task clusters found = " +
doomedTaskClusters.size());
for (TaskCluster tc : doomedTaskClusters) {
TaskClusterAttempt tca = findLastTaskClusterAttempt(tc);
if (tca != null) {
@@ -628,7 +629,7 @@
if ((maxState == null
|| (cPolicy.consumerWaitsForProducerToFinish() && maxState
!= PartitionState.COMMITTED))
&&
findDoomedTaskClusters(partitionProducingTaskClusterMap.get(pid),
doomedTaskClusters)) {
- doomed = true;
+ doomed = true;
}
}
if (doomed) {
@@ -663,28 +664,36 @@
/**
* Indicates that a single task attempt has encountered a failure.
- * @param ta Failed Task Attempt
- * @param exceptions exeptions thrown during the failure
+ *
+ * @param ta
+ * Failed Task Attempt
+ * @param exceptions
+ * exeptions thrown during the failure
*/
public void notifyTaskFailure(TaskAttempt ta, List<Exception> exceptions) {
try {
- LOGGER.fine("Received failure notification for TaskAttempt " +
ta.getTaskAttemptId());
+ LOGGER.log(Level.INFO, "Received failure notification for
TaskAttempt " + ta.getTaskAttemptId());
TaskAttemptId taId = ta.getTaskAttemptId();
TaskCluster tc = ta.getTask().getTaskCluster();
TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
if (lastAttempt != null && taId.getAttempt() ==
lastAttempt.getAttempt()) {
- LOGGER.fine("Marking TaskAttempt " + ta.getTaskAttemptId() + "
as failed");
+ LOGGER.log(Level.INFO, "Marking TaskAttempt " +
ta.getTaskAttemptId() + " as failed");
ta.setStatus(TaskAttempt.TaskStatus.FAILED, exceptions);
abortTaskCluster(lastAttempt,
TaskClusterAttempt.TaskClusterStatus.FAILED);
abortDoomedTaskClusters();
- if (lastAttempt.getAttempt() >=
jobRun.getActivityClusterGraph().getMaxReattempts() || isCancelled()) {
+ int maxReattempts =
jobRun.getActivityClusterGraph().getMaxReattempts();
+ LOGGER.log(Level.INFO, "Marking TaskAttempt " +
ta.getTaskAttemptId()
+ + " as failed and the number of max re-attempts = " +
maxReattempts);
+ if (lastAttempt.getAttempt() >= maxReattempts ||
isCancelled()) {
+ LOGGER.log(Level.INFO, "Aborting the job of " +
ta.getTaskAttemptId());
abortJob(exceptions);
return;
}
+ LOGGER.log(Level.INFO, "We will try to start runnable activity
clusters of " + ta.getTaskAttemptId());
startRunnableActivityClusters();
} else {
- LOGGER.warning("Ignoring task failure notification: " + taId +
" -- Current last attempt = "
- + lastAttempt);
+ LOGGER.warning(
+ "Ignoring task failure notification: " + taId + " --
Current last attempt = " + lastAttempt);
}
} catch (Exception e) {
abortJob(Collections.singletonList(e));
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
index 486e9c6..8f50087 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
@@ -19,6 +19,8 @@
package org.apache.hyracks.control.cc.work;
import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import org.apache.hyracks.api.dataflow.TaskAttemptId;
import org.apache.hyracks.api.job.JobId;
@@ -28,6 +30,7 @@
import org.apache.hyracks.control.cc.job.TaskAttempt;
public class TaskFailureWork extends AbstractTaskLifecycleWork {
+ private static final Logger LOGGER =
Logger.getLogger(TaskFailureWork.class.getName());
private final List<Exception> exceptions;
public TaskFailureWork(ClusterControllerService ccs, JobId jobId,
TaskAttemptId taId, String nodeId,
@@ -38,6 +41,7 @@
@Override
protected void performEvent(TaskAttempt ta) {
+ LOGGER.log(Level.WARNING, "Executing task failure work for " + this,
exceptions.get(0));
IJobManager jobManager = ccs.getJobManager();
JobRun run = jobManager.get(jobId);
ccs.getDatasetDirectoryService().reportJobFailure(jobId, exceptions);
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
index 7f5302a..4f5b556 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
@@ -28,9 +28,6 @@
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.control.nc.application.NCServiceContext;
-/**
- * @author rico
- */
public class ApplicationMessageWork extends AbstractWork {
private static final Logger LOGGER =
Logger.getLogger(ApplicationMessageWork.class.getName());
private byte[] message;
@@ -63,6 +60,6 @@
@Override
public String toString() {
- return getName() + ": nodeID: " + nodeId;
+ return getName() + ": nodeId: " + nodeId;
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
index f4ee6b0..7728d16 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
@@ -18,12 +18,16 @@
*/
package org.apache.hyracks.control.nc.work;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
import org.apache.hyracks.control.common.work.AbstractWork;
import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.hyracks.control.nc.Task;
public class NotifyTaskCompleteWork extends AbstractWork {
+ private static final Logger LOGGER =
Logger.getLogger(NotifyTaskCompleteWork.class.getName());
private final NodeControllerService ncs;
private final Task task;
@@ -40,8 +44,13 @@
ncs.getClusterController().notifyTaskComplete(task.getJobletContext().getJobId(),
task.getTaskAttemptId(),
ncs.getId(), taskProfile);
} catch (Exception e) {
- e.printStackTrace();
+ LOGGER.log(Level.SEVERE, "Failed notifying task complete for " +
task.getTaskAttemptId(), e);
}
task.getJoblet().removeTask(task);
}
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + ":" + task.getTaskAttemptId();
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
index fa8ba28..7ed2c09 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
@@ -35,7 +35,6 @@
private final Task task;
private final JobId jobId;
private final TaskAttemptId taskId;
-
private final List<Exception> exceptions;
public NotifyTaskFailureWork(NodeControllerService ncs, Task task,
List<Exception> exceptions, JobId jobId,
@@ -49,6 +48,8 @@
@Override
public void run() {
+ LOGGER.log(Level.WARNING, ncs.getId() + " is sending a notification to
cc that task " + taskId + " has failed",
+ exceptions.get(0));
try {
IDatasetPartitionManager dpm = ncs.getDatasetPartitionManager();
if (dpm != null) {
diff --git
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
index d9ab210..dea48bd 100644
---
a/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
+++
b/hyracks-fullstack/hyracks/hyracks-ipc/src/main/java/org/apache/hyracks/ipc/impl/IPCSystem.java
@@ -74,6 +74,7 @@
} catch (IOException e) {
throw new IPCException(e);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
throw new IPCException(e);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java
index a19e69a..157450a 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/MutableArrayValueReference.java
@@ -18,6 +18,8 @@
*/
package org.apache.hyracks.storage.am.common.freepage;
+import java.nio.charset.StandardCharsets;
+
import org.apache.hyracks.data.std.api.IValueReference;
public class MutableArrayValueReference implements IValueReference {
@@ -46,4 +48,9 @@
return array == null ? 0 : array.length;
}
+ @Override
+ public String toString() {
+ return new String(array, StandardCharsets.UTF_8);
+ }
+
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
index eb8ec92..33bb60e 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeDiskComponent.java
@@ -63,4 +63,9 @@
public int getFileReferenceCount() {
return btree.getBufferCache().getFileReferenceCount(btree.getFileId());
}
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + ":" +
btree.getFileReference().getRelativePath();
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponent.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponent.java
index 57b9092..0ba7c30 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponent.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTreeWithBuddyDiskComponent.java
@@ -74,4 +74,8 @@
return btree.getBufferCache().getFileReferenceCount(btree.getFileId());
}
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + ":" +
btree.getFileReference().getRelativePath();
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java
index 6ccbc8d..40017d1 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/utils/ComponentMetadataUtil.java
@@ -19,6 +19,8 @@
package org.apache.hyracks.storage.am.lsm.common.utils;
import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.api.IPointable;
@@ -32,8 +34,8 @@
public class ComponentMetadataUtil {
- public static final MutableArrayValueReference MARKER_LSN_KEY =
- new MutableArrayValueReference("Marker".getBytes());
+ private static final Logger LOGGER =
Logger.getLogger(ComponentMetadataUtil.class.getName());
+ public static final MutableArrayValueReference MARKER_LSN_KEY = new
MutableArrayValueReference("Marker".getBytes());
public static final long NOT_FOUND = -1L;
private ComponentMetadataUtil() {
@@ -71,16 +73,28 @@
* @throws HyracksDataException
*/
public static void get(ILSMIndex index, IValueReference key, IPointable
pointable) throws HyracksDataException {
+ LOGGER.log(Level.INFO, "Getting " + key + " from index " + index);
// Lock the opTracker to ensure index components don't change
synchronized (index.getOperationTracker()) {
index.getCurrentMemoryComponent().getMetadata().get(key,
pointable);
if (pointable.getLength() == 0) {
+ LOGGER.log(Level.INFO, key + " was not found in mutable memory
component of " + index);
// was not found in the in current mutable component, search
in the other in memory components
fromImmutableMemoryComponents(index, key, pointable);
if (pointable.getLength() == 0) {
+ LOGGER.log(Level.INFO, key + " was not found in all
immmutable memory components of " + index);
// was not found in the in all in memory components,
search in the disk components
fromDiskComponents(index, key, pointable);
+ if (pointable.getLength() == 0) {
+ LOGGER.log(Level.INFO, key + " was not found in all
disk components of " + index);
+ } else {
+ LOGGER.log(Level.INFO, key + " was found in disk
components of " + index);
+ }
+ } else {
+ LOGGER.log(Level.INFO, key + " was found in the immutable
memory components of " + index);
}
+ } else {
+ LOGGER.log(Level.INFO, key + " was found in mutable memory
component of " + index);
}
}
}
@@ -105,7 +119,9 @@
private static void fromDiskComponents(ILSMIndex index, IValueReference
key, IPointable pointable)
throws HyracksDataException {
+ LOGGER.log(Level.INFO, "Getting " + key + " from disk components of "
+ index);
for (ILSMDiskComponent c : index.getImmutableComponents()) {
+ LOGGER.log(Level.INFO, "Getting " + key + " from disk components "
+ c);
c.getMetadata().get(key, pointable);
if (pointable.getLength() != 0) {
// Found
@@ -115,10 +131,13 @@
}
private static void fromImmutableMemoryComponents(ILSMIndex index,
IValueReference key, IPointable pointable) {
+ LOGGER.log(Level.INFO, "Getting " + key + " from immutable memory
components of " + index);
List<ILSMMemoryComponent> memComponents = index.getMemoryComponents();
int numOtherMemComponents = memComponents.size() - 1;
int next = index.getCurrentMemoryComponentIndex();
+ LOGGER.log(Level.INFO, index + " has " + numOtherMemComponents + "
immutable memory components");
for (int i = 0; i < numOtherMemComponents; i++) {
+ LOGGER.log(Level.INFO, "trying to get " + key + " from immutable
memory components number: " + (i + 1));
next = next - 1;
if (next < 0) {
next = memComponents.size() - 1;
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
index f2b3284..2470a39 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/impls/LSMInvertedIndexDiskComponent.java
@@ -75,4 +75,9 @@
public int getFileReferenceCount() {
return
deletedKeysBTree.getBufferCache().getFileReferenceCount(deletedKeysBTree.getFileId());
}
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + ":" + ((OnDiskInvertedIndex)
invIndex).getInvListsFile().getRelativePath();
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java
index 982f89b..54ef122 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-rtree/src/main/java/org/apache/hyracks/storage/am/lsm/rtree/impls/LSMRTreeDiskComponent.java
@@ -76,4 +76,9 @@
public int getFileReferenceCount() {
return rtree.getBufferCache().getFileReferenceCount(rtree.getFileId());
}
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + ":" +
rtree.getFileReference().getRelativePath();
+ }
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/1921
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: I7044896559798426c04a3f46861bc5335b25d140
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: abdullah alamoudi <[email protected]>