[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-29 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r206017040
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 ##
 @@ -528,6 +590,27 @@ private void initializeBuild() {
 
   }
 
+  private void initializeRuntimeFilter() {
+    if (!enableRuntimeFilter) {
+      return;
+    }
+    if (runtimeFilterReporter != null) {
+      return;
+    }
+    runtimeFilterReporter = new RuntimeFilterReporter((ExecutorFragmentContext) context);
+    RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef();
+    if (runtimeFilterDef != null) {
 
 Review comment:
  `enableRuntimeFilter` is a global option that decides whether a HashJoin may 
generate a RuntimeFilter, but it does not guarantee that the HashJoin has a 
RuntimeFilterDef. Some preconditions are defined in the `RuntimeFilterManager`; 
for example, a left join is not allowed to use a BF to filter data. So we 
can't use `Preconditions.checkState(runtimeFilterDef != null)`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-29 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r206015829
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterCreator.java
 ##
 @@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.memory.BufferAllocator;
+
+public class BloomFilterCreator {
 
 Review comment:
  Btw, if there are too many multi-column joins, we can add an option to limit 
the number of generated BloomFilters. That is to say, there's no need to 
generate one BF per join column; we can choose to skip generating some BFs to 
save CPU cost and memory usage.


[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-29 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r206014664
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java
 ##
 @@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import io.netty.buffer.DrillBuf;
+import java.util.Arrays;
+
+
+/**
+ * According to Putze et al.'s "Cache-, Hash- and Space-Efficient Bloom
+ * Filters", the main idea is to construct tiny bucketed bloom filters
+ * that benefit the CPU cache and SIMD instructions.
+ */
+
+public class BloomFilter {
+  // Bytes in a bucket.
+  private static final int BYTES_PER_BUCKET = 32;
+  // Minimum bloom filter data size.
+  private static final int MINIMUM_BLOOM_SIZE = 256;
+
+  private static final int DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE = 16 * 1024 * 1024;
+
+  private DrillBuf byteBuf;
+
+  private int numBytes;
+
+  private int mask[] = new int[8];
+
+  private byte[] tempBucket = new byte[32];
 
 Review comment:
  `DrillBuf.getBytes(int index, byte[] dst)` overwrites the dst array's 
content, and we always invoke the getBytes method before using the dst array, 
so there's no risk of reading a polluted byte array.


[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-29 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r206014055
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterCreator.java
 ##
 @@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.memory.BufferAllocator;
+
+public class BloomFilterCreator {
 
 Review comment:
  There's no direct intersection of the two separate BFs. The intersection 
semantics are implemented indirectly on the probe side in `ScanBatch`. The 
separate BFs filter the probe-side columns independently and together set a 
BitSet (which indicates the rows that may match the join condition). This 
code is in `ScanBatch`'s `applyRuntimeFilter` and `computeBitSet` methods.
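The `ScanBatch` code itself isn't quoted in this thread; the following is a 
minimal, hypothetical sketch of the intersection semantics described above 
(names simplified, with `java.util.BitSet` standing in for Drill's selection 
machinery):

```java
import java.util.BitSet;

public class RuntimeFilterCombineSketch {
  // Each per-column bloom filter produces a BitSet of rows it considers
  // possible matches; a row survives only if every filter accepted it,
  // i.e. the per-column BitSets are ANDed together.
  public static BitSet combine(BitSet[] perColumnMatches, int rowCount) {
    BitSet keep = new BitSet(rowCount);
    keep.set(0, rowCount); // start with all rows as candidates
    for (BitSet matches : perColumnMatches) {
      keep.and(matches);   // drop rows any single column filter rejected
    }
    return keep;
  }

  public static void main(String[] args) {
    BitSet colA = new BitSet(4); colA.set(0); colA.set(2); colA.set(3);
    BitSet colB = new BitSet(4); colB.set(2); colB.set(3);
    // Only rows accepted by both column filters remain candidates.
    System.out.println(combine(new BitSet[]{colA, colB}, 4));
  }
}
```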


[GitHub] sohami commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-29 Thread GitBox
sohami commented on a change in pull request #1334: DRILL-6385: Support JPPD 
feature
URL: https://github.com/apache/drill/pull/1334#discussion_r204136170
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerRequestHandler.java
 ##
 @@ -44,14 +44,22 @@
   private final WorkEventBus workBus;
   private final WorkManager.WorkerBee bee;
 
+  private RuntimeFilterRequestHandler runtimeFilterRequestHandler;
+
   public DataServerRequestHandler(WorkEventBus workBus, WorkManager.WorkerBee bee) {
     this.workBus = workBus;
     this.bee = bee;
+    this.runtimeFilterRequestHandler = new RuntimeFilterRequestHandler(bee);
   }
 
   @Override
   public void handle(DataServerConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody,
                      ResponseSender sender) throws RpcException {
+    if (rpcType == BitData.RpcType.REQ_RUNTIME_FILTER_VALUE) {
 
 Review comment:
  I think we can improve this `handle` method; there is no need to have a 
separate handler per se for this different message. Instead we can have 
something similar to 
[ControlMessageHandler](https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java#L65).
 That is to say, have a separate method to handle each request type which 
returns whether the request succeeded or failed, and the `handle` method then 
sends `Ack.OK` or `Ack.FAIL` accordingly.
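As a hedged illustration of the dispatch style being suggested (all names 
below are hypothetical stand-ins, not Drill's actual RPC types or handler 
signatures):

```java
public class HandlerDispatchSketch {
  // Hypothetical stand-ins for BitData.RpcType values and the Ack message.
  static final int REQ_RECORD_BATCH = 3;
  static final int REQ_RUNTIME_FILTER = 9;

  enum Ack { OK, FAIL }

  // One method per request type, each reporting success or failure...
  static boolean handleRecordBatch(byte[] body) { return body != null && body.length > 0; }
  static boolean handleRuntimeFilter(byte[] body) { return body != null && body.length > 0; }

  // ...so handle() only dispatches and translates the result into an Ack,
  // mirroring ControlMessageHandler's structure.
  static Ack handle(int rpcType, byte[] body) {
    final boolean ok;
    switch (rpcType) {
      case REQ_RECORD_BATCH:   ok = handleRecordBatch(body);   break;
      case REQ_RUNTIME_FILTER: ok = handleRuntimeFilter(body); break;
      default:                 ok = false;                     break;
    }
    return ok ? Ack.OK : Ack.FAIL;
  }
}
```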


[GitHub] sohami commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-29 Thread GitBox
sohami commented on a change in pull request #1334: DRILL-6385: Support JPPD 
feature
URL: https://github.com/apache/drill/pull/1334#discussion_r205962243
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java
 ##
 @@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.ExecProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class RuntimeFilterReporter {
 
 Review comment:
   Please add javadoc for this class


[GitHub] sohami commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-29 Thread GitBox
sohami commented on a change in pull request #1334: DRILL-6385: Support JPPD 
feature
URL: https://github.com/apache/drill/pull/1334#discussion_r205962226
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterCreator.java
 ##
 @@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.memory.BufferAllocator;
+
+public class BloomFilterCreator {
 
 Review comment:
  This factory class doesn't look necessary; instead there can be two separate 
constructors in the BloomFilter class. Since BloomFilter modifies the DrillBuf 
and effectively owns it, I think the creation of the DrillBuf should happen 
inside the BloomFilter constructor itself rather than here.
  So the constructors can be `BloomFilter(numBytes, bufferAllocator)` and 
`BloomFilter(int ndv, double fpp, BufferAllocator bufferAllocator)`. The 
second constructor can then just call the first one after computing 
`optimalNumOfBytes`.
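A rough sketch of the suggested constructor layout (simplified and 
hypothetical: a plain byte[] stands in for DrillBuf/BufferAllocator, and 
`optimalNumOfBytes` uses the standard bloom-filter sizing formula, which may 
differ from the PR's):

```java
public class BloomFilterCtorSketch {
  static final int MINIMUM_BLOOM_SIZE = 256;            // as in the quoted diff
  static final int MAXIMUM_BLOOM_SIZE = 16 * 1024 * 1024;

  final byte[] buf; // stand-in for the DrillBuf the filter would own

  // Primary constructor: the filter allocates (and owns) its buffer.
  BloomFilterCtorSketch(int numBytes) {
    this.buf = new byte[adjustByteSize(numBytes)];
  }

  // Secondary constructor: derive a size from ndv/fpp, then delegate.
  BloomFilterCtorSketch(int ndv, double fpp) {
    this(optimalNumOfBytes(ndv, fpp));
  }

  // Clamp to [min, max] and round up to a whole 32-byte bucket.
  static int adjustByteSize(int numBytes) {
    numBytes = Math.max(numBytes, MINIMUM_BLOOM_SIZE);
    numBytes = Math.min(numBytes, MAXIMUM_BLOOM_SIZE);
    return (numBytes + 0x1F) & ~0x1F;
  }

  // Classic bloom-filter sizing: m = -n * ln(p) / (ln 2)^2 bits.
  static int optimalNumOfBytes(int ndv, double fpp) {
    double bits = -ndv * Math.log(fpp) / (Math.log(2) * Math.log(2));
    return (int) Math.ceil(bits / 8.0);
  }
}
```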


[GitHub] sohami commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-29 Thread GitBox
sohami commented on a change in pull request #1334: DRILL-6385: Support JPPD 
feature
URL: https://github.com/apache/drill/pull/1334#discussion_r205907403
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 ##
 @@ -528,6 +590,27 @@ private void initializeBuild() {
 
   }
 
+  private void initializeRuntimeFilter() {
+    if (!enableRuntimeFilter) {
+      return;
+    }
+    if (runtimeFilterReporter != null) {
+      return;
+    }
+    runtimeFilterReporter = new RuntimeFilterReporter((ExecutorFragmentContext) context);
+    RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef();
+    if (runtimeFilterDef != null) {
 
 Review comment:
  Instead of the if check, shouldn't there be 
`Preconditions.checkState(runtimeFilterDef != null)`? Since if 
`enableRuntimeFilter` is true, isn't `runtimeFilterDef` always expected to be 
present?


[GitHub] sohami commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-29 Thread GitBox
sohami commented on a change in pull request #1334: DRILL-6385: Support JPPD 
feature
URL: https://github.com/apache/drill/pull/1334#discussion_r205962033
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java
 ##
 @@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import io.netty.buffer.DrillBuf;
+import java.util.Arrays;
+
+
+/**
+ * According to Putze et al.'s "Cache-, Hash- and Space-Efficient Bloom
+ * Filters", the main idea is to construct tiny bucketed bloom filters
+ * that benefit the CPU cache and SIMD instructions.
+ */
+
+public class BloomFilter {
+  // Bytes in a bucket.
+  private static final int BYTES_PER_BUCKET = 32;
+  // Minimum bloom filter data size.
+  private static final int MINIMUM_BLOOM_SIZE = 256;
+
+  private static final int DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE = 16 * 1024 * 1024;
+
+  private DrillBuf byteBuf;
+
+  private int numBytes;
+
+  private int mask[] = new int[8];
+
+  private byte[] tempBucket = new byte[32];
+
+
+  public BloomFilter(DrillBuf byteBuf) {
+    this.byteBuf = byteBuf;
+    this.numBytes = byteBuf.capacity();
+    this.byteBuf.writerIndex(numBytes);
 
 Review comment:
  Why is `writerIndex` set to `numBytes` here? This will mean the buffer is 
fully consumed?


[GitHub] sohami commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-29 Thread GitBox
sohami commented on a change in pull request #1334: DRILL-6385: Support JPPD 
feature
URL: https://github.com/apache/drill/pull/1334#discussion_r205894386
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
 ##
 @@ -42,6 +43,7 @@ public static RpcConfig getMapping(DrillConfig config, Executor executor) {
     .add(RpcType.HANDSHAKE, BitClientHandshake.class, RpcType.HANDSHAKE, BitServerHandshake.class)
     .add(RpcType.REQ_RECORD_BATCH, FragmentRecordBatch.class, RpcType.ACK, Ack.class)
     .add(RpcType.SASL_MESSAGE, SaslMessage.class, RpcType.SASL_MESSAGE, SaslMessage.class)
+    .add(RpcType.REQ_RUNTIME_FILTER, BitData.RuntimeFilterBDef.class, RpcType.ACK, Ack.class)
 
 Review comment:
   please import specific class for `RuntimeFilterBDef`


[GitHub] sohami commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-29 Thread GitBox
sohami commented on a change in pull request #1334: DRILL-6385: Support JPPD 
feature
URL: https://github.com/apache/drill/pull/1334#discussion_r205962087
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterCreator.java
 ##
 @@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.memory.BufferAllocator;
+
+public class BloomFilterCreator {
+
+  public static BloomFilter spawnOne(int numBytes, BufferAllocator bufferAllocator) {
+    int size = BloomFilter.adjustByteSize(numBytes);
+    DrillBuf drillBuf = bufferAllocator.buffer(size);
+    BloomFilter bloomFilter = new BloomFilter(drillBuf);
 
 Review comment:
  Since `BloomFilter` modifies the `DrillBuf` and effectively owns it, I think 
the creation of the DrillBuf should happen inside the BloomFilter constructor 
itself rather than here. So the constructor can be `BloomFilter(numBytes, 
bufferAllocator)`.


[GitHub] sohami commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-29 Thread GitBox
sohami commented on a change in pull request #1334: DRILL-6385: Support JPPD 
feature
URL: https://github.com/apache/drill/pull/1334#discussion_r205905500
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/RuntimeFilterRequestHandler.java
 ##
 @@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.data;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.Acks;
+import org.apache.drill.exec.rpc.RequestHandler;
+import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.ResponseSender;
+import org.apache.drill.exec.rpc.RpcBus;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.work.WorkManager;
+import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class RuntimeFilterRequestHandler implements RequestHandler {
 
 Review comment:
   Please add java docs for this new class.


[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-29 Thread GitBox
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r205982971
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java
 ##
 @@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import io.netty.buffer.DrillBuf;
+import java.util.Arrays;
+
+
+/**
+ * According to Putze et al.'s "Cache-, Hash- and Space-Efficient Bloom
+ * Filters", the main idea is to construct tiny bucketed bloom filters
+ * that benefit the CPU cache and SIMD instructions.
+ */
+
+public class BloomFilter {
+  // Bytes in a bucket.
+  private static final int BYTES_PER_BUCKET = 32;
+  // Minimum bloom filter data size.
+  private static final int MINIMUM_BLOOM_SIZE = 256;
+
+  private static final int DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE = 16 * 1024 * 1024;
+
+  private DrillBuf byteBuf;
+
+  private int numBytes;
+
+  private int mask[] = new int[8];
+
+  private byte[] tempBucket = new byte[32];
+
+
+  public BloomFilter(DrillBuf byteBuf) {
+    this.byteBuf = byteBuf;
+    this.numBytes = byteBuf.capacity();
+    this.byteBuf.writerIndex(numBytes);
+  }
+
+
+  public static int adjustByteSize(int numBytes) {
+    if (numBytes < MINIMUM_BLOOM_SIZE) {
+      numBytes = MINIMUM_BLOOM_SIZE;
+    }
+
+    if (numBytes > DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE) {
+      numBytes = DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE;
+    }
+
+    // 32 bytes alignment, one bucket.
+    numBytes = (numBytes + 0x1F) & (~0x1F);
+    return numBytes;
+  }
+
+  private void setMask(int key) {
+    final int SALT[] = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d, 0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31};
 
 Review comment:
  Although I understand these are open-source projects, in general it is 
better to avoid copying another project's implementation. In this case it is 
probably OK, since picking these values is similar to picking prime numbers 
for hash computations, and such constants are typically common across 
implementations.
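For context, here is a hedged sketch of how such salt constants are typically 
used in a blocked bloom filter: each of the eight 32-bit words in a 32-byte 
bucket gets one bit selected by multiplying the key with its salt. The PR's 
exact shift and bucket-selection logic may differ; this only illustrates the 
general technique.

```java
public class BlockedMaskSketch {
  // The eight salt constants from the quoted setMask method.
  static final int[] SALT = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d,
      0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31};

  // One single-bit mask per 32-bit word of the 256-bit bucket: the top
  // five bits of key * SALT[i] pick which of the word's 32 bits to set.
  static int[] mask(int key) {
    int[] mask = new int[8];
    for (int i = 0; i < 8; i++) {
      mask[i] = 1 << ((key * SALT[i]) >>> 27); // shift yields a value in [0, 31]
    }
    return mask;
  }
}
```

Because all eight probe bits live in one 32-byte bucket, a membership test 
touches a single cache line, which is the cache-efficiency point of the 
Putze et al. design.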


[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-29 Thread GitBox
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r205982764
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java
 ##
 @@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import io.netty.buffer.DrillBuf;
+import java.util.Arrays;
+
+
+/**
+ * According to Putze et al.'s "Cache-, Hash- and Space-Efficient Bloom
+ * Filters", the main idea is to construct tiny bucketed bloom filters
+ * that benefit the CPU cache and SIMD instructions.
+ */
+
+public class BloomFilter {
+  // Bytes in a bucket.
+  private static final int BYTES_PER_BUCKET = 32;
+  // Minimum bloom filter data size.
+  private static final int MINIMUM_BLOOM_SIZE = 256;
+
+  private static final int DEFAULT_MAXIMUM_BLOOM_FILTER_SIZE = 16 * 1024 * 1024;
+
+  private DrillBuf byteBuf;
+
+  private int numBytes;
+
+  private int mask[] = new int[8];
+
+  private byte[] tempBucket = new byte[32];
 
 Review comment:
  Sounds good... but do you need to clear the tempBucket array across 
invocations?


[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-29 Thread GitBox
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r205982746
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterCreator.java
 ##
 @@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.memory.BufferAllocator;
+
+public class BloomFilterCreator {
 
 Review comment:
   Hmm... if the 2 columns create separate bloom filters, where does the ANDing (intersection) of the two BFs happen? In the current implementation there's an ORing of the BFs, but I did not see ANDing (maybe I missed it, so let me know). As far as memory usage is concerned, we can continue down this path and evaluate both CPU cost and memory usage for multi-column joins where the cardinality of the columns ranges from low to high.
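
   One way to see the OR/AND distinction (a hedged sketch with illustrative names, not Drill's BloomFilter API): bitwise ORing unions partial filters built for the same column across fragments, while the "AND" of two different columns' filters happens implicitly at probe time, since a row survives only if every per-column filter reports a possible match.

```java
// Illustrative sketch, not Drill's API: minimal bit-array Bloom filter.
public class BloomMergeDemo {
  static void set(long[] bits, int hash) {
    int idx = Math.floorMod(hash, bits.length * 64);
    bits[idx >>> 6] |= 1L << (idx & 63);
  }

  static boolean mightContain(long[] bits, int hash) {
    int idx = Math.floorMod(hash, bits.length * 64);
    return (bits[idx >>> 6] & (1L << (idx & 63))) != 0;
  }

  // ORing unions two partial filters built on the SAME column
  // (e.g. from different minor fragments).
  static long[] or(long[] a, long[] b) {
    long[] out = new long[a.length];
    for (int i = 0; i < a.length; i++) {
      out[i] = a[i] | b[i];
    }
    return out;
  }

  // With separate per-column filters, the "AND" is a conjunction of
  // membership tests at probe time, not a bitwise AND of the bit arrays.
  static boolean rowMightMatch(long[] bfA, int hashA, long[] bfB, int hashB) {
    return mightContain(bfA, hashA) && mightContain(bfB, hashB);
  }
}
```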




[GitHub] vrozov commented on a change in pull request #1383: DRILL-6613: Refactor MaterializedField

2018-07-29 Thread GitBox
vrozov commented on a change in pull request #1383: DRILL-6613: Refactor 
MaterializedField
URL: https://github.com/apache/drill/pull/1383#discussion_r205980762
 
 

 ##
 File path: 
exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
 ##
 @@ -49,39 +54,79 @@ private MaterializedField(String name, MajorType type, 
LinkedHashSet(size));
+  }
+
+  private <T> void copyFrom(Collection<T> source, Function<T, MaterializedField> transformation) {
+Preconditions.checkState(children.isEmpty());
+source.forEach(child -> children.add(transformation.apply(child)));
+  }
+
+  public static MaterializedField create(String name, MajorType type) {
+return new MaterializedField(name, type, 0);
+  }
+
   public static MaterializedField create(SerializedField serField) {
-LinkedHashSet<MaterializedField> children = new LinkedHashSet<>();
-for (SerializedField sf : serField.getChildList()) {
-  children.add(MaterializedField.create(sf));
+MaterializedField field = new 
MaterializedField(serField.getNamePart().getName(), serField.getMajorType(), 
serField.getChildCount());
+if (OFFSETS_FIELD.equals(field)) {
+  return OFFSETS_FIELD;
 }
-return new MaterializedField(serField.getNamePart().getName(), 
serField.getMajorType(), children);
+field.copyFrom(serField.getChildList(), MaterializedField::create);
+return field;
   }
 
-  /**
-   * Create and return a serialized field based on the current state.
-   */
-  public SerializedField getSerializedField() {
-SerializedField.Builder serializedFieldBuilder = getAsBuilder();
-for(MaterializedField childMaterializedField : getChildren()) {
-  
serializedFieldBuilder.addChild(childMaterializedField.getSerializedField());
+  public MaterializedField copy() {
+return copy(getName(), getType());
+  }
+
+  public MaterializedField copy(MajorType type) {
+return copy(name, type);
+  }
+
+  public MaterializedField copy(String name) {
+return copy(name, getType());
+  }
+
+  public MaterializedField copy(String name, final MajorType type) {
+if (this == OFFSETS_FIELD) {
+  return this;
 }
-return serializedFieldBuilder.build();
+MaterializedField field = new MaterializedField(name, type, 
getChildren().size());
+field.copyFrom(getChildren(), MaterializedField::copy);
+return field;
   }
 
-  public SerializedField.Builder getAsBuilder() {
-return SerializedField.newBuilder()
-.setMajorType(type)
-.setNamePart(NamePart.newBuilder().setName(name).build());
+  public String getName() {
+return name;
+  }
+
+  public MajorType getType() {
+return type;
   }
 
   public Collection<MaterializedField> getChildren() {
-return new ArrayList<>(children);
+return children;
+  }
+
+  public int getWidth() {
+return type.getWidth();
+  }
+
 
 Review comment:
   I don't see such an "informal" pattern. I do see more cases where Drill prefers the 3-line pattern over the single-line pattern, hence the change.




[GitHub] vrozov commented on a change in pull request #1383: DRILL-6613: Refactor MaterializedField

2018-07-29 Thread GitBox
vrozov commented on a change in pull request #1383: DRILL-6613: Refactor 
MaterializedField
URL: https://github.com/apache/drill/pull/1383#discussion_r205980707
 
 

 ##
 File path: 
exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
 ##
 @@ -49,39 +54,79 @@ private MaterializedField(String name, MajorType type, 
LinkedHashSet(size));
+  }
+
+  private <T> void copyFrom(Collection<T> source, Function<T, MaterializedField> transformation) {
+Preconditions.checkState(children.isEmpty());
+source.forEach(child -> children.add(transformation.apply(child)));
+  }
+
+  public static MaterializedField create(String name, MajorType type) {
+return new MaterializedField(name, type, 0);
+  }
+
   public static MaterializedField create(SerializedField serField) {
-LinkedHashSet<MaterializedField> children = new LinkedHashSet<>();
-for (SerializedField sf : serField.getChildList()) {
-  children.add(MaterializedField.create(sf));
+MaterializedField field = new 
MaterializedField(serField.getNamePart().getName(), serField.getMajorType(), 
serField.getChildCount());
+if (OFFSETS_FIELD.equals(field)) {
+  return OFFSETS_FIELD;
 }
-return new MaterializedField(serField.getNamePart().getName(), 
serField.getMajorType(), children);
+field.copyFrom(serField.getChildList(), MaterializedField::create);
+return field;
   }
 
-  /**
-   * Create and return a serialized field based on the current state.
-   */
-  public SerializedField getSerializedField() {
-SerializedField.Builder serializedFieldBuilder = getAsBuilder();
-for(MaterializedField childMaterializedField : getChildren()) {
-  
serializedFieldBuilder.addChild(childMaterializedField.getSerializedField());
+  public MaterializedField copy() {
+return copy(getName(), getType());
+  }
+
+  public MaterializedField copy(MajorType type) {
+return copy(name, type);
+  }
+
+  public MaterializedField copy(String name) {
+return copy(name, getType());
+  }
+
+  public MaterializedField copy(String name, final MajorType type) {
+if (this == OFFSETS_FIELD) {
+  return this;
 }
-return serializedFieldBuilder.build();
+MaterializedField field = new MaterializedField(name, type, 
getChildren().size());
+field.copyFrom(getChildren(), MaterializedField::copy);
+return field;
   }
 
-  public SerializedField.Builder getAsBuilder() {
-return SerializedField.newBuilder()
-.setMajorType(type)
-.setNamePart(NamePart.newBuilder().setName(name).build());
+  public String getName() {
+return name;
+  }
+
+  public MajorType getType() {
+return type;
   }
 
   public Collection<MaterializedField> getChildren() {
-return new ArrayList<>(children);
+return children;
+  }
+
+  public int getWidth() {
+return type.getWidth();
+  }
 
 Review comment:
   I don't know why it was deprecated instead of being completely removed. If 
it needs to be removed, somebody has to open another JIRA/PR.




[GitHub] vrozov commented on a change in pull request #1383: DRILL-6613: Refactor MaterializedField

2018-07-29 Thread GitBox
vrozov commented on a change in pull request #1383: DRILL-6613: Refactor 
MaterializedField
URL: https://github.com/apache/drill/pull/1383#discussion_r205980599
 
 

 ##
 File path: 
exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
 ##
 @@ -49,39 +54,79 @@ private MaterializedField(String name, MajorType type, 
LinkedHashSet(size));
+  }
+
+  private <T> void copyFrom(Collection<T> source, Function<T, MaterializedField> transformation) {
+Preconditions.checkState(children.isEmpty());
+source.forEach(child -> children.add(transformation.apply(child)));
+  }
+
+  public static MaterializedField create(String name, MajorType type) {
+return new MaterializedField(name, type, 0);
+  }
+
   public static MaterializedField create(SerializedField serField) {
-LinkedHashSet<MaterializedField> children = new LinkedHashSet<>();
-for (SerializedField sf : serField.getChildList()) {
-  children.add(MaterializedField.create(sf));
+MaterializedField field = new 
MaterializedField(serField.getNamePart().getName(), serField.getMajorType(), 
serField.getChildCount());
+if (OFFSETS_FIELD.equals(field)) {
+  return OFFSETS_FIELD;
 }
-return new MaterializedField(serField.getNamePart().getName(), 
serField.getMajorType(), children);
+field.copyFrom(serField.getChildList(), MaterializedField::create);
+return field;
   }
 
-  /**
-   * Create and return a serialized field based on the current state.
-   */
-  public SerializedField getSerializedField() {
-SerializedField.Builder serializedFieldBuilder = getAsBuilder();
-for(MaterializedField childMaterializedField : getChildren()) {
-  
serializedFieldBuilder.addChild(childMaterializedField.getSerializedField());
+  public MaterializedField copy() {
+return copy(getName(), getType());
+  }
+
+  public MaterializedField copy(MajorType type) {
+return copy(name, type);
+  }
+
+  public MaterializedField copy(String name) {
+return copy(name, getType());
+  }
+
+  public MaterializedField copy(String name, final MajorType type) {
+if (this == OFFSETS_FIELD) {
+  return this;
 }
-return serializedFieldBuilder.build();
+MaterializedField field = new MaterializedField(name, type, 
getChildren().size());
+field.copyFrom(getChildren(), MaterializedField::copy);
 
 Review comment:
   @paul-rogers In cases where a deep copy is not necessary, `MaterializedField` already provides a `create()` method that takes a name and type. There is no need to introduce `copyForNewVector`, `fullCopy` or `copyIfNeeded` IMO.




[GitHub] vrozov commented on a change in pull request #1383: DRILL-6613: Refactor MaterializedField

2018-07-29 Thread GitBox
vrozov commented on a change in pull request #1383: DRILL-6613: Refactor 
MaterializedField
URL: https://github.com/apache/drill/pull/1383#discussion_r205980354
 
 

 ##
 File path: 
exec/vector/src/main/java/org/apache/drill/exec/record/MaterializedField.java
 ##
 @@ -49,39 +54,79 @@ private MaterializedField(String name, MajorType type, 
LinkedHashSet(size));
+  }
+
+  private <T> void copyFrom(Collection<T> source, Function<T, MaterializedField> transformation) {
+Preconditions.checkState(children.isEmpty());
+source.forEach(child -> children.add(transformation.apply(child)));
+  }
+
+  public static MaterializedField create(String name, MajorType type) {
+return new MaterializedField(name, type, 0);
+  }
+
   public static MaterializedField create(SerializedField serField) {
-LinkedHashSet<MaterializedField> children = new LinkedHashSet<>();
-for (SerializedField sf : serField.getChildList()) {
-  children.add(MaterializedField.create(sf));
+MaterializedField field = new 
MaterializedField(serField.getNamePart().getName(), serField.getMajorType(), 
serField.getChildCount());
+if (OFFSETS_FIELD.equals(field)) {
+  return OFFSETS_FIELD;
 }
-return new MaterializedField(serField.getNamePart().getName(), 
serField.getMajorType(), children);
+field.copyFrom(serField.getChildList(), MaterializedField::create);
+return field;
   }
 
-  /**
-   * Create and return a serialized field based on the current state.
-   */
-  public SerializedField getSerializedField() {
-SerializedField.Builder serializedFieldBuilder = getAsBuilder();
-for(MaterializedField childMaterializedField : getChildren()) {
-  
serializedFieldBuilder.addChild(childMaterializedField.getSerializedField());
+  public MaterializedField copy() {
+return copy(getName(), getType());
+  }
+
+  public MaterializedField copy(MajorType type) {
+return copy(name, type);
+  }
+
+  public MaterializedField copy(String name) {
+return copy(name, getType());
+  }
+
+  public MaterializedField copy(String name, final MajorType type) {
+if (this == OFFSETS_FIELD) {
 
 Review comment:
   > Not only is the offsets field reusable, so is the bits field. So, we seem to be making a statement that there is something special about offsets.
   
   Offset field was already handled differently compared to other fields 
including bits field prior to the PR. The only new behavior that the PR 
introduces is a singleton pattern for offset field during copy operation to 
avoid compare by name where compare by identity should be used in the first 
place. As there was no compare by name for bits, bits field is not handled. I 
do agree that bits can be handled similar to offset field, but it is not a goal 
for this PR. The goal of the PR outlined in the JIRA description is to avoid 
using `clone()` for `MaterializedField`.
   
   > Offsets is reusable because it is fixed size, required. So, if we handle 
the case for one fixed size, required field, we should handle all of them. 
Else, again, we're saying that something is special about offsets.
   
   Offset field is reusable because it is immutable (name, type and children 
are all final). Bits field can be also made reusable, but it is not a goal of 
the PR. The PR does not make offset field special as it was already handled 
differently prior to the PR.
   
   >Further, since the resulting code copies sometimes, but not others, this is 
not a true "copy" operation. It is a "copy if necessary" operation. 
Implementation-wise, the "copy if necessary" should either return the same 
field (if no copy is needed) or call a "copy" method to make an actual copy 
when needed.
   
   It is the only `copy` operation that is available outside of the `MaterializedField` class, and no other `copy` should exist. The `copy` decides whether it needs to create another instance or can return a singleton.
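
   A minimal sketch of the identity-based singleton `copy` described above (illustrative names; the real `MaterializedField` also carries a type and children):

```java
// Illustrative sketch: copy() returns the shared instance itself when the
// field is the immutable well-known singleton, and a fresh instance otherwise.
final class FieldSketch {
  static final FieldSketch OFFSETS = new FieldSketch("$offsets$");

  private final String name;

  private FieldSketch(String name) {
    this.name = name;
  }

  static FieldSketch of(String name) {
    return new FieldSketch(name);
  }

  FieldSketch copy() {
    if (this == OFFSETS) {  // compare by identity, not by name
      return this;          // immutable singleton: safe to share
    }
    return new FieldSketch(name);
  }

  String name() {
    return name;
  }
}
```

   Because the singleton is immutable, returning it unchanged from `copy()` avoids both an allocation and a compare-by-name.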
   




[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-29 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r204973201
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java
 ##
 @@ -0,0 +1,586 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
+import org.apache.drill.exec.ops.Consumer;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.ops.SendingAccountor;
+import org.apache.drill.exec.ops.StatusHandler;
+import org.apache.drill.exec.physical.PhysicalPlan;
+
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.Exchange;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.BroadcastExchange;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.planner.fragment.Fragment;
+import org.apache.drill.exec.planner.fragment.Wrapper;
+import org.apache.drill.exec.planner.physical.HashJoinPrel;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.util.Pointer;
+import org.apache.drill.exec.work.QueryWorkUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class traverses the physical operator tree to find the HashJoin
+ * operators for which JPPD (join predicate push down) is possible. The
+ * prerequisites for JPPD are:
+ * 1. The join condition is an equality
+ * 2. The physical join node is a HashJoin
+ * 3. The probe side children of the HashJoin node do not contain a
+ * blocking operator like HashAgg
+ */
+public class RuntimeFilterManager {
+
+  private Wrapper rootWrapper;
+  //HashJoin node's major fragment id to its corresponding probe side scan nodes' endpoints
+  private Map<Integer, List<CoordinationProtos.DrillbitEndpoint>> joinMjId2probdeScanEps = new HashMap<>();
+  //HashJoin node's major fragment id to its corresponding probe side scan nodes' count
+  private Map<Integer, Integer> joinMjId2scanSize = new ConcurrentHashMap<>();
+  //HashJoin node's major fragment id to the major fragment id of its corresponding probe side scan node
+  private Map<Integer, Integer> joinMjId2ScanMjId = new HashMap<>();
+
+  private RuntimeFilterWritable aggregatedRuntimeFilter;
+
+  private DrillbitContext drillbitContext;
+
+  private SendingAccountor sendingAccountor = new SendingAccountor();
+
+  private String lineSeparator;
+
+  private static final Logger logger = 
LoggerFactory.getLogger(RuntimeFilterManager.class);
+
+  /**
+   * This class maintains context for the runtime join push down's filter 
management. It
+   * does a traversal of the physical operators by leveraging the root wrapper 
which indirectly
+   * holds the global PhysicalOperator tree and contains the minor fragment 
endpoints.
+   * @param workUnit
+   * @param drillbitContext
+   */
+  public RuntimeFilterManager(QueryWorkUnit workUnit, DrillbitContext 
drillbitContext) {
+this.rootWrapper = workUnit.getRootWrapper();
+
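
The three JPPD prerequisites listed in the javadoc above can be sketched as a recursive check over a toy operator tree (types and names are illustrative, not Drill's physical operator classes):

```java
import java.util.List;

// Illustrative sketch of the JPPD eligibility check described in the javadoc.
class JppdCheckDemo {
  enum Kind { HASH_JOIN, HASH_AGG, SCAN, PROJECT }

  static class Op {
    final Kind kind;
    final boolean equalityJoin;  // only meaningful for HASH_JOIN
    final List<Op> probeSide;    // children on the probe side

    Op(Kind kind, boolean equalityJoin, List<Op> probeSide) {
      this.kind = kind;
      this.equalityJoin = equalityJoin;
      this.probeSide = probeSide;
    }
  }

  // Prerequisite 3: no blocking operator (e.g. HashAgg) below the probe side.
  static boolean nonBlocking(Op op) {
    if (op.kind == Kind.HASH_AGG) {
      return false;
    }
    return op.probeSide.stream().allMatch(JppdCheckDemo::nonBlocking);
  }

  // Prerequisites 1-3 combined: equality condition, HashJoin node,
  // non-blocking probe side.
  static boolean jppdPossible(Op op) {
    return op.kind == Kind.HASH_JOIN
        && op.equalityJoin
        && op.probeSide.stream().allMatch(JppdCheckDemo::nonBlocking);
  }
}
```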

[GitHub] weijietong commented on issue #1334: DRILL-6385: Support JPPD feature

2018-07-29 Thread GitBox
weijietong commented on issue #1334: DRILL-6385: Support JPPD feature
URL: https://github.com/apache/drill/pull/1334#issuecomment-408674169
 
 
   @amansinha100 please review the changes made according to your advice.




[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-29 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r205965375
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterCreator.java
 ##
 @@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.memory.BufferAllocator;
+
+public class BloomFilterCreator {
 
 Review comment:
   Rethinking this point, I think the current one-join-column-one-bloom-filter strategy is right, and I would prefer it. To your example, let's assume t1.a1 is a low-cardinality column (say a sex column) and t1.b1 is a high-cardinality column. If we took the multi-join-column single-bloom-filter strategy, the required bloom filter's memory size would have to grow in proportion to the cardinality of the joint two-column combination, which is not acceptable. With the current implementation, the low-cardinality join column consumes only a small amount of memory, while the high-cardinality column consumes another, independent part of the bloom filter memory. For the high-cardinality case, maybe we can later add an option to decide whether to enable the bloom filter push down.
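
   The memory argument can be made concrete with the standard Bloom filter sizing formula, m = -n * ln(p) / (ln 2)^2 bits, which is linear in the number of distinct values n (the class name and example numbers below are illustrative, not Drill's code):

```java
// Illustrative sketch of why per-column filters are cheaper than one joint
// filter: the optimal bit count grows linearly with the distinct-value
// count, and a joint (a1, b1) filter must be sized for up to
// ndv(a1) * ndv(b1) combinations.
public class BloomSizingDemo {
  // Standard formula: m = -n * ln(p) / (ln 2)^2 bits for target
  // false-positive rate p and n inserted distinct values.
  static long optimalBits(long ndv, double fpp) {
    return (long) Math.ceil(-ndv * Math.log(fpp) / (Math.log(2) * Math.log(2)));
  }
}
```

   For example, with ndv(a1) = 2 (a sex column) and ndv(b1) = 1,000,000, two per-column filters need roughly half the bits of one joint filter sized for up to 2,000,000 combinations, and the low-cardinality filter is nearly free.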

