[GitHub] sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFi…
sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFi… URL: https://github.com/apache/drill/pull/1459#discussion_r221491365 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java ## @@ -90,27 +95,30 @@ public RuntimeFilterWritable duplicate(BufferAllocator bufferAllocator) { int capacity = src.readableBytes(); DrillBuf duplicateOne = bufferAllocator.buffer(capacity); int readerIndex = src.readerIndex(); - src.readBytes(duplicateOne, 0, capacity); + duplicateOne.writeBytes(src); src.readerIndex(readerIndex); cloned[i] = duplicateOne; i++; } return new RuntimeFilterWritable(runtimeFilterBDef, cloned); } - public boolean same(RuntimeFilterWritable other) { -BitData.RuntimeFilterBDef runtimeFilterDef = other.getRuntimeFilterBDef(); -int otherMajorId = runtimeFilterDef.getMajorFragmentId(); -int otherMinorId = runtimeFilterDef.getMinorFragmentId(); -int otherHashJoinOpId = runtimeFilterDef.getHjOpId(); -int thisMajorId = this.runtimeFilterBDef.getMajorFragmentId(); -int thisMinorId = this.runtimeFilterBDef.getMinorFragmentId(); -int thisHashJoinOpId = this.runtimeFilterBDef.getHjOpId(); -return otherMajorId == thisMajorId && otherMinorId == thisMinorId && otherHashJoinOpId == thisHashJoinOpId; + public String toString() { +return identifier; } - public String toString() { -return "majorFragmentId:" + runtimeFilterBDef.getMajorFragmentId() + ",minorFragmentId:" + runtimeFilterBDef.getMinorFragmentId() + ", operatorId:" + runtimeFilterBDef.getHjOpId(); + @Override + public boolean equals(Object other) { Review comment: Please include null check for other like: ``` if (other == null) { return false; } ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFi…
sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFi… URL: https://github.com/apache/drill/pull/1459#discussion_r221490803 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java ## @@ -223,16 +223,16 @@ public Void visitJoin(JoinPrel prel, RFHelperHolder holder) throws RuntimeExcept right.accept(this, holder); boolean routeToForeman = holder.needToRouteToForeman(); if (!routeToForeman) { -runtimeFilterDef.setSendToForeman(false); +runtimeFilterDef.setSendToForeman(!routeToForeman); Review comment: this should be `runtimeFilterDef.setSendToForeman(routeToForeman);` and correspondingly change for else condition. Also you don't need `if-else` block any more. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFi…
sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFi… URL: https://github.com/apache/drill/pull/1459#discussion_r221490851 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java ## @@ -223,16 +223,16 @@ public Void visitJoin(JoinPrel prel, RFHelperHolder holder) throws RuntimeExcept right.accept(this, holder); boolean routeToForeman = holder.needToRouteToForeman(); if (!routeToForeman) { -runtimeFilterDef.setSendToForeman(false); +runtimeFilterDef.setSendToForeman(!routeToForeman); } else { -runtimeFilterDef.setSendToForeman(true); +runtimeFilterDef.setSendToForeman(routeToForeman); } List bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs(); for (BloomFilterDef bloomFilterDef : bloomFilterDefs) { if (!routeToForeman) { - bloomFilterDef.setLocal(true); + bloomFilterDef.setLocal(!routeToForeman); } else { - bloomFilterDef.setLocal(false); + bloomFilterDef.setLocal(routeToForeman); Review comment: Same here you can remove `if-else` block This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
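The simplification requested in the two comments above can be checked with a small sketch. `RouteFlags` is a hypothetical stand-in for the two flags set in `visitJoin`, not Drill's `RuntimeFilterDef` or `BloomFilterDef`. Note also that in the intermediate revision, `!routeToForeman` evaluates to true inside the `if (!routeToForeman)` branch, so `setSendToForeman(!routeToForeman)` passes true where the earlier code passed false.

```java
/** Hypothetical stand-in for the two flags set in visitJoin; not a Drill class. */
final class RouteFlags {
  boolean sendToForeman;
  boolean local;

  /** Branching form, equivalent to the code before the if-else bodies were edited. */
  static RouteFlags withBranches(boolean routeToForeman) {
    RouteFlags flags = new RouteFlags();
    if (!routeToForeman) {
      flags.sendToForeman = false;
      flags.local = true;
    } else {
      flags.sendToForeman = true;
      flags.local = false;
    }
    return flags;
  }

  /** Direct form suggested in the review: the boolean is passed through, no if-else. */
  static RouteFlags direct(boolean routeToForeman) {
    RouteFlags flags = new RouteFlags();
    flags.sendToForeman = routeToForeman;
    flags.local = !routeToForeman;
    return flags;
  }
}
```

Both forms produce the same flags for either value of `routeToForeman`, which is why the `if-else` blocks can be dropped.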
[GitHub] sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFi…
sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFi… URL: https://github.com/apache/drill/pull/1459#discussion_r221329807 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java ## @@ -36,25 +40,63 @@ private RuntimeFilterWritable aggregated = null; - private Queue rfQueue = new ConcurrentLinkedQueue<>(); + private BlockingQueue rfQueue = new LinkedBlockingQueue<>(); private AtomicBoolean running = new AtomicBoolean(true); + private ReentrantLock aggregatedRFLock = new ReentrantLock(); + + private Thread asyncAggregateThread; + + private BufferAllocator bufferAllocator; + + private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterSink.class); + + + public RuntimeFilterSink(BufferAllocator bufferAllocator) { +this.bufferAllocator = bufferAllocator; +AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker(); +asyncAggregateThread = new NamedThreadFactory("RFAggregating-").newThread(asyncAggregateWorker); +asyncAggregateThread.start(); + } + public void aggregate(RuntimeFilterWritable runtimeFilterWritable) { -rfQueue.add(runtimeFilterWritable); -if (currentBookId.get() == 0) { - AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker(); - Thread asyncAggregateThread = new NamedThreadFactory("RFAggregating-").newThread(asyncAggregateWorker); - asyncAggregateThread.start(); +if (running.get()) { + if (containOne()) { +boolean same = aggregated.same(runtimeFilterWritable); +if (!same) { + //This is to solve the only one fragment case that two RuntimeFilterRecordBatchs + //share the same FragmentContext. Review comment: Yes I did consider the right deep tree case too and the logic looks fine to me. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
[GitHub] sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFi…
sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFi… URL: https://github.com/apache/drill/pull/1459#discussion_r221069294 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java ## @@ -36,25 +40,63 @@ private RuntimeFilterWritable aggregated = null; - private Queue rfQueue = new ConcurrentLinkedQueue<>(); + private BlockingQueue rfQueue = new LinkedBlockingQueue<>(); private AtomicBoolean running = new AtomicBoolean(true); + private ReentrantLock aggregatedRFLock = new ReentrantLock(); + + private Thread asyncAggregateThread; + + private BufferAllocator bufferAllocator; + + private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterSink.class); + + + public RuntimeFilterSink(BufferAllocator bufferAllocator) { +this.bufferAllocator = bufferAllocator; +AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker(); +asyncAggregateThread = new NamedThreadFactory("RFAggregating-").newThread(asyncAggregateWorker); +asyncAggregateThread.start(); + } + public void aggregate(RuntimeFilterWritable runtimeFilterWritable) { -rfQueue.add(runtimeFilterWritable); -if (currentBookId.get() == 0) { - AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker(); - Thread asyncAggregateThread = new NamedThreadFactory("RFAggregating-").newThread(asyncAggregateWorker); - asyncAggregateThread.start(); +if (running.get()) { + if (containOne()) { +boolean same = aggregated.same(runtimeFilterWritable); +if (!same) { + //This is to solve the only one fragment case that two RuntimeFilterRecordBatchs + //share the same FragmentContext. Review comment: Thanks for explanation. So for single minor fragment left deep tree case as shown below there will be only one `RuntimeFilterOperator` (RTF) inserted above left most Scan. 
Now when `next()` is called, the build side of the upper HJ is evaluated first; it creates a BloomFilter and sends it to the RTF operator. `next()` is then called on the probe side of the upper HJ, which in turn calls `next()` on the build side of the lower hash join. When the build side of the lower join completes, it also sends its BloomFilter to the RTF operator. Since the two bloom filters come from 2 different HJ operators (hence different `srcHashJoinOpId`), it will discard the first one and keep the second one. I guess this is done because the join condition column can be different in different HashJoins?

```
      HJ
     /  \
    HJ   Scan
   /  \
Scan   Scan
```

```
      HJ
     /  \
    HJ   Scan
   /  \
 RTF   Scan
  |
 Scan
```
[GitHub] sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFi…
sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFi… URL: https://github.com/apache/drill/pull/1459#discussion_r220256674 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java ## @@ -36,25 +40,63 @@ private RuntimeFilterWritable aggregated = null; - private Queue rfQueue = new ConcurrentLinkedQueue<>(); + private BlockingQueue rfQueue = new LinkedBlockingQueue<>(); private AtomicBoolean running = new AtomicBoolean(true); + private ReentrantLock aggregatedRFLock = new ReentrantLock(); + + private Thread asyncAggregateThread; + + private BufferAllocator bufferAllocator; + + private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterSink.class); + + + public RuntimeFilterSink(BufferAllocator bufferAllocator) { +this.bufferAllocator = bufferAllocator; +AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker(); +asyncAggregateThread = new NamedThreadFactory("RFAggregating-").newThread(asyncAggregateWorker); +asyncAggregateThread.start(); + } + public void aggregate(RuntimeFilterWritable runtimeFilterWritable) { -rfQueue.add(runtimeFilterWritable); -if (currentBookId.get() == 0) { - AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker(); - Thread asyncAggregateThread = new NamedThreadFactory("RFAggregating-").newThread(asyncAggregateWorker); - asyncAggregateThread.start(); +if (running.get()) { + if (containOne()) { +boolean same = aggregated.same(runtimeFilterWritable); +if (!same) { + //This is to solve the only one fragment case that two RuntimeFilterRecordBatchs + //share the same FragmentContext. Review comment: Can you please elaborate on this use case ? I didn't quite get it. Based on my understanding for one fragment case the RuntimeFilter will be directly set in the FragmentContext and will not be sent over wire. So we should not have received it in first place ? 
[GitHub] sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFi…
sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFi… URL: https://github.com/apache/drill/pull/1459#discussion_r220710747 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java ## @@ -89,16 +142,27 @@ public void close() throws Exception { @Override public void run() { - while (running.get()) { -RuntimeFilterWritable toAggregate = rfQueue.poll(); -if (toAggregate != null) { - if (aggregated != null) { -aggregated.aggregate(toAggregate); -currentBookId.incrementAndGet(); + try { +while (running.get()) { + RuntimeFilterWritable toAggregate = rfQueue.take(); + if (!running.get()) { +toAggregate.close(); +return; + } + if (containOne()) { +try { + aggregatedRFLock.lock(); + aggregated.aggregate(toAggregate); Review comment: `aggregated.close()` should be called to release the DrillBuf reference it is holding on to, since `aggregate()` will not do that. This change is part of the branch shared in the above comment.
[GitHub] sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFi…
sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFi… URL: https://github.com/apache/drill/pull/1459#discussion_r219931981 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java ## @@ -224,15 +221,15 @@ public Void visitJoin(JoinPrel prel, RFHelperHolder holder) throws RuntimeExcept Prel right = (Prel) hashJoinPrel.getRight(); holder.setFromBuildSide(true); right.accept(this, holder); - boolean buildSideEncountererdBroadcastExchange = holder.isEncounteredBroadcastExchange(); - if (buildSideEncountererdBroadcastExchange) { + boolean routeToForeman = holder.needToRouteToForeman(); + if (!routeToForeman) { runtimeFilterDef.setSendToForeman(false); } else { runtimeFilterDef.setSendToForeman(true); Review comment: You can change above as: `runtimeFilterDef.setSendToForeman(routeToForeman);` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFi…
sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFi… URL: https://github.com/apache/drill/pull/1459#discussion_r220710086 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java ## @@ -89,16 +142,27 @@ public void close() throws Exception { @Override public void run() { - while (running.get()) { -RuntimeFilterWritable toAggregate = rfQueue.poll(); -if (toAggregate != null) { - if (aggregated != null) { -aggregated.aggregate(toAggregate); -currentBookId.incrementAndGet(); + try { Review comment: I still see race conditions. 1) `containOne()` is checked outside the lock. 2) The same `running` flag is used by all 3 threads (netty thread, AsyncAggregateWorker and MinorFragment). Consider a case where the netty thread checks the `running` state, finds it true, and is then preempted. The minor fragment thread comes along, resets the running state and clears the queue. When the netty thread is scheduled again it adds the `RuntimeFilterWritable` to the queue, and now no one will clean up this newly added filter. Please see the top commit in this branch: https://github.com/sohami/drill/commits/DRILL-6731. If it looks good to you, cherry-pick this commit into your PR and squash only your original commit, not the cherry-picked commit.
[GitHub] sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFi…
sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFi… URL: https://github.com/apache/drill/pull/1459#discussion_r219938720 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java ## @@ -81,6 +82,37 @@ public void aggregate(RuntimeFilterWritable runtimeFilterWritable) { } } + public RuntimeFilterWritable duplicate(BufferAllocator bufferAllocator) { +int len = data.length; +DrillBuf[] cloned = new DrillBuf[len]; +int i = 0; +for (DrillBuf src : data) { + int capacity = src.readableBytes(); + DrillBuf duplicateOne = bufferAllocator.buffer(capacity); + int readerIndex = src.readerIndex(); + src.readBytes(duplicateOne, 0, capacity); Review comment: consider using `duplicateOne.writeBytes(src)` instead of `src.readBytes(duplicateOne, 0, capacity)` since that will update the index of `duplicateOne` correctly. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
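The index behaviour behind this suggestion can be illustrated with a toy model. `MiniBuf` below is a hypothetical, heavily simplified buffer with separate reader and writer indices; it is not `DrillBuf` or Netty's `ByteBuf`, and it only assumes the documented Netty convention that the indexed `readBytes(dst, dstIndex, length)` leaves the destination's indices untouched while `writeBytes(src)` advances the destination's writer index.

```java
/** Hypothetical toy buffer with reader/writer indices; not DrillBuf or ByteBuf. */
final class MiniBuf {
  private final byte[] bytes;
  int readerIndex;
  int writerIndex;

  MiniBuf(int capacity) {
    bytes = new byte[capacity];
  }

  int readableBytes() {
    return writerIndex - readerIndex;
  }

  void writeByte(int value) {
    bytes[writerIndex++] = (byte) value;
  }

  /** Indexed copy into dst: advances this.readerIndex, leaves dst's indices alone. */
  void readBytes(MiniBuf dst, int dstIndex, int length) {
    System.arraycopy(bytes, readerIndex, dst.bytes, dstIndex, length);
    readerIndex += length;
  }

  /** Relative copy from src: appends at this.writerIndex and advances both buffers. */
  void writeBytes(MiniBuf src) {
    int length = src.readableBytes();
    System.arraycopy(src.bytes, src.readerIndex, bytes, writerIndex, length);
    src.readerIndex += length;
    writerIndex += length;
  }
}
```

In this model a copy filled through `readBytes(dst, 0, n)` holds the data but still reports zero readable bytes, whereas a copy filled through `writeBytes(src)` reports `n`. In both cases the source's reader index moves, which is why the diff resets it afterwards.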
[GitHub] sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFi…
sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFi… URL: https://github.com/apache/drill/pull/1459#discussion_r219931819 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java ## @@ -224,15 +221,15 @@ public Void visitJoin(JoinPrel prel, RFHelperHolder holder) throws RuntimeExcept Prel right = (Prel) hashJoinPrel.getRight(); holder.setFromBuildSide(true); right.accept(this, holder); - boolean buildSideEncountererdBroadcastExchange = holder.isEncounteredBroadcastExchange(); - if (buildSideEncountererdBroadcastExchange) { + boolean routeToForeman = holder.needToRouteToForeman(); + if (!routeToForeman) { runtimeFilterDef.setSendToForeman(false); } else { runtimeFilterDef.setSendToForeman(true); } List bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs(); for (BloomFilterDef bloomFilterDef : bloomFilterDefs) { -if (buildSideEncountererdBroadcastExchange) { +if (!routeToForeman) { bloomFilterDef.setLocal(true); } else { bloomFilterDef.setLocal(false); Review comment: You can change above as: `bloomFilterDef.setLocal(!routeToForeman);` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFi…
sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFi… URL: https://github.com/apache/drill/pull/1459#discussion_r219983879 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java ## @@ -81,6 +82,37 @@ public void aggregate(RuntimeFilterWritable runtimeFilterWritable) { } } + public RuntimeFilterWritable duplicate(BufferAllocator bufferAllocator) { +int len = data.length; +DrillBuf[] cloned = new DrillBuf[len]; +int i = 0; +for (DrillBuf src : data) { + int capacity = src.readableBytes(); + DrillBuf duplicateOne = bufferAllocator.buffer(capacity); + int readerIndex = src.readerIndex(); + src.readBytes(duplicateOne, 0, capacity); + src.readerIndex(readerIndex); + cloned[i] = duplicateOne; + i++; +} +return new RuntimeFilterWritable(runtimeFilterBDef, cloned); + } + + public boolean same(RuntimeFilterWritable other) { Review comment: Is the intention always to check `RuntimeFilterWritable` equality based on the fields of `runtimeFilterDef` below? If yes, then we should override the `equals(Object other)` and `hashCode()` methods, and move the logic of the `same` method into the overridden `equals`.
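A minimal sketch of what such an override could look like, assuming equality is defined by the three ids compared in `same()`. `FilterKey` is a hypothetical class used only for illustration; it is not the PR's `RuntimeFilterWritable`, and the null check follows the later review comment on `equals`.

```java
import java.util.Objects;

/** Hypothetical key holding the three ids that same() compares; not a Drill class. */
final class FilterKey {
  private final int majorFragmentId;
  private final int minorFragmentId;
  private final int hjOpId;

  FilterKey(int majorFragmentId, int minorFragmentId, int hjOpId) {
    this.majorFragmentId = majorFragmentId;
    this.minorFragmentId = minorFragmentId;
    this.hjOpId = hjOpId;
  }

  @Override
  public boolean equals(Object other) {
    if (this == other) {
      return true;
    }
    // Null check first, then a type check before the cast.
    if (other == null || getClass() != other.getClass()) {
      return false;
    }
    FilterKey that = (FilterKey) other;
    return majorFragmentId == that.majorFragmentId
        && minorFragmentId == that.minorFragmentId
        && hjOpId == that.hjOpId;
  }

  @Override
  public int hashCode() {
    // Must use the same fields as equals so equal keys hash alike.
    return Objects.hash(majorFragmentId, minorFragmentId, hjOpId);
  }
}
```

Overriding both methods together keeps the class usable as a key in hash-based collections, which a standalone `same()` method does not provide.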
[GitHub] sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFi…
sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFi… URL: https://github.com/apache/drill/pull/1459#discussion_r219939606 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java ## @@ -81,6 +82,37 @@ public void aggregate(RuntimeFilterWritable runtimeFilterWritable) { } } + public RuntimeFilterWritable duplicate(BufferAllocator bufferAllocator) { +int len = data.length; +DrillBuf[] cloned = new DrillBuf[len]; +int i = 0; +for (DrillBuf src : data) { + int capacity = src.readableBytes(); + DrillBuf duplicateOne = bufferAllocator.buffer(capacity); + int readerIndex = src.readerIndex(); + src.readBytes(duplicateOne, 0, capacity); + src.readerIndex(readerIndex); + cloned[i] = duplicateOne; + i++; +} +return new RuntimeFilterWritable(runtimeFilterBDef, cloned); + } + + public boolean same(RuntimeFilterWritable other) { +BitData.RuntimeFilterBDef runtimeFilterDef = other.getRuntimeFilterBDef(); +int otherMajorId = runtimeFilterDef.getMajorFragmentId(); +int otherMinorId = runtimeFilterDef.getMinorFragmentId(); +int otherHashJoinOpId = runtimeFilterDef.getHjOpId(); +int thisMajorId = this.runtimeFilterBDef.getMajorFragmentId(); +int thisMinorId = this.runtimeFilterBDef.getMinorFragmentId(); +int thisHashJoinOpId = this.runtimeFilterBDef.getHjOpId(); +return otherMajorId == thisMajorId && otherMinorId == thisMinorId && otherHashJoinOpId == thisHashJoinOpId; + } + + public String toString() { +return "majorFragmentId:" + runtimeFilterBDef.getMajorFragmentId() + ",minorFragmentId:" + runtimeFilterBDef.getMinorFragmentId() + ", operatorId:" + runtimeFilterBDef.getHjOpId(); Review comment: `operatorId --> SrcOperatorId` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
[GitHub] sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFi…
sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFi… URL: https://github.com/apache/drill/pull/1459#discussion_r218202111 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java ## @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.filter; + +import org.apache.drill.exec.rpc.NamedThreadFactory; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * This sink receives the RuntimeFilters from the netty thread, + * aggregates them in an async thread, supplies the aggregated + * one to the fragment running thread. + */ +public class RuntimeFilterSink implements AutoCloseable { Review comment: There are few race condition because of the way this class is implemented. Mainly the `RuntimeFilterSink` can be accessed in context of Netty thread and FragmentExecutor thread. Netty's thread will just add each received `RuntimeFilterWritable` into the queue and be done with it. 
The race condition mainly appears between the `AsyncAggregateWorker` thread and the `FragmentExecutor` thread: the async thread might be updating the shared `aggregated` instance while the fragment executor thread is using the same instance, thinking it is the older filter (specifically the underlying bloom filter DrillBuf). There can also be issues during `close()`: the async thread might have just received another runtimeFilter when `close()` updates the running state, closes the `aggregated` instance and assumes the queue is empty, after which the async thread can still try to `aggregate` the received runtimeFilter. Please define a clean contract for this class. A few things to consider:
- The async aggregating thread can be started during creation of `RuntimeFilterSink`.
- Consider using a `BlockingQueue`, since the async thread should block until the next item becomes available rather than spin on a state flag.
- Access to the shared resource `RuntimeFilterWritable aggregated` needs to be protected by a lock.
- The async thread should check the `running` state after retrieving an element from the queue and before aggregating it. If the running state is false, it should `clear` the polled element.
- This class should return just the bloom filter list and fieldList rather than the entire aggregated `RuntimeFilterWritable`, since the caller can modify that through the setter methods it exposes.
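The contract points above can be sketched roughly as follows. This is an illustration under stated assumptions, not the code from the PR or from the reviewer's branch: `Filter` and `FilterSink` are hypothetical stand-ins for `RuntimeFilterWritable` and `RuntimeFilterSink`, a `long` bit mask replaces the bloom filter buffers, and the producer's state check and enqueue are placed under the same lock as `close()` so that a filter cannot be added after the queue has been drained.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for RuntimeFilterWritable: a bit mask that must be closed. */
final class Filter implements AutoCloseable {
  private long bits;
  volatile boolean closed;
  Filter(long bits) { this.bits = bits; }
  long bits() { return bits; }
  void aggregate(Filter other) { bits |= other.bits; }
  @Override public void close() { closed = true; }
}

/** Sketch of a sink: producers enqueue, one worker aggregates, readers get copies. */
final class FilterSink implements AutoCloseable {
  private final BlockingQueue<Filter> queue = new LinkedBlockingQueue<>();
  private final ReentrantLock lock = new ReentrantLock();
  private boolean running = true; // guarded by lock
  private Filter aggregated;      // guarded by lock
  private final Thread worker;

  FilterSink() {
    // The worker is started at construction time, not on the first add().
    worker = new Thread(this::aggregateLoop, "RFAggregating-sketch");
    worker.setDaemon(true);
    worker.start();
  }

  /** Producer side: the state check and the enqueue happen under one lock. */
  void add(Filter filter) {
    boolean accepted;
    lock.lock();
    try {
      accepted = running;
      if (accepted) {
        queue.add(filter);
      }
    } finally {
      lock.unlock();
    }
    if (!accepted) {
      filter.close(); // sink already closed: release instead of leaking
    }
  }

  /** Reader side: returns a copy of the aggregated bits, or 0 if none arrived yet. */
  long snapshotBits() {
    lock.lock();
    try {
      return aggregated == null ? 0L : aggregated.bits();
    } finally {
      lock.unlock();
    }
  }

  private void aggregateLoop() {
    try {
      while (true) {
        Filter next = queue.take(); // blocks instead of spinning
        lock.lock();
        try {
          if (!running) { // re-check after take(), before aggregating
            next.close();
            return;
          }
          if (aggregated == null) {
            aggregated = next;
          } else {
            aggregated.aggregate(next);
            next.close(); // its contents now live in 'aggregated'
          }
        } finally {
          lock.unlock();
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // close() interrupted a blocked take()
    }
  }

  @Override
  public void close() {
    lock.lock();
    try {
      running = false;
      if (aggregated != null) {
        aggregated.close();
        aggregated = null;
      }
    } finally {
      lock.unlock();
    }
    worker.interrupt();
    Filter leftover;
    while ((leftover = queue.poll()) != null) {
      leftover.close(); // producers cannot enqueue once running is false
    }
  }
}
```

Every filter handed to `add()` ends up closed exactly once: by the worker after aggregation, by `close()` when it drains the queue or releases `aggregated`, or by `add()` itself when the sink is already closed.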
[GitHub] sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFi…
sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFi… URL: https://github.com/apache/drill/pull/1459#discussion_r218203657 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java ## @@ -138,28 +133,12 @@ public void registerRuntimeFilter(RuntimeFilterWritable runtimeFilterWritable) { int majorId = runtimeFilterB.getMajorFragmentId(); UserBitShared.QueryId queryId = runtimeFilterB.getQueryId(); List probeFields = runtimeFilterB.getProbeFieldsList(); -logger.info("RuntimeFilterManager receives a runtime filter , majorId:{}, queryId:{}", majorId, QueryIdHelper.getQueryId(queryId)); -int size; -synchronized (this) { - size = joinMjId2scanSize.get(majorId); - Preconditions.checkState(size > 0); - RuntimeFilterWritable aggregatedRuntimeFilter = joinMjId2AggregatedRF.get(majorId); - if (aggregatedRuntimeFilter == null) { -aggregatedRuntimeFilter = runtimeFilterWritable; - } else { -aggregatedRuntimeFilter.aggregate(runtimeFilterWritable); - } - joinMjId2AggregatedRF.put(majorId, aggregatedRuntimeFilter); - size--; - joinMjId2scanSize.put(majorId, size); -} -if (size == 0) { - broadcastAggregatedRuntimeFilter(majorId, queryId, probeFields); -} +logger.info("RuntimeFilterRouter receives a runtime filter , majorId:{}, queryId:{}", majorId, QueryIdHelper.getQueryId(queryId)); +broadcastAggregatedRuntimeFilter(majorId, queryId, probeFields, runtimeFilterWritable.getData()); Review comment: why not just pass `runtimeFilterWritable` to broadcast method instead of extracting individual things and passing it as parameter ? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFi…
sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFi… URL: https://github.com/apache/drill/pull/1459#discussion_r217951308 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java ## @@ -138,24 +134,8 @@ public void registerRuntimeFilter(RuntimeFilterWritable runtimeFilterWritable) { int majorId = runtimeFilterB.getMajorFragmentId(); UserBitShared.QueryId queryId = runtimeFilterB.getQueryId(); List probeFields = runtimeFilterB.getProbeFieldsList(); -logger.info("RuntimeFilterManager receives a runtime filter , majorId:{}, queryId:{}", majorId, QueryIdHelper.getQueryId(queryId)); -int size; -synchronized (this) { - size = joinMjId2scanSize.get(majorId); - Preconditions.checkState(size > 0); - RuntimeFilterWritable aggregatedRuntimeFilter = joinMjId2AggregatedRF.get(majorId); - if (aggregatedRuntimeFilter == null) { -aggregatedRuntimeFilter = runtimeFilterWritable; - } else { -aggregatedRuntimeFilter.aggregate(runtimeFilterWritable); - } - joinMjId2AggregatedRF.put(majorId, aggregatedRuntimeFilter); Review comment: Since we are just passing the received `RuntimeFilter` to the corresponding `RuntimeFilterRecordBatch`, which will do the aggregation now, we don't need to store the received `runtimeFilterWritable` anymore. But `broadcastAggregatedRuntimeFilter` should accept the received filter as a parameter to send it across. Right now it does not, so it looks like we are not sending the received filter here and are losing it?
[GitHub] sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFi…
sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFi… URL: https://github.com/apache/drill/pull/1459#discussion_r217951489 ## File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java ## @@ -60,17 +59,14 @@ * The HashJoinRecordBatch is responsible to generate the RuntimeFilter. * To Partitioned case: * The generated RuntimeFilter will be sent to the Foreman node. The Foreman node receives the RuntimeFilter - * async, aggregates them, broadcasts them the Scan nodes's MinorFragment. The RuntimeFilterRecordBatch which - * steps over the Scan node will leverage the received RuntimeFilter to filter out the scanned rows to generate - * the SV2. + * async, broadcasts them to the Scan nodes's MinorFragment. The RuntimeFilterRecordBatch which + * steps over the Scan node will leverage the received RuntimeFilter (which will be aggregated at the Review comment: please change to: _The RuntimeFilterRecordBatch which is **downstream** to Scan node will aggregate all the received RuntimeFilter and will leverage it to filter out the scanned rows to generate SV2 _ This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services