>From Ali Alsuliman <ali.al.solai...@gmail.com>:

Ali Alsuliman has submitted this change. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19561 )

Change subject: [ASTERIXDB-3587][RT] Re-use frame in NL join cache activity
......................................................................

[ASTERIXDB-3587][RT] Re-use frame in NL join cache activity

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Instead of creating a frame for each incoming frame, create
one frame and re-use it in the cache activity of the nested
loop join operator.

- Reduce the logging level for some logs from debug to trace.

Ext-ref: MB-66027
Change-Id: I267d227f6a5e435e08221cb94895cedf60764e4c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19561
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Tested-by: Ali Alsuliman <ali.al.solai...@gmail.com>
Reviewed-by: Ali Alsuliman <ali.al.solai...@gmail.com>
Reviewed-by: Murtadha Hubail <mhub...@apache.org>
---
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
M 
hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
5 files changed, 82 insertions(+), 55 deletions(-)

Approvals:
  Murtadha Hubail: Looks good to me, approved
  Ali Alsuliman: Looks good to me, but someone else must approve; Verified
  Jenkins: Verified

Objections:
  Anon. E. Moose #1000171: Violations found




diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index bedcd9c..e0e3cbc 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -97,13 +97,15 @@
             try {
                 responseMsg = (ExecuteStatementResponseMessage) 
responseFuture.get(timeout, TimeUnit.MILLISECONDS);
             } catch (InterruptedException e) {
-                cancelQuery(ncMb, ncCtx.getNodeId(), 
requestReference.getUuid(), param.getClientContextID(), e, false);
+                cancelQuery(ncMb, ncCtx.getNodeId(), 
requestReference.getUuid(), param.getClientContextID(), e, false,
+                        "interrupt");
                 throw e;
             } catch (TimeoutException exception) {
                 RuntimeDataException hde = new 
RuntimeDataException(ErrorCode.REQUEST_TIMEOUT);
                 hde.addSuppressed(exception);
                 // cancel query
-                cancelQuery(ncMb, ncCtx.getNodeId(), 
requestReference.getUuid(), param.getClientContextID(), hde, true);
+                cancelQuery(ncMb, ncCtx.getNodeId(), 
requestReference.getUuid(), param.getClientContextID(), hde, true,
+                        "timeout");
                 throw hde;
             }
             executionState.end();
