>From Ali Alsuliman <[email protected]>:
Ali Alsuliman has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18318 )
Change subject: [NO ISSUE][OTH] Logging enhancements + query job logging
......................................................................
[NO ISSUE][OTH] Logging enhancements + query job logging
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
- Add context to cancel request/response.
- Reduce the log level of some JobWork to TRACE.
- Avoid NPE when closing the pipeline.
- Add method to get Job Queue size.
- Add method to IJobCapacityController to get the cluster's
current capacity for logging.
- Remove logs that are not useful.
Change-Id: Iec2c7af1b14bf99b1d11b4178be66da860dfcbf4
---
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
M
hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
M
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
M
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
M
hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
M
hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
M
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
21 files changed, 140 insertions(+), 63 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/18/18318/1
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 626b938..d6727dd 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -582,12 +582,12 @@
Future<Void> suspendTask;
synchronized (this) {
if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "suspending entity " + entityId);
- LOGGER.log(level, "Waiting for ongoing activities");
+ LOGGER.log(level, "{} Suspending entity {}", jobId, entityId);
+ LOGGER.log(level, "{} Waiting for ongoing activities", jobId);
}
waitForNonTransitionState();
if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Proceeding with suspension. Current state
is " + state);
+ LOGGER.log(level, "{} Proceeding with suspension. Current
state is {}", jobId, state);
}
if (state == ActivityState.STOPPED) {
suspended = true;
@@ -609,12 +609,12 @@
doSuspend(metadataProvider);
return null;
});
- LOGGER.log(level, "Suspension task has been submitted");
+ LOGGER.log(level, "{} Suspension task has been submitted", jobId);
}
try {
- LOGGER.log(level, "Waiting for suspension task to complete");
+ LOGGER.log(level, "{} Waiting for suspension task to complete",
jobId);
suspendTask.get();
- LOGGER.log(level, "waiting for state to become SUSPENDED or
TEMPORARILY_FAILED");
+ LOGGER.log(level, "{} Waiting for state to become SUSPENDED or
TEMPORARILY_FAILED", jobId);
subscriber.sync();
suspended = true;
} catch (Exception e) {
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index 3c277d5..1f31698 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -92,16 +92,14 @@
@Override
public void notifyJobCreation(JobId jobId, JobSpecification
jobSpecification) throws HyracksDataException {
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "notifyJobCreation was called for job {}",
jobId);
- }
Object property =
jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME);
if (!(property instanceof EntityId)) {
- if (LOGGER.isEnabled(level)) {
- LOGGER.log(level, "Job {} is not of type active job. property
found to be {}", jobId, property);
+ if (property != null) {
+ LOGGER.debug("{} is not an active job. job property={}",
jobId, property);
}
return;
}
+ LOGGER.debug("notified of ingestion job creation {}", jobId);
EntityId entityId = (EntityId) property;
monitorJob(jobId, entityId);
boolean found = jobId2EntityId.get(jobId) != null;
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
index 65d1039..04e661b 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
@@ -82,4 +82,9 @@
}
}
+ @Override
+ public String toString() {
+ return String.format("%s(id=%s, uuid=%s, contextId=%s, node=%s)",
getClass().getSimpleName(), reqId, uuid,
+ contextId, nodeId);
+ }
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java
index d65ae31..a711b73 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java
@@ -49,4 +49,8 @@
return status;
}
+ @Override
+ public String toString() {
+ return String.format("%s(id=%s, status=%s)",
getClass().getSimpleName(), reqId, status);
+ }
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java
index ac87f7e..ef348fe 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java
@@ -64,16 +64,16 @@
final long time =
ATypeHierarchy.getLongValue(getIdentifier().getName(), 1, bytes, offset);
try {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.log(Level.INFO,
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.log(Level.TRACE,
ctx.getTaskContext().getTaskAttemptId() + " sleeping for " + time + " ms");
}
Thread.sleep(time);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.log(Level.INFO,
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.log(Level.TRACE,
ctx.getTaskContext().getTaskAttemptId() + " done sleeping for " + time + " ms");
}
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
index b123a5e..ae903d1 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
@@ -77,6 +77,11 @@
ensureMaxCapacity();
}
+ @Override
+ public IReadOnlyClusterCapacity getClusterCapacity() {
+ return resourceManager.getCurrentCapacity();
+ }
+
private void ensureMaxCapacity() {
final IClusterCapacity currentCapacity =
resourceManager.getCurrentCapacity();
final IReadOnlyClusterCapacity maximumCapacity =
resourceManager.getMaximumCapacity();
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index 9f4541f5..0c74260 100644
---
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -74,7 +74,7 @@
}
protected void flushIfNotFailed() throws HyracksDataException {
- if (!failed && appender.getTupleCount() > 0) {
+ if (!failed && appender != null && appender.getTupleCount() > 0) {
flushAndReset();
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
index 9e38a20..b18bcb1 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
@@ -24,6 +24,11 @@
public class DefaultJobCapacityController implements IJobCapacityController {
public static final DefaultJobCapacityController INSTANCE = new
DefaultJobCapacityController();
+ private static final IClusterCapacity CAPACITY = new ClusterCapacity();
+ static {
+ CAPACITY.setAggregatedCores(Integer.MAX_VALUE);
+ CAPACITY.setAggregatedMemoryByteSize(Long.MAX_VALUE);
+ }
private DefaultJobCapacityController() {
}
@@ -37,4 +42,9 @@
public void release(JobSpecification job) {
// No operation here.
}
+
+ @Override
+ public IReadOnlyClusterCapacity getClusterCapacity() {
+ return CAPACITY;
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
index 5fa4bd9..f88baa2 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
@@ -57,4 +57,10 @@
*/
void release(JobSpecification job);
+ /**
+ * The cluster current capacity.
+ *
+ * @return the cluster current capacity.
+ */
+ IReadOnlyClusterCapacity getClusterCapacity();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 33bdd05..e2390f1 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -38,7 +38,9 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IClusterCapacity;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
import org.apache.hyracks.api.util.ExceptionUtils;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.NodeControllerState;
@@ -312,6 +314,7 @@
run.setStartTime(System.currentTimeMillis());
run.setStartTimeZoneId(ZoneId.systemDefault().getId());
JobId jobId = run.getJobId();
+ logJobCapacity(run, "running");
activeRunMap.put(jobId, run);
run.setStatus(JobStatus.RUNNING, null);
executeJobInternal(run);
@@ -319,6 +322,7 @@
// Queue a job when the required capacity for the job is not met.
private void queueJob(JobRun jobRun) throws HyracksException {
+ logJobCapacity(jobRun, "queueing");
jobRun.setStatus(JobStatus.PENDING, null);
jobQueue.add(jobRun);
}
@@ -354,5 +358,20 @@
private void releaseJobCapacity(JobRun jobRun) {
final JobSpecification job = jobRun.getJobSpecification();
jobCapacityController.release(job);
+ logJobCapacity(jobRun, "released");
+ }
+
+ private void logJobCapacity(JobRun jobRun, String jobStateDesc) {
+ IClusterCapacity requiredResources =
jobRun.getJobSpecification().getRequiredClusterCapacity();
+ long requiredMemory = requiredResources.getAggregatedMemoryByteSize();
+ int requiredCPUs = requiredResources.getAggregatedCores();
+ if (requiredMemory == 0 && requiredCPUs == 0) {
+ return;
+ }
+ IReadOnlyClusterCapacity clusterCapacity =
jobCapacityController.getClusterCapacity();
+ LOGGER.info("{} {}, memory={}, cpu={}, (new) cluster memory={},
cpu={}, currently running={}, queued={}",
+ jobStateDesc, jobRun.getJobId(), requiredMemory, requiredCPUs,
+ clusterCapacity.getAggregatedMemoryByteSize(),
clusterCapacity.getAggregatedCores(),
+ getRunningJobsCount(), jobQueue.size());
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
index 260c6b9..38277c2 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
@@ -121,4 +121,9 @@
public void clear() {
jobListMap.clear();
}
+
+ @Override
+ public int size() {
+ return jobListMap.size();
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java
index be40883..1f2c29a 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java
@@ -73,4 +73,11 @@
* Clears the job queue
*/
void clear();
+
+ /**
+ * Returns the number of queued jobs.
+ *
+ * @return the number of queued jobs.
+ */
+ int size();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
index 771832e..4f77914 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
@@ -25,6 +25,7 @@
import org.apache.hyracks.api.messages.IMessage;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.common.deployment.DeploymentUtils;
+import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -34,8 +35,8 @@
public class ApplicationMessageWork extends AbstractHeartbeatWork {
private static final Logger LOGGER = LogManager.getLogger();
- private byte[] message;
- private DeploymentId deploymentId;
+ private final byte[] message;
+ private final DeploymentId deploymentId;
public ApplicationMessageWork(ClusterControllerService ccs, byte[]
message, DeploymentId deploymentId,
String nodeId) {
@@ -61,6 +62,11 @@
return getName() + ": nodeID: " + nodeId;
}
+ @Override
+ public Level logLevel() {
+ return Level.TRACE;
+ }
+
private static void notifyMessageBroker(ICCServiceContext ctx, IMessage
msg, String nodeId) {
final ExecutorService executor =
ctx.getControllerService().getExecutor();
executor.execute(() -> {
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
index c36b887..73c0c7d 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
@@ -24,10 +24,11 @@
import org.apache.hyracks.control.cc.cluster.INodeManager;
import org.apache.hyracks.control.common.work.AbstractWork;
import org.apache.hyracks.control.common.work.IResultCallback;
+import org.apache.logging.log4j.Level;
public class GetNodeControllersInfoWork extends AbstractWork {
private final INodeManager nodeManager;
- private IResultCallback<Map<String, NodeControllerInfo>> callback;
+ private final IResultCallback<Map<String, NodeControllerInfo>> callback;
public GetNodeControllersInfoWork(INodeManager nodeManager,
IResultCallback<Map<String, NodeControllerInfo>> callback) {
@@ -39,4 +40,9 @@
public void run() {
callback.setValue(nodeManager.getNodeControllerInfoMap());
}
+
+ @Override
+ public Level logLevel() {
+ return Level.TRACE;
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
index ee10669..cbdf98a 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
@@ -71,6 +71,6 @@
@Override
public Level logLevel() {
- return Level.DEBUG;
+ return Level.TRACE;
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
index eee8950..e52e3ac 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
@@ -52,7 +52,6 @@
private boolean failed;
protected boolean flushRequest;
private boolean deallocated;
- private Level openCloseLevel = Level.DEBUG;
private Thread dataConsumerThread;
public MaterializingPipelinedPartition(IHyracksTaskContext ctx,
PartitionManager manager, PartitionId pid,
@@ -181,9 +180,6 @@
@Override
public void open() throws HyracksDataException {
- if (LOGGER.isEnabled(openCloseLevel)) {
- LOGGER.log(openCloseLevel, "open(" + pid + " by " + taId);
- }
size = 0;
eos = false;
failed = false;
@@ -215,9 +211,6 @@
@Override
public void close() throws HyracksDataException {
- if (LOGGER.isEnabled(openCloseLevel)) {
- LOGGER.log(openCloseLevel, "close(" + pid + " by " + taId);
- }
if (writeHandle != null) {
ctx.getIoManager().close(writeHandle);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
index 6d4f173..d1360d8 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
@@ -30,10 +30,10 @@
public class ApplicationMessageWork extends AbstractWork {
private static final Logger LOGGER = LogManager.getLogger();
- private byte[] message;
- private DeploymentId deploymentId;
- private String nodeId;
- private NodeControllerService ncs;
+ private final byte[] message;
+ private final DeploymentId deploymentId;
+ private final String nodeId;
+ private final NodeControllerService ncs;
public ApplicationMessageWork(NodeControllerService ncs, byte[] message,
DeploymentId deploymentId, String nodeId) {
this.ncs = ncs;
@@ -62,4 +62,9 @@
public String toString() {
return getName() + ": nodeId: " + nodeId;
}
+
+ @Override
+ public Level logLevel() {
+ return Level.TRACE;
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
index 6dd4307..93e60bc 100644
---
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
+++
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
@@ -65,6 +65,7 @@
@Override
public String toString() {
- return getName() + ": [" + ncs.getId() + "[" + jobId + ":" + taskId +
"]";
+ return getName() + ": [" + ncs.getId() + "[" + jobId + ":" + taskId +
"]"
+ + ((exceptions != null && !exceptions.isEmpty()) ? " " +
exceptions.get(0).getMessage() : "");
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 555e8fb..0f31491 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -319,9 +319,6 @@
if (!failed) {
state.hybridHJ.closeBuild();
ctx.setStateObject(state);
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("OptimizedHybridHashJoin closed
its build phase");
- }
} else {
state.hybridHJ.clearBuildTempFiles();
}
@@ -402,10 +399,6 @@
writer.open();
state.hybridHJ.initProbe(probComp);
-
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("OptimizedHybridHashJoin is starting the
probe phase.");
- }
}
@Override
@@ -416,7 +409,7 @@
@Override
public void fail() throws HyracksDataException {
failed = true;
- if (state.hybridHJ != null) {
+ if (state != null && state.hybridHJ != null) {
state.hybridHJ.fail();
}
writer.fail();
@@ -427,12 +420,13 @@
if (failed) {
try {
// Clear temp files if fail() was called.
- state.hybridHJ.clearBuildTempFiles();
- state.hybridHJ.clearProbeTempFiles();
+ if (state != null && state.hybridHJ != null) {
+ state.hybridHJ.clearBuildTempFiles();
+ state.hybridHJ.clearProbeTempFiles();
+ }
} finally {
writer.close(); // writer should always be closed.
}
- logProbeComplete();
return;
}
try {
@@ -477,17 +471,7 @@
// Re-throw the whatever is caught.
throw e;
} finally {
- try {
- logProbeComplete();
- } finally {
- writer.close();
- }
- }
- }
-
- private void logProbeComplete() {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("OptimizedHybridHashJoin closed its probe
phase");
+ writer.close();
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
index 08f15b3..a1704ec 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
@@ -41,13 +41,9 @@
import org.apache.hyracks.dataflow.std.structures.IResetableComparableFactory;
import org.apache.hyracks.dataflow.std.structures.MaxHeap;
import org.apache.hyracks.dataflow.std.structures.TuplePointer;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
public class TupleSorterHeapSort implements ITupleSorter {
- private static final Logger LOGGER = LogManager.getLogger();
-
class HeapEntryFactory implements IResetableComparableFactory<HeapEntry> {
@Override
public IResetableComparable<HeapEntry> createResetableComparable() {
@@ -288,7 +284,6 @@
int maxFrameSize = outputFrame.getFrameSize();
int numEntries = heap.getNumEntries();
IResetableComparable[] entries = heap.getEntries();
- int io = 0;
for (int i = 0; i < numEntries; i++) {
HeapEntry minEntry = (HeapEntry) entries[i];
bufferAccessor1.reset(minEntry.tuplePointer);
@@ -296,14 +291,10 @@
bufferAccessor1.getTupleStartOffset(),
bufferAccessor1.getTupleLength());
if (flushed > 0) {
maxFrameSize = Math.max(maxFrameSize, flushed);
- io++;
}
}
maxFrameSize = Math.max(maxFrameSize, outputFrame.getFrameSize());
outputAppender.write(writer, true);
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Flushed records:" + numEntries + "; Flushed through "
+ (io + 1) + " frames");
- }
return maxFrameSize;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index be22b9c..7a75a0f 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -34,7 +34,9 @@
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.ClusterCapacity;
import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
import org.apache.hyracks.api.result.IResultSet;
import org.apache.hyracks.api.result.IResultSetReader;
import org.apache.hyracks.client.result.ResultSet;
@@ -254,6 +256,14 @@
public void release(JobSpecification job) {
}
+
+ @Override
+ public IReadOnlyClusterCapacity getClusterCapacity() {
+ ClusterCapacity clusterCapacity = new ClusterCapacity();
+ clusterCapacity.setAggregatedMemoryByteSize(maxRAM);
+ clusterCapacity.setAggregatedCores(Integer.MAX_VALUE);
+ return clusterCapacity;
+ }
};
}
}
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18318
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: neo
Gerrit-Change-Id: Iec2c7af1b14bf99b1d11b4178be66da860dfcbf4
Gerrit-Change-Number: 18318
Gerrit-PatchSet: 1
Gerrit-Owner: Ali Alsuliman <[email protected]>
Gerrit-MessageType: newchange