[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-10-10 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16645517#comment-16645517
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

sohami closed pull request #1459: DRILL-6731: Move the BFs aggregating work 
from the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 608f05c567a..88c21d9e957 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -21,6 +21,7 @@
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 
+import org.apache.drill.exec.work.filter.RuntimeFilterSink;
 import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.config.DrillConfig;
@@ -160,16 +161,15 @@ BufferAllocator getNewChildAllocator(final String operatorName,
   void close();
 
   /**
-   * Return null ,if setRuntimeFilter not being called
    * @return
    */
-  RuntimeFilterWritable getRuntimeFilter();
+  RuntimeFilterSink getRuntimeFilterSink();
 
   /**
-   * Set a RuntimeFilter when the RuntimeFilter receiver belongs to the same MinorFragment
+   * add a RuntimeFilter when the RuntimeFilter receiver belongs to the same MinorFragment
    * @param runtimeFilter
    */
-  public void setRuntimeFilter(RuntimeFilterWritable runtimeFilter);
+  public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter);
 
   interface ExecutorState {
     /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
index a8980785a32..fcfdc8c6b54 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
@@ -61,6 +61,7 @@
 import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
 
+import org.apache.drill.exec.work.filter.RuntimeFilterSink;
 import org.apache.drill.shaded.guava.com.google.common.base.Function;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -136,7 +137,7 @@ public void interrupt(final InterruptedException e) {
   /** Stores constants and their holders by type */
   private final Map> constantValueHolderCache;
 
-  private RuntimeFilterWritable runtimeFilterWritable;
+  private RuntimeFilterSink runtimeFilterSink;
 
   /**
    * Create a FragmentContext instance for non-root fragment.
@@ -208,6 +209,11 @@ public FragmentContextImpl(final DrillbitContext dbContext, final PlanFragment f
     stats = new FragmentStats(allocator, fragment.getAssignment());
     bufferManager = new BufferManagerImpl(this.allocator);
     constantValueHolderCache = Maps.newHashMap();
+    boolean enableRF = context.getOptionManager().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER);
+    if (enableRF) {
+      ExecutorService executorService = context.getExecutor();
+      this.runtimeFilterSink = new RuntimeFilterSink(this.allocator, executorService);
+    }
   }
 
   /**
@@ -348,13 +354,13 @@ public boolean isUserAuthenticationEnabled() {
   }
 
   @Override
-  public void setRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
-    this.runtimeFilterWritable = runtimeFilter;
+  public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
+    this.runtimeFilterSink.aggregate(runtimeFilter);
   }
 
   @Override
-  public RuntimeFilterWritable getRuntimeFilter() {
-    return runtimeFilterWritable;
+  public RuntimeFilterSink getRuntimeFilterSink() {
+    return runtimeFilterSink;
   }
 
   /**
@@ -470,9 +476,7 @@ public void close() {
     for (OperatorContextImpl opContext : contexts) {
       suppressingClose(opContext);
     }
-    if (runtimeFilterWritable != null) {
-      suppressingClose(runtimeFilterWritable);
-    }
+    suppressingClose(runtimeFilterSink);
     suppressingClose(bufferManager);
     suppressingClose(allocator);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
index bc21580d369..9248bbc698e 100644
--- 

[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16644082#comment-16644082
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

sohami commented on issue #1459: DRILL-6731: Move the BFs aggregating work from 
the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#issuecomment-428349411
 
 
   +1 for the last commit as well. LGTM.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter
> --
>
> Key: DRILL-6731
> URL: https://issues.apache.org/jira/browse/DRILL-6731
> Project: Apache Drill
>  Issue Type: Improvement
>  Components:  Server
>Affects Versions: 1.15.0
>Reporter: weijie.tong
>Assignee: weijie.tong
>Priority: Major
>  Labels: ready-to-commit
> Fix For: 1.15.0
>
>
> This PR is to move the BloomFilter aggregating work from the foreman to 
> RuntimeFilter. Through this change, the RuntimeFilter can apply the incoming 
> BF as soon as possible.
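The description relies on partial Bloom filters from several HashJoin minor fragments being combinable close to where they are used, rather than at the Foreman. When two Bloom filters share the same bit-array size and hash functions, merging them is a bitwise OR of their bit arrays. The following is a minimal sketch using `java.util.BitSet`. The class `SimpleBloomFilter`, its double-hashing scheme, and its sizes are illustrative assumptions and are not Drill's `BloomFilter` implementation:

```java
import java.util.BitSet;

/** Hypothetical, simplified Bloom filter; not Drill's BloomFilter class. */
public class SimpleBloomFilter {
  private final BitSet bits;
  private final int numBits;
  private final int numHashes;

  public SimpleBloomFilter(int numBits, int numHashes) {
    this.bits = new BitSet(numBits);
    this.numBits = numBits;
    this.numHashes = numHashes;
  }

  // Derives the i-th bit index from two base hashes (double hashing).
  private int index(long key, int i) {
    int h1 = Long.hashCode(key);
    int h2 = Long.hashCode(key * 0x9E3779B97F4A7C15L) | 1;
    return Math.floorMod(h1 + i * h2, numBits);
  }

  public void insert(long key) {
    for (int i = 0; i < numHashes; i++) {
      bits.set(index(key, i));
    }
  }

  public boolean mightContain(long key) {
    for (int i = 0; i < numHashes; i++) {
      if (!bits.get(index(key, i))) {
        return false;
      }
    }
    return true;
  }

  /** Merges another filter into this one; both must share size and hash count. */
  public void or(SimpleBloomFilter other) {
    if (other.numBits != numBits || other.numHashes != numHashes) {
      throw new IllegalArgumentException("incompatible Bloom filters");
    }
    bits.or(other.bits);
  }

  public static void main(String[] args) {
    SimpleBloomFilter fromFragment0 = new SimpleBloomFilter(1024, 3);
    SimpleBloomFilter fromFragment1 = new SimpleBloomFilter(1024, 3);
    fromFragment0.insert(42L);
    fromFragment1.insert(7L);
    fromFragment0.or(fromFragment1);
    System.out.println(fromFragment0.mightContain(42L) && fromFragment0.mightContain(7L)); // true
  }
}
```

Because OR only sets bits, the merged filter still has no false negatives for keys seen by any fragment. Its false-positive rate grows as more bits are set.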



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-10-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16643271#comment-16643271
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

weijietong commented on issue #1459: DRILL-6731: Move the BFs aggregating work 
from the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#issuecomment-428174077
 
 
   @sohami I made a minor change in this additional commit: the RuntimeFilter
aggregating work now runs on a thread pool, which should save resources. The
other change is to not create the RuntimeFilterSink when the runtime filter
option is disabled. Please review.
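The two changes described above can be sketched roughly as follows. This sketch assumes a simplified sink. Incoming filters are queued, and a drain task is submitted to a shared `ExecutorService` only when no drain is already scheduled, so no thread is held while the queue is idle. The owner creates the sink only when the runtime-filter option is enabled. `AsyncAggregatingSink` and all of its members are hypothetical and do not reproduce Drill's `RuntimeFilterSink`. Filters are modeled as `long` bitmasks merged with OR.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sink: filters are modeled as 64-bit masks merged with OR. */
public class AsyncAggregatingSink {
  private final Queue<Long> pending = new ConcurrentLinkedQueue<>();
  private final AtomicBoolean drainScheduled = new AtomicBoolean(false);
  private final AtomicLong aggregated = new AtomicLong(0L);
  private final ExecutorService executor;

  public AsyncAggregatingSink(ExecutorService executor) {
    this.executor = executor;
  }

  /** Called by the receiving side; only enqueues, never merges on the caller's thread. */
  public void aggregate(long filter) {
    pending.add(filter);
    // Only one drain task is scheduled at a time; it borrows a pool thread and returns it.
    if (drainScheduled.compareAndSet(false, true)) {
      executor.execute(this::drain);
    }
  }

  private void drain() {
    do {
      Long next;
      while ((next = pending.poll()) != null) {
        final long f = next;
        aggregated.getAndUpdate(current -> current | f);
      }
      drainScheduled.set(false);
      // Re-check: a filter may have been queued after the last poll but before the flag was cleared.
    } while (!pending.isEmpty() && drainScheduled.compareAndSet(false, true));
  }

  public long current() {
    return aggregated.get();
  }

  public static void main(String[] args) throws InterruptedException {
    boolean runtimeFilterEnabled = true; // stands in for the session option
    ExecutorService pool = Executors.newFixedThreadPool(2);
    // Mirrors the guard: no sink at all when the option is off.
    AsyncAggregatingSink sink = runtimeFilterEnabled ? new AsyncAggregatingSink(pool) : null;
    if (sink != null) {
      sink.aggregate(0b0011L);
      sink.aggregate(0b0100L);
    }
    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);
    System.out.println(sink == null ? "disabled" : Long.toBinaryString(sink.current())); // 111
  }
}
```

The drain task re-checks the queue in a loop instead of resubmitting itself. As a result, only producer threads ever call `execute`, and no filter is stranded between the last `poll()` and clearing the flag.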


[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-10-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16639439#comment-16639439
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

weijietong commented on issue #1459: DRILL-6731: Move the BFs aggregating work 
from the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#issuecomment-427279543
 
 
   +1


[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-10-05 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16639305#comment-16639305
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

sohami commented on issue #1459: DRILL-6731: Move the BFs aggregating work from 
the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#issuecomment-427254392
 
 
   Thanks!


[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-10-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16639306#comment-16639306
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

sohami edited a comment on issue #1459: DRILL-6731: Move the BFs aggregating 
work from the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#issuecomment-427254392
 
 
   Thanks! If the changes look good to you, please give your +1 for my commits.


[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-10-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16639228#comment-16639228
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

weijietong commented on issue #1459: DRILL-6731: Move the BFs aggregating work 
from the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#issuecomment-427238455
 
 
   @sohami done.


[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-10-04 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16639002#comment-16639002
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

sohami commented on issue #1459: DRILL-6731: Move the BFs aggregating work from 
the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#issuecomment-427195637
 
 
   @weijietong - The fix is in the last commit of this branch
(https://github.com/sohami/drill/commits/DRILL-6731-Final). Please include it
in your PR. With this change, all the tests pass provided the jdbc-all jar size
limit is also increased. The jar exceeds its size limit mainly because of the
proto-generated classes in this PR, not because of any new dependency. I am
seeing the same issue with other PRs, so it would be good to make a single
change to increase the jar size limit and then commit the other PRs on top of it.


[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-10-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16636496#comment-16636496
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

sohami commented on issue #1459: DRILL-6731: Move the BFs aggregating work from 
the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#issuecomment-426521845
 
 
   @weijietong - Thanks! I ran the pre-commit tests, and a few JPPD tests are
failing due to issues related to the locking mechanism in RuntimeFilterSink. I
will try to fix that and update the PR.
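This sketch gives some context on the kind of synchronization involved. When one thread merges incoming filters while the probe side reads the merged result, the merge and the version bump must be atomic with respect to readers. Otherwise a reader may see a half-merged filter, or a version number that does not match the contents. The sketch below shows one way to do this with `ReentrantReadWriteLock`, assuming filters are plain `long[]` bit arrays. `VersionedFilterHolder` is hypothetical and is not the fix that was applied to `RuntimeFilterSink`.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical holder of a merged filter plus a version that changes on every merge. */
public class VersionedFilterHolder {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final long[] words;
  private long version = 0;

  public VersionedFilterHolder(int numWords) {
    this.words = new long[numWords];
  }

  /** ORs one incoming filter in; contents and version change together under the write lock. */
  public void merge(long[] incoming) {
    if (incoming.length != words.length) {
      throw new IllegalArgumentException("size mismatch");
    }
    lock.writeLock().lock();
    try {
      for (int i = 0; i < words.length; i++) {
        words[i] |= incoming[i];
      }
      version++;
    } finally {
      lock.writeLock().unlock();
    }
  }

  /** Copy of the filter as of one specific version. */
  public static final class Snapshot {
    public final long version;
    public final long[] words;

    Snapshot(long version, long[] words) {
      this.version = version;
      this.words = words;
    }
  }

  /** Returns a consistent copy, or null if nothing newer than knownVersion exists. */
  public Snapshot snapshotIfNewer(long knownVersion) {
    lock.readLock().lock();
    try {
      if (version <= knownVersion) {
        return null;
      }
      return new Snapshot(version, Arrays.copyOf(words, words.length));
    } finally {
      lock.readLock().unlock();
    }
  }

  public static void main(String[] args) {
    VersionedFilterHolder holder = new VersionedFilterHolder(2);
    holder.merge(new long[] {1L, 0L});
    holder.merge(new long[] {2L, 4L});
    Snapshot s = holder.snapshotIfNewer(0);
    System.out.println(s.version + " " + s.words[0] + " " + s.words[1]); // 2 3 4
    System.out.println(holder.snapshotIfNewer(s.version) == null); // true
  }
}
```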


[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-10-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16635429#comment-16635429
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

weijietong commented on issue #1459: DRILL-6731: Move the BFs aggregating work 
from the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#issuecomment-426259756
 
 
   @sohami Done. I really appreciate the effort you made.


[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16635016#comment-16635016
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

sohami commented on issue #1459: DRILL-6731: Move the BFs aggregating work from 
the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#issuecomment-426146270
 
 
   @weijietong - Can you also please change the first commit message to include
the JIRA number and title?


[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16634797#comment-16634797
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

weijietong commented on issue #1459: DRILL-6731: Move the BFs aggregating work 
from the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#issuecomment-426106397
 
 
   It has been rebased onto master and is ready to be merged.


[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16633880#comment-16633880
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

weijietong commented on issue #1459: DRILL-6731: Move the BFs aggregating work 
from the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#issuecomment-425877886
 
 
   @sohami I applied your suggestions.


[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-09-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16633627#comment-16633627
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs 
aggregating work from the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#discussion_r221491365
 
 

 ##
 File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
 ##
 @@ -90,27 +95,30 @@ public RuntimeFilterWritable duplicate(BufferAllocator bufferAllocator) {
   int capacity = src.readableBytes();
   DrillBuf duplicateOne = bufferAllocator.buffer(capacity);
   int readerIndex = src.readerIndex();
-  src.readBytes(duplicateOne, 0, capacity);
+  duplicateOne.writeBytes(src);
   src.readerIndex(readerIndex);
   cloned[i] = duplicateOne;
   i++;
 }
 return new RuntimeFilterWritable(runtimeFilterBDef, cloned);
   }
 
-  public boolean same(RuntimeFilterWritable other) {
-BitData.RuntimeFilterBDef runtimeFilterDef = other.getRuntimeFilterBDef();
-int otherMajorId = runtimeFilterDef.getMajorFragmentId();
-int otherMinorId = runtimeFilterDef.getMinorFragmentId();
-int otherHashJoinOpId = runtimeFilterDef.getHjOpId();
-int thisMajorId = this.runtimeFilterBDef.getMajorFragmentId();
-int thisMinorId = this.runtimeFilterBDef.getMinorFragmentId();
-int thisHashJoinOpId = this.runtimeFilterBDef.getHjOpId();
-return otherMajorId == thisMajorId && otherMinorId == thisMinorId && otherHashJoinOpId == thisHashJoinOpId;
+  public String toString() {
+return identifier;
   }
 
-  public String toString() {
-return "majorFragmentId:" + runtimeFilterBDef.getMajorFragmentId() + ",minorFragmentId:" + runtimeFilterBDef.getMinorFragmentId() + ", operatorId:" + runtimeFilterBDef.getHjOpId();
+  @Override
+  public boolean equals(Object other) {
 
 Review comment:
   Please include a null check for `other`, like:
   ```
   if (other == null) {
     return false;
   }
   ```
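The reviewer's request follows the general `Object.equals` contract: `x.equals(null)` must return `false`, and `hashCode` must agree with `equals`. Below is a minimal sketch for an identity built from the three values that the removed `same()` method compared: the major fragment ID, the minor fragment ID, and the HashJoin operator ID. `FilterIdentity` is a hypothetical stand-in and not the `RuntimeFilterWritable` code that was committed.

```java
import java.util.Objects;

/** Hypothetical identity of a runtime filter: which fragment and HashJoin produced it. */
public final class FilterIdentity {
  private final int majorFragmentId;
  private final int minorFragmentId;
  private final int hjOpId;

  public FilterIdentity(int majorFragmentId, int minorFragmentId, int hjOpId) {
    this.majorFragmentId = majorFragmentId;
    this.minorFragmentId = minorFragmentId;
    this.hjOpId = hjOpId;
  }

  @Override
  public boolean equals(Object other) {
    if (this == other) {
      return true;
    }
    // The null check the review asks for; getClass() on a null reference would throw.
    if (other == null || getClass() != other.getClass()) {
      return false;
    }
    FilterIdentity that = (FilterIdentity) other;
    return majorFragmentId == that.majorFragmentId
        && minorFragmentId == that.minorFragmentId
        && hjOpId == that.hjOpId;
  }

  @Override
  public int hashCode() {
    // Uses the same fields as equals() so that equal objects hash equally.
    return Objects.hash(majorFragmentId, minorFragmentId, hjOpId);
  }

  @Override
  public String toString() {
    return "majorFragmentId:" + majorFragmentId + ",minorFragmentId:" + minorFragmentId
        + ",operatorId:" + hjOpId;
  }

  public static void main(String[] args) {
    FilterIdentity a = new FilterIdentity(1, 0, 3);
    System.out.println(a.equals(new FilterIdentity(1, 0, 3))); // true
    System.out.println(a.equals(null)); // false
  }
}
```

An `instanceof` test would also reject `null`, because `instanceof` is false for `null`. The explicit `getClass()` comparison used here is what makes the separate null check necessary.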


[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-09-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16633629#comment-16633629
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs 
aggregating work from the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#discussion_r221490851
 
 

 ##
 File path: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
 ##
 @@ -223,16 +223,16 @@ public Void visitJoin(JoinPrel prel, RFHelperHolder holder) throws RuntimeExcept
   right.accept(this, holder);
   boolean routeToForeman = holder.needToRouteToForeman();
   if (!routeToForeman) {
-runtimeFilterDef.setSendToForeman(false);
+runtimeFilterDef.setSendToForeman(!routeToForeman);
   } else {
-runtimeFilterDef.setSendToForeman(true);
+runtimeFilterDef.setSendToForeman(routeToForeman);
   }
   List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs();
   for (BloomFilterDef bloomFilterDef : bloomFilterDefs) {
 if (!routeToForeman) {
-  bloomFilterDef.setLocal(true);
+  bloomFilterDef.setLocal(!routeToForeman);
 } else {
-  bloomFilterDef.setLocal(false);
+  bloomFilterDef.setLocal(routeToForeman);
 
 Review comment:
   Same here: you can remove the `if-else` block.


[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-09-30 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16633628#comment-16633628
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs 
aggregating work from the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#discussion_r221490803
 
 

 ##
 File path: exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
 ##
 @@ -223,16 +223,16 @@ public Void visitJoin(JoinPrel prel, RFHelperHolder holder) throws RuntimeExcept
   right.accept(this, holder);
   boolean routeToForeman = holder.needToRouteToForeman();
   if (!routeToForeman) {
-runtimeFilterDef.setSendToForeman(false);
+runtimeFilterDef.setSendToForeman(!routeToForeman);
 
 Review comment:
   This should be `runtimeFilterDef.setSendToForeman(routeToForeman);`, with the
corresponding change for the else branch. Also, you don't need the `if-else`
block any more.
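If both review comments are applied, the `if-else` blocks collapse to direct assignments. In the lines removed by this diff, the branch that does not route to the Foreman set `sendToForeman` to `false` and each Bloom filter's `local` flag to `true`, and the other branch did the opposite. So, assuming that original behavior is the intended one, `sendToForeman == routeToForeman` and `local == !routeToForeman`. The sketch below uses minimal stand-in classes so the equivalence can be checked in isolation. The classes are hypothetical; Drill's `RuntimeFilterDef` and `BloomFilterDef` have more fields.

```java
import java.util.ArrayList;
import java.util.List;

public class RouteFlagsSketch {
  /** Hypothetical stand-in for a Bloom filter definition. */
  static final class BloomFilterDef {
    boolean local;

    void setLocal(boolean local) {
      this.local = local;
    }
  }

  /** Hypothetical stand-in for a runtime filter definition. */
  static final class RuntimeFilterDef {
    boolean sendToForeman;
    final List<BloomFilterDef> bloomFilterDefs = new ArrayList<>();

    void setSendToForeman(boolean sendToForeman) {
      this.sendToForeman = sendToForeman;
    }

    List<BloomFilterDef> getBloomFilterDefs() {
      return bloomFilterDefs;
    }
  }

  /** Same effect as the original if-else blocks, without branching. */
  static void applyRouting(RuntimeFilterDef runtimeFilterDef, boolean routeToForeman) {
    runtimeFilterDef.setSendToForeman(routeToForeman);
    for (BloomFilterDef bloomFilterDef : runtimeFilterDef.getBloomFilterDefs()) {
      // A filter is applied locally exactly when it is not routed through the Foreman.
      bloomFilterDef.setLocal(!routeToForeman);
    }
  }

  public static void main(String[] args) {
    RuntimeFilterDef def = new RuntimeFilterDef();
    def.getBloomFilterDefs().add(new BloomFilterDef());
    applyRouting(def, false);
    System.out.println(def.sendToForeman + " " + def.getBloomFilterDefs().get(0).local); // false true
  }
}
```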


[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-09-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16632914#comment-16632914
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

weijietong commented on issue #1459: DRILL-6731: Move the BFs aggregating work 
from the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#issuecomment-425632216
 
 
   @sohami Please review. The Travis error message seems strange; other failed
PRs show the same message: "The job exceeded the maximum time limit for jobs,
and has been terminated". Do you know how to solve it?


[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-09-29 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16632866#comment-16632866
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

weijietong commented on a change in pull request #1459: DRILL-6731: Move the 
BFs aggregating work from the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#discussion_r221419927
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
 ##
 @@ -89,16 +142,27 @@ public void close() throws Exception {
 
     @Override
     public void run() {
-      while (running.get()) {
-        RuntimeFilterWritable toAggregate = rfQueue.poll();
-        if (toAggregate != null) {
-          if (aggregated != null) {
-            aggregated.aggregate(toAggregate);
-            currentBookId.incrementAndGet();
+      try {
 
 Review comment:
   Thanks, that's OK with me.


[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-09-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16632166#comment-16632166
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs 
aggregating work from the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#discussion_r221329807
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
 ##
 @@ -36,25 +40,63 @@
 
   private RuntimeFilterWritable aggregated = null;
 
-  private Queue rfQueue = new ConcurrentLinkedQueue<>();
+  private BlockingQueue rfQueue = new LinkedBlockingQueue<>();
 
   private AtomicBoolean running = new AtomicBoolean(true);
 
+  private ReentrantLock aggregatedRFLock = new ReentrantLock();
+
+  private Thread asyncAggregateThread;
+
+  private BufferAllocator bufferAllocator;
+
+  private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterSink.class);
+
+
+  public RuntimeFilterSink(BufferAllocator bufferAllocator) {
+    this.bufferAllocator = bufferAllocator;
+    AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
+    asyncAggregateThread = new NamedThreadFactory("RFAggregating-").newThread(asyncAggregateWorker);
+    asyncAggregateThread.start();
+  }
+
   public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
-    rfQueue.add(runtimeFilterWritable);
-    if (currentBookId.get() == 0) {
-      AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
-      Thread asyncAggregateThread = new NamedThreadFactory("RFAggregating-").newThread(asyncAggregateWorker);
-      asyncAggregateThread.start();
+    if (running.get()) {
+      if (containOne()) {
+        boolean same = aggregated.same(runtimeFilterWritable);
+        if (!same) {
+          //This is to solve the only one fragment case that two RuntimeFilterRecordBatchs
+          //share the same FragmentContext.
 
 Review comment:
   Yes, I did consider the right-deep tree case too, and the logic looks fine to
me.


[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-09-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631504#comment-16631504
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

weijietong commented on a change in pull request #1459: DRILL-6731: Move the 
BFs aggregating work from the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#discussion_r221171069
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
 ##
 @@ -36,25 +40,63 @@
 
   private RuntimeFilterWritable aggregated = null;
 
-  private Queue rfQueue = new ConcurrentLinkedQueue<>();
+  private BlockingQueue rfQueue = new LinkedBlockingQueue<>();
 
   private AtomicBoolean running = new AtomicBoolean(true);
 
+  private ReentrantLock aggregatedRFLock = new ReentrantLock();
+
+  private Thread asyncAggregateThread;
+
+  private BufferAllocator bufferAllocator;
+
+  private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterSink.class);
+
+
+  public RuntimeFilterSink(BufferAllocator bufferAllocator) {
+    this.bufferAllocator = bufferAllocator;
+    AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
+    asyncAggregateThread = new NamedThreadFactory("RFAggregating-").newThread(asyncAggregateWorker);
+    asyncAggregateThread.start();
+  }
+
   public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
-    rfQueue.add(runtimeFilterWritable);
-    if (currentBookId.get() == 0) {
-      AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
-      Thread asyncAggregateThread = new NamedThreadFactory("RFAggregating-").newThread(asyncAggregateWorker);
-      asyncAggregateThread.start();
+    if (running.get()) {
+      if (containOne()) {
+        boolean same = aggregated.same(runtimeFilterWritable);
+        if (!same) {
+          //This is to solve the only one fragment case that two RuntimeFilterRecordBatchs
+          //share the same FragmentContext.
 
 Review comment:
   A direct example is this TPC-H SQL query:
   ```
   select l.l_orderkey, sum(l.l_extendedprice * (1 - l.l_discount)) as revenue, o.o_orderdate, o.o_shippriority
   from dfs.`/tpch-parquet/customer` c, dfs.`/tpch-parquet/orders` o, dfs.`/tpch-parquet/lineitem` l
   where c.c_mktsegment = 'HOUSEHOLD' and c.c_custkey = o.o_custkey and l.l_orderkey = o.o_orderkey and o.o_orderdate < date '1995-03-25' and l.l_shipdate > date '1995-03-25'
   group by l.l_orderkey, o.o_orderdate, o.o_shippriority
   order by revenue desc, o.o_orderdate limit 10
   ```
   The corresponding plan is:
   ```
   
   00-00    Screen : rowType = RecordType(ANY l_orderkey, ANY revenue, ANY o_orderdate, ANY o_shippriority): rowcount = 10.0, cumulative cost = {4051714.179997 rows, 3.094535517999211E7 cpu, 0.0 io, 0.0 network, 1.298667392002E7 memory}, id = 3423
   00-01      Project(l_orderkey=[$0], revenue=[$1], o_orderdate=[$2], o_shippriority=[$3]) : rowType = RecordType(ANY l_orderkey, ANY revenue, ANY o_orderdate, ANY o_shippriority): rowcount = 10.0, cumulative cost = {4051713.179997 rows, 3.094535417999211E7 cpu, 0.0 io, 0.0 network, 1.298667392002E7 memory}, id = 3422
   00-02        SelectionVectorRemover : rowType = RecordType(ANY l_orderkey, ANY revenue, ANY o_orderdate, ANY o_shippriority): rowcount = 10.0, cumulative cost = {4051703.179997 rows, 3.094531417999211E7 cpu, 0.0 io, 0.0 network, 1.298667392002E7 memory}, id = 3421
   00-03          Limit(fetch=[10]) : rowType = RecordType(ANY l_orderkey, ANY revenue, ANY o_orderdate, ANY o_shippriority): rowcount = 10.0, cumulative cost = {4051693.179997 rows, 3.094530417999211E7 cpu, 0.0 io, 0.0 network, 1.298667392002E7 memory}, id = 3420
   00-04            SelectionVectorRemover : rowType = RecordType(ANY l_orderkey, ANY revenue, ANY o_orderdate, ANY o_shippriority): rowcount = 3002.85997, cumulative cost = {4051683.179997 rows, 3.094526417999211E7 cpu, 0.0 io, 0.0 network, 1.298667392002E7 memory}, id = 3419
   00-05              TopN(limit=[10]) : rowType = RecordType(ANY l_orderkey, ANY revenue, ANY o_orderdate, ANY o_shippriority): rowcount = 3002.85997, cumulative cost = {4048680.32 rows, 3.094226131999211E7 cpu, 0.0 io, 0.0 network, 1.298667392002E7 memory}, id = 3418
   00-06                Project(l_orderkey=[$0], revenue=[$3], o_orderdate=[$1], o_shippriority=[$2]) : rowType = RecordType(ANY l_orderkey, ANY revenue, ANY o_orderdate, ANY o_shippriority): rowcount = 3002.85997, cumulative cost = {4045677.46 rows, 3.086245904003E7 cpu, 0.0 io, 0.0 network, 1.298667392002E7 memory}, id = 3417
   00-07                  HashAgg(group=[{0, 1, 2}], revenue=[SUM($3)]) : rowType = RecordType(ANY l_orderkey, ANY o_orderdate, ANY o_shippriority, ANY revenue): rowcount = 3002.85997, cumulative cost = {4042674.6

[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-09-27 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631006#comment-16631006
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs 
aggregating work from the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#discussion_r221069294
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
 ##
 @@ -36,25 +40,63 @@
 
   private RuntimeFilterWritable aggregated = null;
 
-  private Queue rfQueue = new ConcurrentLinkedQueue<>();
+  private BlockingQueue rfQueue = new LinkedBlockingQueue<>();
 
   private AtomicBoolean running = new AtomicBoolean(true);
 
+  private ReentrantLock aggregatedRFLock = new ReentrantLock();
+
+  private Thread asyncAggregateThread;
+
+  private BufferAllocator bufferAllocator;
+
+  private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterSink.class);
+
+
+  public RuntimeFilterSink(BufferAllocator bufferAllocator) {
+    this.bufferAllocator = bufferAllocator;
+    AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
+    asyncAggregateThread = new NamedThreadFactory("RFAggregating-").newThread(asyncAggregateWorker);
+    asyncAggregateThread.start();
+  }
+
   public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
-    rfQueue.add(runtimeFilterWritable);
-    if (currentBookId.get() == 0) {
-      AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
-      Thread asyncAggregateThread = new NamedThreadFactory("RFAggregating-").newThread(asyncAggregateWorker);
-      asyncAggregateThread.start();
+    if (running.get()) {
+      if (containOne()) {
+        boolean same = aggregated.same(runtimeFilterWritable);
+        if (!same) {
+          //This is to solve the only one fragment case that two RuntimeFilterRecordBatchs
+          //share the same FragmentContext.
 
 Review comment:
   Thanks for the explanation. So for the single-minor-fragment, left-deep tree
case shown below, only one `RuntimeFilterOperator` (RTF) will be inserted, above
the left-most Scan. When `next()` is called, the build side of the upper HJ is
evaluated first; it creates a BloomFilter and sends it to the RTF operator.
Later, `next()` is called on the probe side of the upper HJ, which in turn calls
`next()` on the build side of the lower hash join. When the build side of the
lower join completes, it also sends its BloomFilter to the RTF operator. Since
the two bloom filters come from different HJ operators (hence different
`srcHashJoinOpId`s), it will discard the first one and keep the second one. I
guess this is done because the join condition column can differ between
HashJoins?
   
   ```
         HJ
        /  \
      HJ    Scan
     /  \
   Scan  Scan
   ```
   
   ```
         HJ
        /  \
      HJ    Scan
     /  \
   RTF  Scan
     |
   Scan
   ```
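The merge-or-replace rule discussed above (merge filters built by the same source HashJoin; let a filter from a different HashJoin replace the one currently held) can be sketched as follows. `FilterSinkSketch`, its nested `Filter` and the operator ids used are hypothetical stand-ins for illustration, not Drill's `RuntimeFilterSink` API; the real check in the patch (`same()`) also compares the major and minor fragment ids.

```java
import java.util.BitSet;

// Hypothetical sketch of the merge-or-replace rule from the review; not Drill code.
final class FilterSinkSketch {

  // Hypothetical stand-in for a runtime filter plus the id of the HashJoin that built it.
  static final class Filter {
    final int srcHashJoinOpId;
    final BitSet bits;

    Filter(int srcHashJoinOpId, BitSet bits) {
      this.srcHashJoinOpId = srcHashJoinOpId;
      this.bits = bits;
    }
  }

  private Filter held;

  // Same source HashJoin: OR the bits in. Different source: replace the held filter.
  synchronized void offer(Filter incoming) {
    if (held == null || held.srcHashJoinOpId != incoming.srcHashJoinOpId) {
      held = incoming;
    } else {
      held.bits.or(incoming.bits);
    }
  }

  synchronized Filter current() {
    return held;
  }

  public static void main(String[] args) {
    FilterSinkSketch sink = new FilterSinkSketch();
    BitSet upper = new BitSet();
    upper.set(1);
    BitSet lower = new BitSet();
    lower.set(5);
    sink.offer(new Filter(3, upper)); // upper HJ's build side finishes first
    sink.offer(new Filter(6, lower)); // lower HJ's build side finishes later
    System.out.println(sink.current().srcHashJoinOpId); // 6
  }
}
```

In the single-fragment, left-deep case this reproduces the "discard the first, keep the second" behavior, because the two filters carry different source HashJoin ids.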


[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629734#comment-16629734
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

weijietong commented on a change in pull request #1459: DRILL-6731: Move the 
BFs aggregating work from the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#discussion_r220789399
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
 ##
 @@ -36,25 +40,63 @@
 
   private RuntimeFilterWritable aggregated = null;
 
-  private Queue rfQueue = new ConcurrentLinkedQueue<>();
+  private BlockingQueue rfQueue = new LinkedBlockingQueue<>();
 
   private AtomicBoolean running = new AtomicBoolean(true);
 
+  private ReentrantLock aggregatedRFLock = new ReentrantLock();
+
+  private Thread asyncAggregateThread;
+
+  private BufferAllocator bufferAllocator;
+
+  private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterSink.class);
+
+
+  public RuntimeFilterSink(BufferAllocator bufferAllocator) {
+    this.bufferAllocator = bufferAllocator;
+    AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
+    asyncAggregateThread = new NamedThreadFactory("RFAggregating-").newThread(asyncAggregateWorker);
+    asyncAggregateThread.start();
+  }
+
   public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
-    rfQueue.add(runtimeFilterWritable);
-    if (currentBookId.get() == 0) {
-      AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
-      Thread asyncAggregateThread = new NamedThreadFactory("RFAggregating-").newThread(asyncAggregateWorker);
-      asyncAggregateThread.start();
+    if (running.get()) {
+      if (containOne()) {
+        boolean same = aggregated.same(runtimeFilterWritable);
+        if (!same) {
+          //This is to solve the only one fragment case that two RuntimeFilterRecordBatchs
+          //share the same FragmentContext.
 
 Review comment:
   In the one-fragment case, suppose there are two HashJoins (a left-deep tree).
They share the same FragmentContext, and the RuntimeFilters from the different
HashJoins are registered at the RuntimeFilterSink in their execution order.
   


[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629368#comment-16629368
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs 
aggregating work from the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#discussion_r220256674
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
 ##
 @@ -36,25 +40,63 @@
 
   private RuntimeFilterWritable aggregated = null;
 
-  private Queue rfQueue = new ConcurrentLinkedQueue<>();
+  private BlockingQueue rfQueue = new LinkedBlockingQueue<>();
 
   private AtomicBoolean running = new AtomicBoolean(true);
 
+  private ReentrantLock aggregatedRFLock = new ReentrantLock();
+
+  private Thread asyncAggregateThread;
+
+  private BufferAllocator bufferAllocator;
+
+  private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterSink.class);
+
+
+  public RuntimeFilterSink(BufferAllocator bufferAllocator) {
+    this.bufferAllocator = bufferAllocator;
+    AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
+    asyncAggregateThread = new NamedThreadFactory("RFAggregating-").newThread(asyncAggregateWorker);
+    asyncAggregateThread.start();
+  }
+
   public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
-    rfQueue.add(runtimeFilterWritable);
-    if (currentBookId.get() == 0) {
-      AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
-      Thread asyncAggregateThread = new NamedThreadFactory("RFAggregating-").newThread(asyncAggregateWorker);
-      asyncAggregateThread.start();
+    if (running.get()) {
+      if (containOne()) {
+        boolean same = aggregated.same(runtimeFilterWritable);
+        if (!same) {
+          //This is to solve the only one fragment case that two RuntimeFilterRecordBatchs
+          //share the same FragmentContext.
 
 Review comment:
   Can you please elaborate on this use case? I didn't quite get it. My
understanding is that, in the one-fragment case, the RuntimeFilter is set
directly in the FragmentContext and is not sent over the wire. So we should not
have received it in the first place, right?


[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629366#comment-16629366
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs 
aggregating work from the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#discussion_r219931819
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
 ##
 @@ -224,15 +221,15 @@ public Void visitJoin(JoinPrel prel, RFHelperHolder holder) throws RuntimeExcept
       Prel right = (Prel) hashJoinPrel.getRight();
       holder.setFromBuildSide(true);
       right.accept(this, holder);
-      boolean buildSideEncountererdBroadcastExchange = holder.isEncounteredBroadcastExchange();
-      if (buildSideEncountererdBroadcastExchange) {
+      boolean routeToForeman = holder.needToRouteToForeman();
+      if (!routeToForeman) {
         runtimeFilterDef.setSendToForeman(false);
       } else {
         runtimeFilterDef.setSendToForeman(true);
       }
       List bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs();
       for (BloomFilterDef bloomFilterDef : bloomFilterDefs) {
-        if (buildSideEncountererdBroadcastExchange) {
+        if (!routeToForeman) {
           bloomFilterDef.setLocal(true);
         } else {
           bloomFilterDef.setLocal(false);
 
 Review comment:
   You can simplify the above to:
   `bloomFilterDef.setLocal(!routeToForeman);`


[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629365#comment-16629365
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs 
aggregating work from the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#discussion_r219938720
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
 ##
 @@ -81,6 +82,37 @@ public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
     }
   }
 
+  public RuntimeFilterWritable duplicate(BufferAllocator bufferAllocator) {
+    int len = data.length;
+    DrillBuf[] cloned = new DrillBuf[len];
+    int i = 0;
+    for (DrillBuf src : data) {
+      int capacity = src.readableBytes();
+      DrillBuf duplicateOne = bufferAllocator.buffer(capacity);
+      int readerIndex = src.readerIndex();
+      src.readBytes(duplicateOne, 0, capacity);
 
 Review comment:
   consider using `duplicateOne.writeBytes(src)` instead of 
`src.readBytes(duplicateOne, 0, capacity)` since that will update the index of 
`duplicateOne` correctly.
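The point of the suggestion is index bookkeeping. In Netty's `ByteBuf` model, which `DrillBuf` follows, `src.readBytes(dst, dstIndex, length)` advances only `src`'s reader index and leaves `dst`'s writer index untouched, whereas `dst.writeBytes(src)` advances `dst`'s writer index (and also `src`'s reader index, so resetting it afterwards, as the patch does, is still needed). Since Netty is not assumed on the classpath here, the sketch below is a JDK-only analog using `java.nio.ByteBuffer`, not the `DrillBuf` API: a relative `put` of a duplicated view advances the destination's position while the source's position stays where it was.

```java
import java.nio.ByteBuffer;

// JDK-only analog of copying a buffer's readable bytes; not DrillBuf/ByteBuf code.
final class BufferCopySketch {

  // Copy src's remaining bytes into a new buffer without moving src's position.
  static ByteBuffer copyRemaining(ByteBuffer src) {
    ByteBuffer copy = ByteBuffer.allocate(src.remaining());
    // duplicate() shares the content but has its own position and limit,
    // so the relative put advances only the view and the destination.
    copy.put(src.duplicate());
    copy.flip(); // make the copied bytes readable from position 0
    return copy;
  }

  public static void main(String[] args) {
    ByteBuffer src = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
    src.get(); // consume one byte, so src's position is now 1
    ByteBuffer copy = copyRemaining(src);
    System.out.println(copy.remaining() + " " + src.position()); // 3 1
  }
}
```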


[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629367#comment-16629367
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs 
aggregating work from the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#discussion_r220710747
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
 ##
 @@ -89,16 +142,27 @@ public void close() throws Exception {
 
     @Override
     public void run() {
-      while (running.get()) {
-        RuntimeFilterWritable toAggregate = rfQueue.poll();
-        if (toAggregate != null) {
-          if (aggregated != null) {
-            aggregated.aggregate(toAggregate);
-            currentBookId.incrementAndGet();
+      try {
+        while (running.get()) {
+          RuntimeFilterWritable toAggregate = rfQueue.take();
+          if (!running.get()) {
+            toAggregate.close();
+            return;
+          }
+          if (containOne()) {
+            try {
+              aggregatedRFLock.lock();
+              aggregated.aggregate(toAggregate);
 
 Review comment:
   `aggregated.close()` should be called to release the DrillBuf reference it is
holding, since `aggregate()` will not do that. This change is part of the branch
shared in the comment above.
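The patched worker follows a common producer/consumer shape: callers enqueue incoming filters, one background thread `take()`s them and merges them into the held filter under a lock, and buffer references that are no longer needed are released. A minimal sketch, assuming a hypothetical `Filter` type with a `released` flag in place of `RuntimeFilterWritable`. Which object must be closed depends on `aggregate()`'s ownership rules; this sketch assumes the merge copies the bits, so it closes the consumed input. It also uses the `lock(); try { ... } finally { unlock(); }` ordering recommended by the `ReentrantLock` documentation instead of calling `lock()` inside the `try`.

```java
import java.util.BitSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of an async aggregating sink; not Drill's RuntimeFilterSink.
final class AsyncAggregatorSketch {

  // Hypothetical stand-in for RuntimeFilterWritable.
  static final class Filter implements AutoCloseable {
    final BitSet bits;
    volatile boolean released;

    Filter(BitSet bits) {
      this.bits = bits;
    }

    @Override
    public void close() {
      released = true; // a real implementation would release its buffers here
    }
  }

  private final BlockingQueue<Filter> queue = new LinkedBlockingQueue<>();
  private final AtomicBoolean running = new AtomicBoolean(true);
  private final ReentrantLock lock = new ReentrantLock();
  private final BitSet aggregated = new BitSet();
  private final Thread worker = new Thread(this::drain, "RFAggregating-sketch");

  AsyncAggregatorSketch() {
    worker.setDaemon(true);
    worker.start();
  }

  // Producer side: never blocks the calling fragment thread.
  void aggregate(Filter incoming) {
    queue.add(incoming);
  }

  private void drain() {
    try {
      while (running.get()) {
        Filter toAggregate = queue.take(); // blocks until a filter arrives
        lock.lock();
        try {
          aggregated.or(toAggregate.bits); // the merge copies the bits
        } finally {
          lock.unlock();
        }
        toAggregate.close(); // input no longer needed after merging
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // close() interrupts a blocked take()
    }
  }

  BitSet snapshot() {
    lock.lock();
    try {
      return (BitSet) aggregated.clone();
    } finally {
      lock.unlock();
    }
  }

  void close() {
    running.set(false);
    worker.interrupt();
    try {
      worker.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    // Release anything still queued so no buffer reference leaks.
    for (Filter left; (left = queue.poll()) != null; ) {
      left.close();
    }
  }
}
```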


[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629364#comment-16629364
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs 
aggregating work from the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#discussion_r219983879
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
 ##
 @@ -81,6 +82,37 @@ public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
     }
   }
 
+  public RuntimeFilterWritable duplicate(BufferAllocator bufferAllocator) {
+    int len = data.length;
+    DrillBuf[] cloned = new DrillBuf[len];
+    int i = 0;
+    for (DrillBuf src : data) {
+      int capacity = src.readableBytes();
+      DrillBuf duplicateOne = bufferAllocator.buffer(capacity);
+      int readerIndex = src.readerIndex();
+      src.readBytes(duplicateOne, 0, capacity);
+      src.readerIndex(readerIndex);
+      cloned[i] = duplicateOne;
+      i++;
+    }
+    return new RuntimeFilterWritable(runtimeFilterBDef, cloned);
+  }
+
+  public boolean same(RuntimeFilterWritable other) {
 
 Review comment:
   Is the intention always to check `RuntimeFilterWritable` equality based on the
fields of `runtimeFilterDef` below? If so, we should override the
`equals(Object other)` and `hashCode()` methods and move the logic of `same()`
into the overridden `equals()`.
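Moving the comparison from `same()` into `equals()` requires a matching `hashCode()` over the same fields, so that equal filters hash alike (the `Object` contract). A sketch, assuming a hypothetical value class `RuntimeFilterKey` holding the three ids that `same()` compares; in the real class these would be read from `runtimeFilterBDef`.

```java
import java.util.Objects;

// Hypothetical value-style key; not Drill's RuntimeFilterWritable.
final class RuntimeFilterKey {
  private final int majorFragmentId;
  private final int minorFragmentId;
  private final int hjOpId;

  RuntimeFilterKey(int majorFragmentId, int minorFragmentId, int hjOpId) {
    this.majorFragmentId = majorFragmentId;
    this.minorFragmentId = minorFragmentId;
    this.hjOpId = hjOpId;
  }

  @Override
  public boolean equals(Object other) {
    if (this == other) {
      return true;
    }
    if (!(other instanceof RuntimeFilterKey)) {
      return false;
    }
    RuntimeFilterKey that = (RuntimeFilterKey) other;
    return majorFragmentId == that.majorFragmentId
        && minorFragmentId == that.minorFragmentId
        && hjOpId == that.hjOpId;
  }

  // Uses exactly the fields compared in equals(), so equal keys hash alike.
  @Override
  public int hashCode() {
    return Objects.hash(majorFragmentId, minorFragmentId, hjOpId);
  }

  public static void main(String[] args) {
    RuntimeFilterKey a = new RuntimeFilterKey(1, 0, 5);
    RuntimeFilterKey b = new RuntimeFilterKey(1, 0, 5);
    System.out.println(a.equals(b) && a.hashCode() == b.hashCode()); // true
  }
}
```

With both methods overridden, such keys also behave correctly in `HashMap` or `HashSet`, which a standalone `same()` method would not give you.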


[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629363#comment-16629363
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs 
aggregating work from the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#discussion_r219931981
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
 ##
 @@ -224,15 +221,15 @@ public Void visitJoin(JoinPrel prel, RFHelperHolder holder) throws RuntimeExcept
       Prel right = (Prel) hashJoinPrel.getRight();
       holder.setFromBuildSide(true);
       right.accept(this, holder);
-      boolean buildSideEncountererdBroadcastExchange = holder.isEncounteredBroadcastExchange();
-      if (buildSideEncountererdBroadcastExchange) {
+      boolean routeToForeman = holder.needToRouteToForeman();
+      if (!routeToForeman) {
         runtimeFilterDef.setSendToForeman(false);
       } else {
         runtimeFilterDef.setSendToForeman(true);
 
 Review comment:
   You can simplify the above to:
   `runtimeFilterDef.setSendToForeman(routeToForeman);`


[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629361#comment-16629361
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs 
aggregating work from the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#discussion_r219939606
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
 ##
 @@ -81,6 +82,37 @@ public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
     }
   }
 
+  public RuntimeFilterWritable duplicate(BufferAllocator bufferAllocator) {
+    int len = data.length;
+    DrillBuf[] cloned = new DrillBuf[len];
+    int i = 0;
+    for (DrillBuf src : data) {
+      int capacity = src.readableBytes();
+      DrillBuf duplicateOne = bufferAllocator.buffer(capacity);
+      int readerIndex = src.readerIndex();
+      src.readBytes(duplicateOne, 0, capacity);
+      src.readerIndex(readerIndex);
+      cloned[i] = duplicateOne;
+      i++;
+    }
+    return new RuntimeFilterWritable(runtimeFilterBDef, cloned);
+  }
+
+  public boolean same(RuntimeFilterWritable other) {
+    BitData.RuntimeFilterBDef runtimeFilterDef = other.getRuntimeFilterBDef();
+    int otherMajorId = runtimeFilterDef.getMajorFragmentId();
+    int otherMinorId = runtimeFilterDef.getMinorFragmentId();
+    int otherHashJoinOpId = runtimeFilterDef.getHjOpId();
+    int thisMajorId = this.runtimeFilterBDef.getMajorFragmentId();
+    int thisMinorId = this.runtimeFilterBDef.getMinorFragmentId();
+    int thisHashJoinOpId = this.runtimeFilterBDef.getHjOpId();
+    return otherMajorId == thisMajorId && otherMinorId == thisMinorId && otherHashJoinOpId == thisHashJoinOpId;
+  }
+
+  public String toString() {
+    return "majorFragmentId:" + runtimeFilterBDef.getMajorFragmentId() + ",minorFragmentId:" + runtimeFilterBDef.getMinorFragmentId() + ", operatorId:" + runtimeFilterBDef.getHjOpId();
 
 Review comment:
   `operatorId --> SrcOperatorId`
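For illustration only: a minimal sketch, in plain Java, of what `duplicate()` and `same()` in the hunk above do, with `java.nio.ByteBuffer` standing in for `DrillBuf` and `BufferAllocator` (an assumption made so the example is self-contained; `FilterSketch` is a hypothetical name). Each buffer's readable bytes are deep-copied without moving the source's read position, two filters count as "the same" when their major fragment, minor fragment and hash join operator ids all match, and `toString()` uses the `srcOperatorId` label suggested in the review.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for RuntimeFilterWritable; the three ids mirror the
// RuntimeFilterBDef fields (majorFragmentId, minorFragmentId, hjOpId).
final class FilterSketch {
  final int majorId;
  final int minorId;
  final int srcOperatorId;
  final ByteBuffer[] data;

  FilterSketch(int majorId, int minorId, int srcOperatorId, ByteBuffer[] data) {
    this.majorId = majorId;
    this.minorId = minorId;
    this.srcOperatorId = srcOperatorId;
    this.data = data;
  }

  // Deep-copies each buffer's remaining (readable) bytes. The source position,
  // the analogue of DrillBuf's reader index, is left untouched.
  FilterSketch duplicate() {
    ByteBuffer[] cloned = new ByteBuffer[data.length];
    for (int i = 0; i < data.length; i++) {
      ByteBuffer src = data[i];
      ByteBuffer copy = ByteBuffer.allocate(src.remaining());
      copy.put(src.duplicate()); // shares content but has its own position
      copy.flip();               // make the copied bytes readable from index 0
      cloned[i] = copy;
    }
    return new FilterSketch(majorId, minorId, srcOperatorId, cloned);
  }

  // Same producer: same minor fragment of the same hash join operator.
  boolean same(FilterSketch other) {
    return majorId == other.majorId
        && minorId == other.minorId
        && srcOperatorId == other.srcOperatorId;
  }

  @Override
  public String toString() {
    return "majorFragmentId:" + majorId + ", minorFragmentId:" + minorId
        + ", srcOperatorId:" + srcOperatorId;
  }
}
```

Using `ByteBuffer.duplicate()` gives the copy its own position, which plays the same role as saving and restoring `readerIndex()` in the original method.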


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter
> --
>
> Key: DRILL-6731
> URL: https://issues.apache.org/jira/browse/DRILL-6731
> Project: Apache Drill
>  Issue Type: Improvement
>  Components:  Server
>Affects Versions: 1.15.0
>Reporter: weijie.tong
>Assignee: weijie.tong
>Priority: Major
>
> This PR is to move the BloomFilter aggregating work from the foreman to 
> RuntimeFilter. Though this change, the RuntimeFilter can apply the incoming 
> BF as soon as possible.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-09-26 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16629362#comment-16629362
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs 
aggregating work from the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#discussion_r220710086
 
 

 ##
 File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
 ##
 @@ -89,16 +142,27 @@ public void close() throws Exception {
 
     @Override
     public void run() {
-      while (running.get()) {
-        RuntimeFilterWritable toAggregate = rfQueue.poll();
-        if (toAggregate != null) {
-          if (aggregated != null) {
-            aggregated.aggregate(toAggregate);
-            currentBookId.incrementAndGet();
+      try {
 
 Review comment:
   I still see race conditions: 1) containOne() is checked outside 
the lock; 2) the same `running` flag is used by all three threads (the Netty thread, 
AsyncAggregateWorker and the MinorFragment thread). Consider the case where the Netty thread 
checks the `running` state, finds it true, and is preempted; then the minor 
fragment thread comes along, resets the running state and clears the 
queue. When the Netty thread is scheduled again, it adds the 
`RuntimeFilterWritable` to the queue, and no one will ever clean up this newly 
added filter.
   
   Please see the top commit in this branch: 
https://github.com/sohami/drill/commits/DRILL-6731. If it looks good 
to you, cherry-pick this commit into your PR, and squash only your original 
commit, not the cherry-picked one.
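A minimal sketch of one way to close the check-then-act window described in this comment, assuming a hypothetical `GuardedSink` with a hypothetical `Releasable` element type (neither is Drill API, and this is not necessarily what the referenced commit does): the `running` check and the enqueue happen under the same lock that `close()` takes, and an item rejected after close is released by the thread that tried to add it, so nothing is ever left in the queue without an owner.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical stand-in for a filter that owns off-heap memory. */
interface Releasable {
  void release();
}

// Hypothetical sketch: checking the running flag and enqueuing form one atomic
// step with respect to close(), because both take the same lock.
final class GuardedSink<T extends Releasable> {
  private final Object lock = new Object();
  private final Queue<T> queue = new ArrayDeque<>(); // guarded by lock
  private boolean running = true;                    // guarded by lock

  /** Network-thread side. Returns false if the sink was already closed. */
  boolean add(T item) {
    synchronized (lock) {
      if (running) {
        queue.add(item);
        return true;
      }
    }
    // Closed: no other thread will ever see this item, so the caller frees it.
    item.release();
    return false;
  }

  /** Fragment-thread side: flips the flag and drains the queue atomically. */
  void close() {
    List<T> leftovers;
    synchronized (lock) {
      running = false;
      leftovers = new ArrayList<>(queue);
      queue.clear();
    }
    for (T item : leftovers) {
      item.release(); // released outside the lock to keep the critical section short
    }
  }

  int pending() {
    synchronized (lock) {
      return queue.size();
    }
  }
}
```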


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter
> --
>
> Key: DRILL-6731
> URL: https://issues.apache.org/jira/browse/DRILL-6731
> Project: Apache Drill
>  Issue Type: Improvement
>  Components:  Server
>Affects Versions: 1.15.0
>Reporter: weijie.tong
>Assignee: weijie.tong
>Priority: Major
>
> This PR is to move the BloomFilter aggregating work from the foreman to 
> RuntimeFilter. Though this change, the RuntimeFilter can apply the incoming 
> BF as soon as possible.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-09-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16624517#comment-16624517
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

weijietong commented on issue #1459: DRILL-6731: Move the BFs aggregating work 
from the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#issuecomment-423720474
 
 
   @sohami Sorry for the late update. Besides fixing the thread-safety problem, this commit 
also changes `RuntimeFilterVisitor` to support queries without an exchange, 
i.e. the single-fragment case. Please review, thanks.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter
> --
>
> Key: DRILL-6731
> URL: https://issues.apache.org/jira/browse/DRILL-6731
> Project: Apache Drill
>  Issue Type: Improvement
>  Components:  Server
>Affects Versions: 1.15.0
>Reporter: weijie.tong
>Assignee: weijie.tong
>Priority: Major
>
> This PR is to move the BloomFilter aggregating work from the foreman to 
> RuntimeFilter. Though this change, the RuntimeFilter can apply the incoming 
> BF as soon as possible.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-09-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618072#comment-16618072
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs 
aggregating work from the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#discussion_r218202111
 
 

 ##
 File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
 ##
 @@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import org.apache.drill.exec.rpc.NamedThreadFactory;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This sink receives the RuntimeFilters from the netty thread,
+ * aggregates them in an async thread, supplies the aggregated
+ * one to the fragment running thread.
+ */
+public class RuntimeFilterSink implements AutoCloseable {
 
 Review comment:
   There are a few race conditions because of the way this class is implemented. 
The `RuntimeFilterSink` can be accessed from both the Netty thread and the 
FragmentExecutor thread. The Netty thread just adds each received 
`RuntimeFilterWritable` to the queue and is done with it. 
   The race conditions mainly appear between the `AsyncAggregateWorker` thread and 
the `FragmentExecutor` thread: the async thread might be updating the shared 
`aggregated` instance while the fragment executor thread uses the same 
instance, assuming it is still the older filter (specifically the underlying Bloom filter 
DrillBuf). There can also be issues during `close()`: the async thread might 
have just received another runtime filter, and `close()` will then update the 
running state, close the `aggregated` instance and assume the queue is empty, 
while the async thread can still try to `aggregate` the received runtime filter.
   
   Please define a clean contract for this class. A few things to consider:
   
   - The async aggregating thread can be started when the RuntimeFilterSink is created.
   - Consider using a `BlockingQueue`, since the async thread should block until the next 
item becomes available rather than spinning on a state flag.
   - Access to the shared resource `RuntimeFilterWritable aggregated` needs to be 
protected by a lock.
   - The async thread should check the `running` state after retrieving an element 
from the queue and before aggregating it. If the running state is false, it 
should `clear` the polled element.
   - This class should return just the Bloom filter list and the field list rather than the 
entire aggregated `RuntimeFilterWritable`, since the latter can be modified by the caller 
through its setter methods.
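The contract proposed in this comment can be sketched as follows. This is a hypothetical, self-contained illustration, not the class that was merged: a filter is modeled as a `long[]` bit array, aggregation is assumed to be a bitwise OR (the usual way to merge equally sized Bloom filters), the worker is started in the constructor and blocks on a `LinkedBlockingQueue`, the shared aggregate is guarded by a lock, the `running` state is re-checked under that lock after each `take()`, and callers only ever receive a copy.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the suggested contract; not Drill's RuntimeFilterSink.
// A "filter" is a long[] bit array and aggregation is assumed to be a bitwise OR.
final class FilterSinkSketch implements AutoCloseable {
  private final BlockingQueue<long[]> queue = new LinkedBlockingQueue<>();
  private final Object lock = new Object();
  private volatile boolean running = true; // written only while holding lock
  private long[] aggregated;               // guarded by lock
  private int received;                    // guarded by lock
  private final Thread worker;

  FilterSinkSketch() {
    // The aggregating thread is started when the sink is created.
    worker = new Thread(this::aggregateLoop, "filter-aggregator");
    worker.setDaemon(true);
    worker.start();
  }

  /** Network thread: enqueue and return; items offered after close() are dropped. */
  void add(long[] filter) {
    synchronized (lock) {
      if (running) {
        queue.offer(filter);
      }
    }
  }

  private void aggregateLoop() {
    try {
      while (running) {
        long[] next = queue.take(); // blocks instead of spinning on a flag
        synchronized (lock) {
          if (!running) {
            break; // close() won the race: discard the element just taken
          }
          if (aggregated == null) {
            aggregated = next.clone();
          } else {
            for (int i = 0; i < aggregated.length; i++) {
              aggregated[i] |= next[i];
            }
          }
          received++;
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // close() interrupts a blocked take()
    }
  }

  /** Fragment thread: returns a private copy, never the shared instance. */
  long[] snapshot() {
    synchronized (lock) {
      return aggregated == null ? null : aggregated.clone();
    }
  }

  int receivedCount() {
    synchronized (lock) {
      return received;
    }
  }

  @Override
  public void close() {
    synchronized (lock) {
      running = false;
      // Heap arrays only need their references dropped; off-heap buffers
      // would have to be released here as well.
      queue.clear();
    }
    worker.interrupt();
    try {
      worker.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```

Handing out `clone()`d arrays corresponds to the last bullet: the fragment thread can read its snapshot without holding the lock, and nothing it does to the snapshot can affect the shared aggregate.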


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter
> --
>
> Key: DRILL-6731
> URL: https://issues.apache.org/jira/browse/DRILL-6731
> Project: Apache Drill
>  Issue Type: Improvement
>  Components:  Server
>Affects Versions: 1.15.0
>Reporter: weijie.tong
>Assignee: weijie.tong
>Priority: Major
>
> This PR is to move the BloomFilter aggregating work from the foreman to 
> RuntimeFilter. Though this change, the RuntimeFilter can apply the incoming 
> BF as soon as possible.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-09-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618071#comment-16618071
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs 
aggregating work from the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#discussion_r218203657
 
 

 ##
 File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
 ##
 @@ -138,28 +133,12 @@ public void registerRuntimeFilter(RuntimeFilterWritable runtimeFilterWritable) {
     int majorId = runtimeFilterB.getMajorFragmentId();
     UserBitShared.QueryId queryId = runtimeFilterB.getQueryId();
     List probeFields = runtimeFilterB.getProbeFieldsList();
-    logger.info("RuntimeFilterManager receives a runtime filter , majorId:{}, queryId:{}", majorId, QueryIdHelper.getQueryId(queryId));
-    int size;
-    synchronized (this) {
-      size = joinMjId2scanSize.get(majorId);
-      Preconditions.checkState(size > 0);
-      RuntimeFilterWritable aggregatedRuntimeFilter = joinMjId2AggregatedRF.get(majorId);
-      if (aggregatedRuntimeFilter == null) {
-        aggregatedRuntimeFilter = runtimeFilterWritable;
-      } else {
-        aggregatedRuntimeFilter.aggregate(runtimeFilterWritable);
-      }
-      joinMjId2AggregatedRF.put(majorId, aggregatedRuntimeFilter);
-      size--;
-      joinMjId2scanSize.put(majorId, size);
-    }
-    if (size == 0) {
-      broadcastAggregatedRuntimeFilter(majorId, queryId, probeFields);
-    }
+    logger.info("RuntimeFilterRouter receives a runtime filter , majorId:{}, queryId:{}", majorId, QueryIdHelper.getQueryId(queryId));
+    broadcastAggregatedRuntimeFilter(majorId, queryId, probeFields, runtimeFilterWritable.getData());
 
 Review comment:
   Why not just pass `runtimeFilterWritable` to the broadcast method instead of 
extracting individual fields and passing them as separate parameters?
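A minimal sketch of the suggested signature change, using hypothetical, simplified types (`FilterMessage`, `ForwardingRouter`; these are not Drill classes): the router no longer aggregates or stores anything. It receives the whole message and forwards that same object to every probe-side scan endpoint registered for the hash join's major fragment. The transport is abstracted as a `BiConsumer` purely for the sake of the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical, simplified stand-in for a received runtime filter message. */
final class FilterMessage {
  final int joinMajorFragmentId;
  final List<String> probeFields;
  final byte[] bloomFilterBits;

  FilterMessage(int joinMajorFragmentId, List<String> probeFields, byte[] bloomFilterBits) {
    this.joinMajorFragmentId = joinMajorFragmentId;
    this.probeFields = probeFields;
    this.bloomFilterBits = bloomFilterBits;
  }
}

/** Hypothetical router that forwards each received filter without aggregating it. */
final class ForwardingRouter {
  private final Map<Integer, List<String>> scanEndpointsByJoin = new HashMap<>();
  private final BiConsumer<String, FilterMessage> sender; // transport, abstracted away

  ForwardingRouter(BiConsumer<String, FilterMessage> sender) {
    this.sender = sender;
  }

  void registerScanEndpoints(int joinMajorFragmentId, List<String> endpoints) {
    scanEndpointsByJoin.put(joinMajorFragmentId, new ArrayList<>(endpoints));
  }

  /** Takes the whole message instead of (majorId, queryId, probeFields, data). */
  void broadcast(FilterMessage filter) {
    List<String> targets =
        scanEndpointsByJoin.getOrDefault(filter.joinMajorFragmentId, Collections.emptyList());
    for (String endpoint : targets) {
      sender.accept(endpoint, filter); // the received object itself is sent on
    }
  }
}
```

One advantage of passing the message as a single object is that the payload stays together with the metadata it arrived with, and the parameter list does not grow whenever a field is added.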


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter
> --
>
> Key: DRILL-6731
> URL: https://issues.apache.org/jira/browse/DRILL-6731
> Project: Apache Drill
>  Issue Type: Improvement
>  Components:  Server
>Affects Versions: 1.15.0
>Reporter: weijie.tong
>Assignee: weijie.tong
>Priority: Major
>
> This PR is to move the BloomFilter aggregating work from the foreman to 
> RuntimeFilter. Though this change, the RuntimeFilter can apply the incoming 
> BF as soon as possible.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-09-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16617391#comment-16617391
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

weijietong commented on a change in pull request #1459: DRILL-6731: Move the 
BFs aggregating work from the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#discussion_r218034199
 
 

 ##
 File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
 ##
 @@ -138,24 +134,8 @@ public void registerRuntimeFilter(RuntimeFilterWritable runtimeFilterWritable) {
     int majorId = runtimeFilterB.getMajorFragmentId();
     UserBitShared.QueryId queryId = runtimeFilterB.getQueryId();
     List probeFields = runtimeFilterB.getProbeFieldsList();
-    logger.info("RuntimeFilterManager receives a runtime filter , majorId:{}, queryId:{}", majorId, QueryIdHelper.getQueryId(queryId));
-    int size;
-    synchronized (this) {
-      size = joinMjId2scanSize.get(majorId);
-      Preconditions.checkState(size > 0);
-      RuntimeFilterWritable aggregatedRuntimeFilter = joinMjId2AggregatedRF.get(majorId);
-      if (aggregatedRuntimeFilter == null) {
-        aggregatedRuntimeFilter = runtimeFilterWritable;
-      } else {
-        aggregatedRuntimeFilter.aggregate(runtimeFilterWritable);
-      }
-      joinMjId2AggregatedRF.put(majorId, aggregatedRuntimeFilter);
 
 Review comment:
   Sorry for this bug; it is fixed now.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter
> --
>
> Key: DRILL-6731
> URL: https://issues.apache.org/jira/browse/DRILL-6731
> Project: Apache Drill
>  Issue Type: Improvement
>  Components:  Server
>Affects Versions: 1.15.0
>Reporter: weijie.tong
>Assignee: weijie.tong
>Priority: Major
>
> This PR is to move the BloomFilter aggregating work from the foreman to 
> RuntimeFilter. Though this change, the RuntimeFilter can apply the incoming 
> BF as soon as possible.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-09-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16617159#comment-16617159
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs 
aggregating work from the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#discussion_r217951308
 
 

 ##
 File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
 ##
 @@ -138,24 +134,8 @@ public void registerRuntimeFilter(RuntimeFilterWritable runtimeFilterWritable) {
     int majorId = runtimeFilterB.getMajorFragmentId();
     UserBitShared.QueryId queryId = runtimeFilterB.getQueryId();
     List probeFields = runtimeFilterB.getProbeFieldsList();
-    logger.info("RuntimeFilterManager receives a runtime filter , majorId:{}, queryId:{}", majorId, QueryIdHelper.getQueryId(queryId));
-    int size;
-    synchronized (this) {
-      size = joinMjId2scanSize.get(majorId);
-      Preconditions.checkState(size > 0);
-      RuntimeFilterWritable aggregatedRuntimeFilter = joinMjId2AggregatedRF.get(majorId);
-      if (aggregatedRuntimeFilter == null) {
-        aggregatedRuntimeFilter = runtimeFilterWritable;
-      } else {
-        aggregatedRuntimeFilter.aggregate(runtimeFilterWritable);
-      }
-      joinMjId2AggregatedRF.put(majorId, aggregatedRuntimeFilter);
 
 Review comment:
   Since we are now just passing the received `RuntimeFilter` to the corresponding 
`RuntimeFilterRecordBatch`, which will do the aggregation, we don't need to 
store the received `runtimeFilterWritable` anymore. But 
`broadcastAggregatedRuntimeFilter` should accept the received filter as a parameter 
in order to send it across. Right now it doesn't, so it looks like we are not sending the 
received filter here and are losing it?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter
> --
>
> Key: DRILL-6731
> URL: https://issues.apache.org/jira/browse/DRILL-6731
> Project: Apache Drill
>  Issue Type: Improvement
>  Components:  Server
>Affects Versions: 1.15.0
>Reporter: weijie.tong
>Assignee: weijie.tong
>Priority: Major
>
> This PR is to move the BloomFilter aggregating work from the foreman to 
> RuntimeFilter. Though this change, the RuntimeFilter can apply the incoming 
> BF as soon as possible.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-09-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16617158#comment-16617158
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

sohami commented on a change in pull request #1459: DRILL-6731: Move the BFs 
aggregating work from the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#discussion_r217951489
 
 

 ##
 File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
 ##
 @@ -60,17 +59,14 @@
  * The HashJoinRecordBatch is responsible to generate the RuntimeFilter.
  * To Partitioned case:
  * The generated RuntimeFilter will be sent to the Foreman node. The Foreman node receives the RuntimeFilter
- * async, aggregates them, broadcasts them the Scan nodes's MinorFragment. The RuntimeFilterRecordBatch which
- * steps over the Scan node will leverage the received RuntimeFilter to filter out the scanned rows to generate
- * the SV2.
+ * async, broadcasts them to the Scan nodes's MinorFragment. The RuntimeFilterRecordBatch which
+ * steps over the Scan node will leverage the received RuntimeFilter (which will be aggregated at the
 
 Review comment:
   Please change to: _The RuntimeFilterRecordBatch which is **downstream** of the 
Scan node will aggregate all the received RuntimeFilters and leverage them to 
filter out the scanned rows to generate the SV2._
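A hedged sketch of what the suggested wording describes: the batch downstream of the scan ORs together every received filter, then probes each scanned row's key and keeps the indices of candidate rows in an SV2-style selection vector of unsigned 16-bit indices (modeled here as `char`). `TinyBloom`, its two hash functions and `RuntimeFilterApplySketch` are illustrative assumptions, not Drill's `BloomFilter` implementation. Merging first matters because a filter built from only one build-side partition has no bits for keys in the other partitions, so applying it alone could drop rows that should join.

```java
import java.util.Arrays;
import java.util.BitSet;

// Hypothetical tiny Bloom filter over long keys. Drill's BloomFilter and its
// hashing differ; only the shape of the technique is illustrated.
final class TinyBloom {
  private final BitSet bits;
  private final int numBits;

  TinyBloom(int numBits) {
    this.numBits = numBits;
    this.bits = new BitSet(numBits);
  }

  private int h1(long key) {
    return Math.floorMod(Long.hashCode(key), numBits);
  }

  private int h2(long key) {
    return Math.floorMod(Long.hashCode(key * 0x9E3779B97F4A7C15L), numBits);
  }

  void put(long key) {
    bits.set(h1(key));
    bits.set(h2(key));
  }

  boolean mightContain(long key) {
    return bits.get(h1(key)) && bits.get(h2(key));
  }

  // Union with a filter of the same size and hash functions.
  void or(TinyBloom other) {
    bits.or(other.bits);
  }
}

final class RuntimeFilterApplySketch {
  // Returns an SV2-like vector: indices of rows whose key may match the build
  // side. Assumes at most 65,536 rows per batch so each index fits in a char.
  static char[] select(long[] probeKeys, TinyBloom aggregated) {
    char[] sv2 = new char[probeKeys.length];
    int count = 0;
    for (int row = 0; row < probeKeys.length; row++) {
      if (aggregated.mightContain(probeKeys[row])) {
        sv2[count++] = (char) row;
      }
    }
    return Arrays.copyOf(sv2, count);
  }
}
```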


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter
> --
>
> Key: DRILL-6731
> URL: https://issues.apache.org/jira/browse/DRILL-6731
> Project: Apache Drill
>  Issue Type: Improvement
>  Components:  Server
>Affects Versions: 1.15.0
>Reporter: weijie.tong
>Assignee: weijie.tong
>Priority: Major
>
> This PR is to move the BloomFilter aggregating work from the foreman to 
> RuntimeFilter. Though this change, the RuntimeFilter can apply the incoming 
> BF as soon as possible.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-09-14 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614453#comment-16614453
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

sohami commented on issue #1459: DRILL-6731: Move the BFs aggregating work from 
the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#issuecomment-421248457
 
 
   @weijietong - Sorry, I totally missed this PR. I will try to review it over 
the weekend.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter
> --
>
> Key: DRILL-6731
> URL: https://issues.apache.org/jira/browse/DRILL-6731
> Project: Apache Drill
>  Issue Type: Improvement
>  Components:  Server
>Affects Versions: 1.15.0
>Reporter: weijie.tong
>Assignee: weijie.tong
>Priority: Major
>
> This PR is to move the BloomFilter aggregating work from the foreman to 
> RuntimeFilter. Though this change, the RuntimeFilter can apply the incoming 
> BF as soon as possible.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (DRILL-6731) JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter

2018-09-06 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16605670#comment-16605670
 ] 

ASF GitHub Bot commented on DRILL-6731:
---

weijietong commented on issue #1459: DRILL-6731: Move the BFs aggregating work 
from the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#issuecomment-419061529
 
 
   @sohami @amansinha100 Could you review this PR?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter
> --
>
> Key: DRILL-6731
> URL: https://issues.apache.org/jira/browse/DRILL-6731
> Project: Apache Drill
>  Issue Type: Improvement
>  Components:  Server
>Affects Versions: 1.15.0
>Reporter: weijie.tong
>Assignee: weijie.tong
>Priority: Major
>
> This PR is to move the BloomFilter aggregating work from the foreman to 
> RuntimeFilter. Though this change, the RuntimeFilter can apply the incoming 
> BF as soon as possible.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)