From Ali Alsuliman <ali.al.solai...@gmail.com>:

Ali Alsuliman has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19561 )

Change subject: [ASTERIXDB-3587][RT] Re-use frame in NL join cache activity
......................................................................

[ASTERIXDB-3587][RT] Re-use frame in NL join cache activity

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
Instead of creating a frame for each incoming frame, create one frame
and re-use it in the cache activity of the nested loop join operator.
- Reduce the logging level for some logs from debug to trace.

Ext-ref: MB-66027
Change-Id: I267d227f6a5e435e08221cb94895cedf60764e4c
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19561
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Tested-by: Ali Alsuliman <ali.al.solai...@gmail.com>
Reviewed-by: Ali Alsuliman <ali.al.solai...@gmail.com>
Reviewed-by: Murtadha Hubail <mhub...@apache.org>
---
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
M hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
5 files changed, 82 insertions(+), 55 deletions(-)

Approvals:
  Murtadha Hubail: Looks good to me, approved
  Ali Alsuliman: Looks good to me, but someone else must approve; Verified
  Jenkins: Verified

Objections:
  Anon. E. Moose #1000171: Violations found

diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index bedcd9c..e0e3cbc 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -97,13 +97,15 @@
         try {
             responseMsg = (ExecuteStatementResponseMessage) responseFuture.get(timeout, TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
-            cancelQuery(ncMb, ncCtx.getNodeId(), requestReference.getUuid(), param.getClientContextID(), e, false);
+            cancelQuery(ncMb, ncCtx.getNodeId(), requestReference.getUuid(), param.getClientContextID(), e, false,
+                    "interrupt");
             throw e;
         } catch (TimeoutException exception) {
             RuntimeDataException hde = new RuntimeDataException(ErrorCode.REQUEST_TIMEOUT);
             hde.addSuppressed(exception);
             // cancel query
-            cancelQuery(ncMb, ncCtx.getNodeId(), requestReference.getUuid(), param.getClientContextID(), hde, true);
+            cancelQuery(ncMb, ncCtx.getNodeId(), requestReference.getUuid(), param.getClientContextID(), hde, true,
+                    "timeout");
             throw hde;
         }
         executionState.end();
@@ -156,7 +158,7 @@
     }
 
     private void cancelQuery(INCMessageBroker messageBroker, String nodeId, String uuid, String clientContextID,
-            Exception exception, boolean wait) {
+            Exception exception, boolean wait, String reason) {
         if (uuid == null && clientContextID == null) {
             return;
         }
@@ -165,8 +167,7 @@
         CancelQueryRequest cancelQueryMessage =
                 new CancelQueryRequest(nodeId, cancelQueryFuture.getFutureId(), uuid, clientContextID);
         // TODO(mblow): multicc -- need to send cancellation to the correct cc
-        LOGGER.info("Cancelling query with uuid:{}, clientContextID:{} due to {}", uuid, clientContextID,
-                exception.getClass().getSimpleName());
+        LOGGER.info("Cancelling query with uuid:{}, clientContextID:{} due to {}", uuid, clientContextID, reason);
         messageBroker.sendMessageToPrimaryCC(cancelQueryMessage);
         if (wait) {
             cancelQueryFuture.get(ExecuteStatementRequestMessage.DEFAULT_QUERY_CANCELLATION_WAIT_MILLIS,
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
index b053cac..3f5de62 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/buffermanager/VariableDeletableTupleMemoryManager.java
@@ -161,8 +161,8 @@
         policy.close();
         frames.clear();
         numTuples = 0;
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("VariableTupleMemoryManager has reorganized " + statsReOrg + " times");
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("VariableTupleMemoryManager has reorganized {} times", statsReOrg);
         }
         statsReOrg = 0;
     }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index 1e5c121..d9b473b 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -131,9 +131,9 @@
         final int numPartitions = getNumOfPartitions(inputDataBytesSize / ctx.getInitialFrameSize(), memoryBudget);
         final int entriesPerPartition = (int) Math.ceil(1.0 * tableSize / numPartitions);
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("created hashtable, table size:" + tableSize + " file size:" + inputDataBytesSize
-                    + " #partitions:" + numPartitions);
+        if (LOGGER.isTraceEnabled()) {
+            LOGGER.trace("created hashtable, table size:{} file size:{} #partitions:{}", tableSize, inputDataBytesSize,
+                    numPartitions);
         }
 
         final ArrayTupleBuilder outputTupleBuilder = new ArrayTupleBuilder(outRecordDescriptor.getFields().length);
@@ -185,9 +185,9 @@
             if (force || hashTableForTuplePointer.isGarbageCollectionNeeded()) {
                 int numberOfFramesReclaimed =
                         hashTableForTuplePointer.collectGarbage(bufferAccessor, tpcIntermediate);
-                if (LOGGER.isDebugEnabled()) {
-                    LOGGER.debug("Garbage Collection on Hash table is done. Deallocated frames:"
-                            + numberOfFramesReclaimed);
+                if (LOGGER.isTraceEnabled()) {
+                    LOGGER.trace("Garbage Collection on Hash table is done. Deallocated frames:{}",
+                            numberOfFramesReclaimed);
                 }
                 return numberOfFramesReclaimed != -1;
             }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index 52dc241..831753f 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -21,6 +21,7 @@
 
 import java.nio.ByteBuffer;
 
+import org.apache.hyracks.api.comm.VSizeFrame;
 import org.apache.hyracks.api.context.IHyracksJobletContext;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.ActivityId;
@@ -116,17 +117,20 @@
         return new AbstractUnaryInputSinkOperatorNodePushable() {
             private JoinCacheTaskState state;
+            private VSizeFrame inFrame;
 
             @Override
             public void open() throws HyracksDataException {
                 state = new JoinCacheTaskState(jobletCtx.getJobId(), new TaskId(getActivityId(), partition));
                 state.joiner = new NestedLoopJoin(jobletCtx, new FrameTupleAccessor(rd0), new FrameTupleAccessor(rd1),
                         memSize, isLeftOuter, nullWriters1);
+                inFrame = new VSizeFrame(ctx);
             }
 
             @Override
             public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
-                ByteBuffer copyBuffer = jobletCtx.allocateFrame(buffer.capacity());
+                inFrame.resize(buffer.capacity());
+                ByteBuffer copyBuffer = inFrame.getBuffer();
                 FrameUtils.copyAndFlip(buffer, copyBuffer);
                 state.joiner.cache(copyBuffer);
             }
diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 91e94dc..e4a5c86 100644
--- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -140,11 +140,6 @@
     private final boolean isLeftOuter;
     private final IMissingWriterFactory[] nonMatchWriterFactories;
 
-    //Flags added for test purpose
-    private boolean skipInMemoryHJ = false;
-    private boolean forceNLJ = false;
-    private boolean forceRoleReversal = false;
-
     private static final Logger LOGGER = LogManager.getLogger();
 
     public OptimizedHybridHashJoinOperatorDescriptor(IOperatorDescriptorRegistry spec, int memSizeInFrames,
@@ -233,9 +228,6 @@
         private int numOfPartitions;
         private OptimizedHybridHashJoin hybridHJ;
 
-        public BuildAndPartitionTaskState() {
-        }
-
         private BuildAndPartitionTaskState(JobId jobId, TaskId taskId) {
             super(jobId, taskId);
         }
@@ -296,7 +288,7 @@
             @Override
             public void open() throws HyracksDataException {
                 if (memSizeInFrames <= 2) { //Dedicated buffers: One buffer to read and two buffers for output
-                    throw new HyracksDataException("Not enough memory is assigend for Hybrid Hash Join.");
+                    throw new HyracksDataException("Not enough memory is assigned for Hybrid Hash Join.");
                 }
                 state.memForJoin = memSizeInFrames - 2;
                 state.numOfPartitions =
@@ -308,8 +300,9 @@
                 state.hybridHJ.initBuild();
 
                 if (LOGGER.isTraceEnabled()) {
-                    LOGGER.trace("OptimizedHybridHashJoin is starting the build phase with " + state.numOfPartitions
-                            + " partitions using " + state.memForJoin + " frames for memory.");
+                    LOGGER.trace(
+                            "OptimizedHybridHashJoin is starting the build phase with {} partitions using {} frames for memory.",
+                            state.numOfPartitions, state.memForJoin);
                 }
             }
@@ -513,11 +506,11 @@
                 stats.getLevel().set(level);
             }
 
-            if (LOGGER.isDebugEnabled()) {
-                LOGGER.debug("\n>>>Joining Partition Pairs (thread_id " + Thread.currentThread().getId()
-                        + ") (pid " + ") - (level " + level + ")" + " - BuildSize:\t" + buildPartSize
-                        + "\tProbeSize:\t" + probePartSize + " - MemForJoin " + (state.memForJoin)
-                        + " - LeftOuter is " + isLeftOuter);
+            if (LOGGER.isTraceEnabled()) {
+                LOGGER.trace(
+                        "\n>>>Joining Partition Pairs (thread_id {}) (pid ) - (level {}) - BuildSize:\t{}\tProbeSize:\t{} - MemForJoin {} - LeftOuter is {}",
+                        Thread.currentThread().getId(), level, buildPartSize, probePartSize, state.memForJoin,
+                        isLeftOuter);
             }
 
             // Calculate the expected hash table size for the both side.
@@ -527,16 +520,17 @@
                     SerializableHashTable.getExpectedTableFrameCount(probeSizeInTuple, frameSize);
 
             //Apply in-Mem HJ if possible
-            if (!skipInMemoryHJ && ((buildPartSize + expectedHashTableSizeForBuildInFrame < state.memForJoin)
+            if (((buildPartSize + expectedHashTableSizeForBuildInFrame < state.memForJoin)
                     || (probePartSize + expectedHashTableSizeForProbeInFrame < state.memForJoin && !isLeftOuter))) {
                 int tabSize = -1;
-                if (!forceRoleReversal && (isLeftOuter || (buildPartSize < probePartSize))) {
+                if ((isLeftOuter || (buildPartSize < probePartSize))) {
                     //Case 1.1 - InMemHJ (without Role-Reversal)
-                    if (LOGGER.isDebugEnabled()) {
-                        LOGGER.debug("\t>>>Case 1.1 (IsLeftOuter || buildSize<probe) AND ApplyInMemHJ - [Level "
-                                + level + "]");
+                    if (LOGGER.isTraceEnabled()) {
+                        LOGGER.trace(
+                                "\t>>>Case 1.1 (IsLeftOuter || buildSize<probe) AND ApplyInMemHJ - [Level {}]",
+                                level);
                     }
                     tabSize = buildSizeInTuple;
                     if (tabSize == 0) {
@@ -547,9 +541,10 @@
                     applyInMemHashJoin(buildKeys, probeKeys, tabSize, buildRd, probeRd, buildHpc, probeHpc,
                             buildSideReader, probeSideReader, probComp); // checked-confirmed
                 } else { //Case 1.2 - InMemHJ with Role Reversal
-                    if (LOGGER.isDebugEnabled()) {
-                        LOGGER.debug("\t>>>Case 1.2. (NoIsLeftOuter || probe<build) AND ApplyInMemHJ"
-                                + "WITH RoleReversal - [Level " + level + "]");
+                    if (LOGGER.isTraceEnabled()) {
+                        LOGGER.trace(
+                                "\t>>>Case 1.2. (NoIsLeftOuter || probe<build) AND ApplyInMemHJWITH RoleReversal - [Level {}]",
+                                level);
                     }
                     tabSize = probeSizeInTuple;
                     if (tabSize == 0) {
@@ -563,24 +558,23 @@
             }
             //Apply (Recursive) HHJ
             else {
-                if (LOGGER.isDebugEnabled()) {
-                    LOGGER.debug("\t>>>Case 2. ApplyRecursiveHHJ - [Level " + level + "]");
+                if (LOGGER.isTraceEnabled()) {
+                    LOGGER.trace("\t>>>Case 2. ApplyRecursiveHHJ - [Level {}]", level);
                 }
-                if (!forceRoleReversal && (isLeftOuter || buildPartSize < probePartSize)) {
+                if ((isLeftOuter || buildPartSize < probePartSize)) {
                     //Case 2.1 - Recursive HHJ (without Role-Reversal)
-                    if (LOGGER.isDebugEnabled()) {
-                        LOGGER.debug(
-                                "\t\t>>>Case 2.1 - RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level "
-                                        + level + "]");
+                    if (LOGGER.isTraceEnabled()) {
+                        LOGGER.trace(
+                                "\t\t>>>Case 2.1 - RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level {}]",
+                                level);
                     }
                     applyHybridHashJoin((int) buildPartSize, PROBE_REL, BUILD_REL, probeKeys, buildKeys, probeRd,
                             buildRd, probeHpc, buildHpc, probeSideReader, buildSideReader, level, beforeMax,
                             probComp);
 
                 } else { //Case 2.2 - Recursive HHJ (with Role-Reversal)
-                    if (LOGGER.isDebugEnabled()) {
-                        LOGGER.debug(
-                                "\t\t>>>Case 2.2. - RecursiveHHJ WITH RoleReversal - [Level " + level + "]");
+                    if (LOGGER.isTraceEnabled()) {
+                        LOGGER.trace("\t\t>>>Case 2.2. - RecursiveHHJ WITH RoleReversal - [Level {}]", level);
                     }
                     applyHybridHashJoin((int) probePartSize, BUILD_REL, PROBE_REL, buildKeys, probeKeys,
@@ -645,11 +639,12 @@
             int afterMax = Math.max(maxAfterBuildSize, maxAfterProbeSize);
 
             BitSet rPStatus = rHHj.getPartitionStatus();
-            if (!forceNLJ && (afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) {
+            if ((afterMax < (NLJ_SWITCH_THRESHOLD * beforeMax))) {
                 //Case 2.1.1 - Keep applying HHJ
-                if (LOGGER.isDebugEnabled()) {
-                    LOGGER.debug("\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH "
-                            + "(isLeftOuter || build<probe) - [Level " + level + "]");
+                if (LOGGER.isTraceEnabled()) {
+                    LOGGER.trace(
+                            "\t\t>>>Case 2.1.1 - KEEP APPLYING RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level {}]",
+                            level);
                 }
                 for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
                     RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
@@ -679,9 +674,10 @@
                 }
             } else { //Case 2.1.2 - Switch to NLJ
-                if (LOGGER.isDebugEnabled()) {
-                    LOGGER.debug("\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH "
-                            + "(isLeftOuter || build<probe) - [Level " + level + "]");
+                if (LOGGER.isTraceEnabled()) {
+                    LOGGER.trace(
+                            "\t\t>>>Case 2.1.2 - SWITCHED to NLJ RecursiveHHJ WITH (isLeftOuter || build<probe) - [Level {}]",
+                            level);
                 }
                 for (int rPid = rPStatus.nextSetBit(0); rPid >= 0; rPid = rPStatus.nextSetBit(rPid + 1)) {
                     RunFileReader rbrfw = rHHj.getBuildRFReader(rPid);
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19561
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: ionic
Gerrit-Change-Id: I267d227f6a5e435e08221cb94895cedf60764e4c
Gerrit-Change-Number: 19561
Gerrit-PatchSet: 3
Gerrit-Owner: Ali Alsuliman <ali.al.solai...@gmail.com>
Gerrit-Reviewer: Ali Alsuliman <ali.al.solai...@gmail.com>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mb...@apache.org>
Gerrit-Reviewer: Murtadha Hubail <mhub...@apache.org>
Gerrit-MessageType: merged