@@ -156,7 +158,7 @@
     }

     private void cancelQuery(INCMessageBroker messageBroker, String nodeId, 
String uuid, String clientContextID,
-            Exception exception, boolean wait) {
+            Exception exception, boolean wait, String reason) {
         if (uuid == null && clientContextID == null) {
             return;
         }
@@ -165,8 +167,7 @@
             CancelQueryRequest cancelQueryMessage =
                     new CancelQueryRequest(nodeId, 
cancelQueryFuture.getFutureId(), uuid, clientContextID);
             // TODO(mblow): multicc -- need to send cancellation to the 
correct cc
-            LOGGER.info("Cancelling query with uuid:{}, clientContextID:{} due 
to {}", uuid, clientContextID,
-                    exception.getClass().getSimpleName());
+            LOGGER.info("Cancelling query with uuid:{}, clientContextID:{} due 
to {}", uuid, clientContextID, reason);
             messageBroker.sendMessageToPrimaryCC(cancelQueryMessage);
             if (wait) {
                 
cancelQueryFuture.get(ExecuteStatementRequestMessage.DEFAULT_QUERY_CANCELLATION_WAIT_MILLIS,
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
index b053cac..3f5de62 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
@@ -161,8 +161,8 @@
         policy.close();
         frames.clear();
         numTuples = 0;
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("VariableTupleMemoryManager has reorganized " + 
statsReOrg + " times");
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("VariableTupleMemoryManager has reorganized {} times", 
statsReOrg);
         }
         statsReOrg = 0;
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index 1e5c121..d9b473b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -131,9 +131,9 @@

         final int numPartitions = getNumOfPartitions(inputDataBytesSize / 
ctx.getInitialFrameSize(), memoryBudget);
         final int entriesPerPartition = (int) Math.ceil(1.0 * tableSize / 
numPartitions);
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("created hashtable, table size:" + tableSize + " file 
size:" + inputDataBytesSize
-                    + "  #partitions:" + numPartitions);
+        if (LOGGER.isTraceEnabled()) {
+            LOGGER.trace("created hashtable, table size:{} file size:{}  
#partitions:{}", tableSize, inputDataBytesSize,
+                    numPartitions);
         }

         final ArrayTupleBuilder outputTupleBuilder = new 
ArrayTupleBuilder(outRecordDescriptor.getFields().length);
@@ -185,9 +185,9 @@
                 if (force || 
hashTableForTuplePointer.isGarbageCollectionNeeded()) {
                     int numberOfFramesReclaimed =
                             
hashTableForTuplePointer.collectGarbage(bufferAccessor, tpcIntermediate);
-                    if (LOGGER.isDebugEnabled()) {
-                        LOGGER.debug("Garbage Collection on Hash table is 
done. Deallocated frames:"
-                                + numberOfFramesReclaimed);
+                    if (LOGGER.isTraceEnabled()) {
+                        LOGGER.trace("Garbage Collection on Hash table is 
done. Deallocated frames:{}",
+                                numberOfFramesReclaimed);
                     }
                     return numberOfFramesReclaimed != -1;
                 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index 52dc241..831753f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -21,6 +21,7 @@

 import java.nio.ByteBuffer;

+import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksJobletContext;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.ActivityId;
@@ -116,17 +117,20 @@

             return new AbstractUnaryInputSinkOperatorNodePushable() {
                 private JoinCacheTaskState state;
+                private VSizeFrame inFrame;

                 @Override
                 public void open() throws HyracksDataException {
                     state = new JoinCacheTaskState(jobletCtx.getJobId(), new 
TaskId(getActivityId(), partition));
                     state.joiner = new NestedLoopJoin(jobletCtx, new 
FrameTupleAccessor(rd0),
                             new FrameTupleAccessor(rd1), memSize, isLeftOuter, 
nullWriters1);
+                    inFrame = new VSizeFrame(ctx);
                 }

                 @Override
                 public void nextFrame(ByteBuffer buffer) throws 
HyracksDataException {
-                    ByteBuffer copyBuffer = 
jobletCtx.allocateFrame(buffer.capacity());
+                    inFrame.resize(buffer.capacity());
+                    ByteBuffer copyBuffer = inFrame.getBuffer();
                     FrameUtils.copyAndFlip(buffer, copyBuffer);
                     state.joiner.cache(copyBuffer);
                 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 91e94dc..e4a5c86 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -140,11 +140,6 @@
     private final boolean isLeftOuter;
     private final IMissingWriterFactory[] nonMatchWriterFactories;

-    //Flags added for test purpose
-    private boolean skipInMemoryHJ = false;
-    private boolean forceNLJ = false;
-    private boolean forceRoleReversal = false;
-
     private static final Logger LOGGER = LogManager.getLogger();

     public 
OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int 
memSizeInFrames,
@@ -233,9 +228,6 @@
         private int numOfPartitions;
         private OptimizedHybridHashJoin hybridHJ;

-        public BuildAndPartitionTaskState() {
-        }
-
         private BuildAndPartitionTaskState(JobId jobId, TaskId taskId) {
             super(jobId, taskId);
         }
@@ -296,7 +288,7 @@
                 @Override
                 public void open() throws HyracksDataException {
                     if (memSizeInFrames <= 2) { //Dedicated buffers: One 
buffer to read and two buffers for output
-                        throw new HyracksDataException("Not enough memory is 
assigend for Hybrid Hash Join.");
+                        throw new HyracksDataException("Not enough memory is 
assigned for Hybrid Hash Join.");
                     }
                     state.memForJoin = memSizeInFrames - 2;
                     state.numOfPartitions =
@@ -308,8 +300,9 @@

                     state.hybridHJ.initBuild();
                     if (LOGGER.isTraceEnabled()) {
-                        LOGGER.trace("OptimizedHybridHashJoin is starting the 
build phase with " + state.numOfPartitions
-                                + " partitions using " + state.memForJoin + " 
frames for memory.");
+                        LOGGER.trace(
+                                "OptimizedHybridHashJoin is starting the build 
phase with {} partitions using {} frames for memory.",
+                                state.numOfPartitions, state.memForJoin);
                     }
                 }

@@ -513,11 +506,11 @@
                         stats.getLevel().set(level);
                     }

-                    if (LOGGER.isDebugEnabled()) {
-                        LOGGER.debug("\n>>>Joining Partition Pairs (thread_id 
" + Thread.currentThread().getId()
-                                + ") (pid " + ") - (level " + level + ")" + " 
- BuildSize:\t" + buildPartSize
-                                + "\tProbeSize:\t" + probePartSize + " - 
MemForJoin " + (state.memForJoin)
-                                + "  - LeftOuter is " + isLeftOuter);
+                    if (LOGGER.isTraceEnabled()) {
+                        LOGGER.trace(
+                                "\n>>>Joining Partition Pairs (thread_id {}) 
(pid ) - (level {}) - BuildSize:\t{}\tProbeSize:\t{} - MemForJoin {}  - 
LeftOuter is {}",
+                                Thread.currentThread().getId(), level, 
buildPartSize, probePartSize, state.memForJoin,
+                                isLeftOuter);
                     }
 
                     // Calculate the expected hash table size for the both 
side.
@@ -527,16 +520,17 @@
                             
SerializableHashTable.getExpectedTableFrameCount(probeSizeInTuple, frameSize);

                     //Apply in-Mem HJ if possible
-                    if (!skipInMemoryHJ && ((buildPartSize + 
expectedHashTableSizeForBuildInFrame < state.memForJoin)
+                    if (((buildPartSize + expectedHashTableSizeForBuildInFrame 
< state.memForJoin)
                             || (probePartSize + 
expectedHashTableSizeForProbeInFrame < state.memForJoin
                                     && !isLeftOuter))) {

                         int tabSize = -1;
-                        if (!forceRoleReversal && (isLeftOuter || 
(buildPartSize < probePartSize))) {
+                        if ((isLeftOuter || (buildPartSize < probePartSize))) {
                             //Case 1.1 - InMemHJ (without Role-Reversal)
-                            if (LOGGER.isDebugEnabled()) {
-                                LOGGER.debug("\t>>>Case 1.1 (IsLeftOuter || 
buildSize<probe) AND ApplyInMemHJ - [Level "
-                                        + level + "]");
+                            if (LOGGER.isTraceEnabled()) {
+                                LOGGER.trace(
+                                        "\t>>>Case 1.1 (IsLeftOuter || 
buildSize<probe) AND ApplyInMemHJ - [Level {}]",
+                                        level);
                             }
                             tabSize = buildSizeInTuple;
                             if (tabSize == 0) {
@@ -547,9 +541,10 @@
                             applyInMemHashJoin(buildKeys, probeKeys, tabSize, 
buildRd, probeRd, buildHpc, probeHpc,
                                     buildSideReader, probeSideReader, 
probComp); // checked-confirmed
                         } else { //Case 1.2 - InMemHJ with Role Reversal
-                            if (LOGGER.isDebugEnabled()) {
-                                LOGGER.debug("\t>>>Case 1.2. (NoIsLeftOuter || 
probe<build) AND ApplyInMemHJ"
-                                        + "WITH RoleReversal - [Level " + 
level + "]");
+                            if (LOGGER.isTraceEnabled()) {
+                                LOGGER.trace(
+                                        "\t>>>Case 1.2. (NoIsLeftOuter || 
probe<build) AND ApplyInMemHJWITH RoleReversal - [Level {}]",
+                                        level);
                             }
                             tabSize = probeSizeInTuple;
                             if (tabSize == 0) {
@@ -563,24 +558,23 @@
                     }
                     //Apply (Recursive) HHJ
                     else {
-                        if (LOGGER.isDebugEnabled()) {
-                            LOGGER.debug("\t>>>Case 2. ApplyRecursiveHHJ - 
[Level " + level + "]");
+                        if (LOGGER.isTraceEnabled()) {
+                            LOGGER.trace("\t>>>Case 2. ApplyRecursiveHHJ - 
[Level {}]", level);
                         }
-                        if (!forceRoleReversal && (isLeftOuter || 
buildPartSize < probePartSize)) {
+                        if ((isLeftOuter || buildPartSize < probePartSize)) {
                             //Case 2.1 - Recursive HHJ (without Role-Reversal)
-                            if (LOGGER.isDebugEnabled()) {
-                                LOGGER.debug(
-                                        "\t\t>>>Case 2.1 - RecursiveHHJ WITH 
(isLeftOuter || build<probe) - [Level "
-                                                + level + "]");
+                            if (LOGGER.isTraceEnabled()) {
+                                LOGGER.trace(
+                                        "\t\t>>>Case 2.1 - RecursiveHHJ WITH 
(isLeftOuter || build<probe) - [Level {}]",
+                                        level);
                             }
                             applyHybridHashJoin((int) buildPartSize, 
PROBE_REL, BUILD_REL, probeKeys, buildKeys,
                                     probeRd, buildRd, probeHpc, buildHpc, 
probeSideReader, buildSideReader, level,
                                     beforeMax, probComp);

                         } else { //Case 2.2 - Recursive HHJ (with 
Role-Reversal)
-                            if (LOGGER.isDebugEnabled()) {
-                                LOGGER.debug(
-                                        "\t\t>>>Case 2.2. - RecursiveHHJ WITH 
RoleReversal - [Level " + level + "]");
+                            if (LOGGER.isTraceEnabled()) {
+                                LOGGER.trace("\t\t>>>Case 2.2. - RecursiveHHJ 
WITH RoleReversal - [Level {}]", level);
                             }

                             applyHybridHashJoin((int) probePartSize, 
BUILD_REL, PROBE_REL, buildKeys, probeKeys,
@@ -645,11 +639,12 @@
                         int afterMax = Math.max(maxAfterBuildSize, 
maxAfterProbeSize);

                         BitSet rPStatus = rHHj.getPartitionStatus();
-                        if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * 
beforeMax))) {
+                        if ((afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) {
                             //Case 2.1.1 - Keep applying HHJ
-                            if (LOGGER.isDebugEnabled()) {
-                                LOGGER.debug("\t\t>>>Case 2.1.1 - KEEP 
APPLYING RecursiveHHJ WITH "
-                                        + "(isLeftOuter || build<probe) - 
[Level " + level + "]");
+                            if (LOGGER.isTraceEnabled()) {
+                                LOGGER.trace(
+                                        "\t\t>>>Case 2.1.1 - KEEP APPLYING 
RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level {}]",
+                                        level);
                             }
                             for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; 
rPid = rPStatus.nextSetBit(rPid + 1)) {
                                 RunFileReader rbrfw = 
rHHj.getBuildRFReader(rPid);
@@ -679,9 +674,10 @@
                             }

                         } else { //Case 2.1.2 - Switch to NLJ
-                            if (LOGGER.isDebugEnabled()) {
-                                LOGGER.debug("\t\t>>>Case 2.1.2 - SWITCHED to 
NLJ RecursiveHHJ WITH "
-                                        + "(isLeftOuter || build<probe) - 
[Level " + level + "]");
+                            if (LOGGER.isTraceEnabled()) {
+                                LOGGER.trace(
+                                        "\t\t>>>Case 2.1.2 - SWITCHED to NLJ 
RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level {}]",
+                                        level);
                             }
                             for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; 
rPid = rPStatus.nextSetBit(rPid + 1)) {
                                 RunFileReader rbrfw = 
rHHj.getBuildRFReader(rPid);

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19561
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: ionic
Gerrit-Change-Id: I267d227f6a5e435e08221cb94895cedf60764e4c
Gerrit-Change-Number: 19561
Gerrit-PatchSet: 3
Gerrit-Owner: Ali Alsuliman <ali.al.solai...@gmail.com>
Gerrit-Reviewer: Ali Alsuliman <ali.al.solai...@gmail.com>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mb...@apache.org>
Gerrit-Reviewer: Murtadha Hubail <mhub...@apache.org>
Gerrit-MessageType: merged

Reply via email to