[GitHub] ppadma commented on issue #1324: DRILL-6310: limit batch size for hash aggregate

2018-07-01 Thread GitBox
ppadma commented on issue #1324: DRILL-6310: limit batch size for hash aggregate
URL: https://github.com/apache/drill/pull/1324#issuecomment-401675042
 
 
   @vvysotskyi sorry about that. fixed the problem.updated the PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-01 Thread GitBox
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r199376103
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
 ##
 @@ -226,6 +244,96 @@ public IterOutcome next() {
 }
   }
 
+  private void applyRuntimeFilter() throws SchemaChangeException {
+RuntimeFilterWritable runtimeFilterWritable = context.getRuntimeFilter();
+if (runtimeFilterWritable == null) {
+  return;
+}
+if (recordCount <= 0) {
+  return;
+}
+List bloomFilters = runtimeFilterWritable.unwrap();
+if (hash64 == null) {
+  ValueVectorHashHelper hashHelper = new ValueVectorHashHelper(this, 
context);
+  try {
+//generate hash helper
+this.toFilterFields = 
runtimeFilterWritable.getRuntimeFilterBDef().getProbeFieldsList();
+List hashFieldExps = new ArrayList<>();
+List typedFieldIds = new ArrayList<>();
+for (String toFilterField : toFilterFields) {
+  SchemaPath schemaPath = new SchemaPath(new 
PathSegment.NameSegment(toFilterField), ExpressionPosition.UNKNOWN);
+  TypedFieldId typedFieldId = container.getValueVectorId(schemaPath);
+  this.field2id.put(toFilterField, typedFieldId.getFieldIds()[0]);
+  typedFieldIds.add(typedFieldId);
+  ValueVectorReadExpression toHashFieldExp = new 
ValueVectorReadExpression(typedFieldId);
+  hashFieldExps.add(toHashFieldExp);
+}
+hash64 = hashHelper.getHash64(hashFieldExps.toArray(new 
LogicalExpression[hashFieldExps.size()]), typedFieldIds.toArray(new 
TypedFieldId[typedFieldIds.size()]));
+  } catch (Exception e) {
+throw UserException.internalError(e).build(logger);
+  }
+}
+selectionVector2.allocateNew(recordCount);
+BitSet bitSet = new BitSet(recordCount);
+for (int i = 0; i < toFilterFields.size(); i++) {
+  BloomFilter bloomFilter = bloomFilters.get(i);
+  String fieldName = toFilterFields.get(i);
+  computeBitSet(field2id.get(fieldName), bloomFilter, bitSet);
+}
+int svIndex = 0;
+int tmpFilterRows = 0;
+for (int i = 0; i < recordCount; i++) {
+  boolean contain = bitSet.get(i);
+  if (contain) {
+selectionVector2.setIndex(svIndex, i);
+svIndex++;
+  } else {
+tmpFilterRows++;
+  }
+}
+selectionVector2.setRecordCount(svIndex);
+if (tmpFilterRows > 0 && tmpFilterRows == recordCount) {
+  recordCount = 0;
+  selectionVector2.clear();
+  logger.debug("filter {} rows by the RuntimeFilter", tmpFilterRows);
+  return;
+}
+if (tmpFilterRows > 0 && tmpFilterRows != recordCount ) {
+  totalFilterRows = totalFilterRows + tmpFilterRows;
+  recordCount = svIndex;
+  BatchSchema batchSchema = this.schema;
+  VectorContainer backUpContainer = new 
VectorContainer(this.oContext.getAllocator(), batchSchema);
+  int fieldCount = batchSchema.getFieldCount();
+  for (int i = 0; i < fieldCount; i++) {
+ValueVector from = 
this.getContainer().getValueVector(i).getValueVector();
+ValueVector to = backUpContainer.getValueVector(i).getValueVector();
+to.setInitialCapacity(svIndex);
+for (int r = 0; r < svIndex; r++) {
+  to.copyEntry(r, from, selectionVector2.getIndex(r));
 
 Review comment:
   Hi @weijietong ,  the physical plan operators are supposed to implement 2 
APIs:  `getSupportedEncodings()` API (see [1]) which indicates whether or not 
it accepts an SelectionVector2 (other options are SV4 and NONE) and the 
corresponding `getEncoding()` API.  If an operator does not accept an SV2 or 
SV4, the planner will insert a SelectionVectorRemover just below that node 
which essentially does the copying of qualified rows.  Note that the SVRemover 
is implemented by RemovingRecordBatch [2] which uses a `StraightCopier` 
whenever the child has NONE, so in that case it just does a simple transfer 
instead of copy.  
   
   In the case of Filter,  it can accept both NONE and SV2 because it is 
possible in some cases to have a filter on top of another filter (with some 
intermediate non-blocking operator).  
   
   Let me know if this makes sense.   The main reason I am proposing this is 
the Filter-Scan is a very common pattern and we would want to minimize the copy 
overhead as much as possible.  Unfortunately, I am tied up with some other work 
otherwise I could make the changes on top of your branch to experiment. 
   
   [1] 
https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/Prel.java#L51
   [2] 
https://github.com/apache/drill/blob/master/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java#L57


This is an automated message from the Apache Git 

[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-01 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r199374038
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
 ##
 @@ -32,35 +33,53 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.drill.exec.physical.base.AbstractJoinPop;
+import org.apache.drill.exec.work.filter.RuntimeFilterDef;
+
 
 @JsonTypeName("hash-join")
+@JsonIgnoreProperties(ignoreUnknown = true)
 
 Review comment:
   The primary reason is to backward compatible. PhysicalPlans without 
`runtimeFilterDef` property (maybe not support RuntimeFilter) can still work 
out. That is the `ignoreUnknown=true` usage.  The `@JsonIgnore` is to not 
output the property content as the final json. That's not our target.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Resolved] (DRILL-6061) Doc Request: Global Query List showing queries from all Drill foreman nodes

2018-07-01 Thread Bridget Bevens (JIRA)


 [ 
https://issues.apache.org/jira/browse/DRILL-6061?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bridget Bevens resolved DRILL-6061.
---
Resolution: Fixed

I've added the following doc:
https://drill.apache.org/docs/global-query-list/ 
Hopefully this resolves the issue. If not, please update this JIRA with 
feedback. 
Thanks,
Bridget

> Doc Request: Global Query List showing queries from all Drill foreman nodes
> ---
>
> Key: DRILL-6061
> URL: https://issues.apache.org/jira/browse/DRILL-6061
> Project: Apache Drill
>  Issue Type: Task
>  Components: Documentation, Metadata, Web Server
>Affects Versions: 1.11.0
> Environment: MapR 5.2
>Reporter: Hari Sekhon
>Assignee: Bridget Bevens
>Priority: Major
>  Labels: doc-impacting
> Fix For: 1.14.0
>
>
> Documentation Request to improve doc around Global Query List to show all 
> queries executed across all Drill nodes in a cluster for better management 
> and auditing.
> It wasn't obvious to be able to see all queries across all nodes in a Drill 
> cluster. The Web UI on any given Drill node only shows the queries 
> coordinated by that local node if acting as the foreman for the query, so if 
> using ZooKeeper or a Load Balancer to distribute queries via different Drill 
> nodes (eg. 
> [https://github.com/HariSekhon/nagios-plugins/tree/master/haproxy|https://github.com/HariSekhon/nagios-plugins/tree/master/haproxy])
>  then the query list will be spread across lots of different nodes with no 
> global timeline of queries.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] weijietong commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-01 Thread GitBox
weijietong commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r199369589
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
 ##
 @@ -226,6 +244,96 @@ public IterOutcome next() {
 }
   }
 
+  private void applyRuntimeFilter() throws SchemaChangeException {
+RuntimeFilterWritable runtimeFilterWritable = context.getRuntimeFilter();
+if (runtimeFilterWritable == null) {
+  return;
+}
+if (recordCount <= 0) {
+  return;
+}
+List bloomFilters = runtimeFilterWritable.unwrap();
+if (hash64 == null) {
+  ValueVectorHashHelper hashHelper = new ValueVectorHashHelper(this, 
context);
+  try {
+//generate hash helper
+this.toFilterFields = 
runtimeFilterWritable.getRuntimeFilterBDef().getProbeFieldsList();
+List hashFieldExps = new ArrayList<>();
+List typedFieldIds = new ArrayList<>();
+for (String toFilterField : toFilterFields) {
+  SchemaPath schemaPath = new SchemaPath(new 
PathSegment.NameSegment(toFilterField), ExpressionPosition.UNKNOWN);
+  TypedFieldId typedFieldId = container.getValueVectorId(schemaPath);
+  this.field2id.put(toFilterField, typedFieldId.getFieldIds()[0]);
+  typedFieldIds.add(typedFieldId);
+  ValueVectorReadExpression toHashFieldExp = new 
ValueVectorReadExpression(typedFieldId);
+  hashFieldExps.add(toHashFieldExp);
+}
+hash64 = hashHelper.getHash64(hashFieldExps.toArray(new 
LogicalExpression[hashFieldExps.size()]), typedFieldIds.toArray(new 
TypedFieldId[typedFieldIds.size()]));
+  } catch (Exception e) {
+throw UserException.internalError(e).build(logger);
+  }
+}
+selectionVector2.allocateNew(recordCount);
+BitSet bitSet = new BitSet(recordCount);
+for (int i = 0; i < toFilterFields.size(); i++) {
+  BloomFilter bloomFilter = bloomFilters.get(i);
+  String fieldName = toFilterFields.get(i);
+  computeBitSet(field2id.get(fieldName), bloomFilter, bitSet);
+}
+int svIndex = 0;
+int tmpFilterRows = 0;
+for (int i = 0; i < recordCount; i++) {
+  boolean contain = bitSet.get(i);
+  if (contain) {
+selectionVector2.setIndex(svIndex, i);
+svIndex++;
+  } else {
+tmpFilterRows++;
+  }
+}
+selectionVector2.setRecordCount(svIndex);
+if (tmpFilterRows > 0 && tmpFilterRows == recordCount) {
+  recordCount = 0;
+  selectionVector2.clear();
+  logger.debug("filter {} rows by the RuntimeFilter", tmpFilterRows);
+  return;
+}
+if (tmpFilterRows > 0 && tmpFilterRows != recordCount ) {
+  totalFilterRows = totalFilterRows + tmpFilterRows;
+  recordCount = svIndex;
+  BatchSchema batchSchema = this.schema;
+  VectorContainer backUpContainer = new 
VectorContainer(this.oContext.getAllocator(), batchSchema);
+  int fieldCount = batchSchema.getFieldCount();
+  for (int i = 0; i < fieldCount; i++) {
+ValueVector from = 
this.getContainer().getValueVector(i).getValueVector();
+ValueVector to = backUpContainer.getValueVector(i).getValueVector();
+to.setInitialCapacity(svIndex);
+for (int r = 0; r < svIndex; r++) {
+  to.copyEntry(r, from, selectionVector2.getIndex(r));
 
 Review comment:
   @aman thanks for other valuable reviews ,will soon update that. To this 
point, I initially tend to output the SV2. But to fellow reasons, I give up: 
   
   * Not all the possible operators support the SelectionModel. If users’ rule 
pushed down the filter conditions,the output SV2 maybe not processed by the not 
supported operators(i.e. it’s not definitive  to have a operator which supports 
the SelectionModel above the Scan node).
   
   * The BatchSchema’s SelectionModel also becomes a runtime var.This will also 
affect the upper filter node’s code-gen logic to dynamically generate fresh 
filter codes to the Scan’s SV2.
   
   I agree that the memory copy cost will be less if the above filter node can 
filter more rows over the Scan’s SV2.So what your opinion about this?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] aravi5 commented on issue #1272: DRILL-5977: Filter Pushdown in Drill-Kafka plugin

2018-07-01 Thread GitBox
aravi5 commented on issue #1272: DRILL-5977: Filter Pushdown in Drill-Kafka 
plugin
URL: https://github.com/apache/drill/pull/1272#issuecomment-401639836
 
 
   @akumarb2010 - Thank you for reviewing the design and implementation, and 
providing valuable inputs.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] vdiravka commented on a change in pull request #1345: DRILL-6494: Drill Plugins Handler

2018-07-01 Thread GitBox
vdiravka commented on a change in pull request #1345: DRILL-6494: Drill Plugins 
Handler
URL: https://github.com/apache/drill/pull/1345#discussion_r199358015
 
 

 ##
 File path: 
contrib/storage-kafka/src/main/resources/bootstrap-storage-plugins.json
 ##
 @@ -2,8 +2,8 @@
   "storage":{
 kafka : {
   type:"kafka",
-  enabled: false,
-  kafkaConsumerProps: {"bootstrap.servers":"localhost:9092", "group.id" : 
"drill-consumer"}
+  kafkaConsumerProps: {"bootstrap.servers":"localhost:9092", "group.id" : 
"drill-consumer"},
+  enabled: false
 
 Review comment:
   It looks like Hive plugin is the only one plugin with such order of 
properties. For all other plugins the enabled status appears in the end of 
config after deserializing.
   So I return enabled status for Hive plugin and leave for others.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] vdiravka commented on a change in pull request #1345: DRILL-6494: Drill Plugins Handler

2018-07-01 Thread GitBox
vdiravka commented on a change in pull request #1345: DRILL-6494: Drill Plugins 
Handler
URL: https://github.com/apache/drill/pull/1345#discussion_r199359888
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginsHandlerService.java
 ##
 @@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Charsets;
+import com.google.common.io.Resources;
+import com.jasonclawson.jackson.dataformat.hocon.HoconFactory;
+import org.apache.drill.common.config.CommonConstants;
+import org.apache.drill.common.config.LogicalPlanPersistence;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.common.scanner.ClassPathScanner;
+import org.apache.drill.exec.planner.logical.StoragePlugins;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.sys.PersistentStore;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+import java.io.IOException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * Drill plugins handler, which allows to update storage plugins configs from 
the
+ * {@link CommonConstants#STORAGE_PLUGINS_OVERRIDE_CONF} conf file
+ *
+ * TODO: DRILL-6564: It can be improved with configs versioning and service of 
creating
+ * {@link CommonConstants#STORAGE_PLUGINS_OVERRIDE_CONF}
+ */
+public class StoragePluginsHandlerService implements StoragePluginsHandler {
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(StoragePluginsHandlerService.class);
+
+  private final LogicalPlanPersistence hoconLogicalPlanPersistence;
+
+  public StoragePluginsHandlerService(DrillbitContext context) {
+hoconLogicalPlanPersistence = new 
LogicalPlanPersistence(context.getConfig(), context.getClasspathScan(),
+new ObjectMapper(new HoconFactory()));
+  }
+
+  @Override
+  public void loadPlugins(@NotNull PersistentStore 
persistentStore,
+  @Nullable StoragePlugins bootstrapPlugins) {
+// if bootstrapPlugins is not null -- fresh Drill set up
+StoragePlugins pluginsToBeWrittenToPersistentStore;
+
+StoragePlugins newPlugins = getNewStoragePlugins();
+
+if (newPlugins != null) {
+  pluginsToBeWrittenToPersistentStore = new StoragePlugins(new 
HashMap<>());
+  Optional.ofNullable(bootstrapPlugins)
+  .ifPresent(pluginsToBeWrittenToPersistentStore::putAll);
+
+  for (Map.Entry newPlugin : newPlugins) {
+String pluginName = newPlugin.getKey();
+StoragePluginConfig oldPluginConfig = 
Optional.ofNullable(bootstrapPlugins)
+.map(plugins -> plugins.getConfig(pluginName))
+.orElse(persistentStore.get(pluginName));
+StoragePluginConfig updatedStatusPluginConfig = 
updatePluginStatus(oldPluginConfig, newPlugin.getValue());
+pluginsToBeWrittenToPersistentStore.put(pluginName, 
updatedStatusPluginConfig);
+  }
+} else {
+  pluginsToBeWrittenToPersistentStore = bootstrapPlugins;
+}
+
+// load pluginsToBeWrittenToPersistentStore to Persistent Store
+Optional.ofNullable(pluginsToBeWrittenToPersistentStore)
+.ifPresent(plugins -> plugins.forEach(plugin -> 
persistentStore.put(plugin.getKey(), plugin.getValue(;
+  }
+
+  /**
+   * Helper method to identify the enabled status for new storage plugins 
config. If this status is absent in the updater
+   * file, the status is kept from the configs, which are going to be updated
+   *
+   * @param oldPluginConfig current storage plugin config from Persistent 
Store or bootstrap config file
+   * @param newPluginConfig new storage plugin config
+   * @return new storage plugin config with updated enabled status
+   */
+  private StoragePluginConfig updatePluginStatus(@Nullable StoragePluginConfig 
oldPluginConfig,
+ @NotNull StoragePluginConfig 
newPluginConfig) {
+if 

[GitHub] vdiravka commented on a change in pull request #1345: DRILL-6494: Drill Plugins Handler

2018-07-01 Thread GitBox
vdiravka commented on a change in pull request #1345: DRILL-6494: Drill Plugins 
Handler
URL: https://github.com/apache/drill/pull/1345#discussion_r199359807
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginsHandler.java
 ##
 @@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.planner.logical.StoragePlugins;
+import org.apache.drill.exec.store.sys.PersistentStore;
+
+
+/**
+ * Storage plugins handler is an additional service for updating storage 
plugins configs from the file
+ */
+public interface StoragePluginsHandler {
+
+  /**
+   * Update incoming storage plugins configs from persistence store if 
present, otherwise bootstrap plugins configs.
+   * One of the params should be null, second shouldn't
+   *
+   * @param persistentStore the last storage plugins configs from persistence 
store
+   * @param bootstrapPlugins bootstrap storage plugins, which are used in case 
of first Drill start up
+   * @return all storage plugins, which should be loaded into persistence store
+   */
+  void loadPlugins(PersistentStore persistentStore, 
StoragePlugins bootstrapPlugins);
 
 Review comment:
   Let's say the current approach is method1 and the approach of transferring 
the all configs from PS to handler is method2.
   So for both of them the first stage is common: 
   1. Getting the iterator and check for any element - 
`pluginSystemTable.getAll().hasNext()`, which is used to detect whether any 
plugins are present in PStore or this is the first set-up.
   2. Then for method 2 I need to extract every plugin configs and put it to 
the `StoragePlugins` - N calls to PStore.
   For method 1 I need pass just reference to PStore. 
   3. For method2  I need to update plugins and to determine only updated 
plugins configs to put them to PStore - <= N calls.
   Similar in method1 I need to get new plugins if exist and put them to PStore 
- <= N calls.
   _
   The benefit in the second stage.
   I agree with you regarding registry responsibilities. But current approach 
is also good: anyway currently `StoragePluginsHandler` instance is a part of 
`StoragePluginRegistryImpl` and also loading configs is better from code 
understanding point of view than updating and returning different results for 
different modes (you can compare with changes from the first commit).


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] vdiravka commented on a change in pull request #1345: DRILL-6494: Drill Plugins Handler

2018-07-01 Thread GitBox
vdiravka commented on a change in pull request #1345: DRILL-6494: Drill Plugins 
Handler
URL: https://github.com/apache/drill/pull/1345#discussion_r199358139
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/NamedStoragePluginConfig.java
 ##
 @@ -17,22 +17,51 @@
  */
 package org.apache.drill.exec.store;
 
+import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.drill.common.logical.StoragePluginConfig;
 
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
-@JsonTypeName("named")
+@JsonTypeName(NamedStoragePluginConfig.NAME)
 public class NamedStoragePluginConfig extends StoragePluginConfig {
-  public String name;
+
+  public static final String NAME = "named";
+
+  private final String name;
+
+  public NamedStoragePluginConfig(@JsonProperty("name") String name) {
 
 Review comment:
   Missed that, thanks. Done


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] vdiravka commented on a change in pull request #1345: DRILL-6494: Drill Plugins Handler

2018-07-01 Thread GitBox
vdiravka commented on a change in pull request #1345: DRILL-6494: Drill Plugins 
Handler
URL: https://github.com/apache/drill/pull/1345#discussion_r199357677
 
 

 ##
 File path: 
contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
 ##
 @@ -52,18 +50,18 @@
 public class HiveSchemaFactory implements SchemaFactory {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(HiveSchemaFactory.class);
 
-  // MetaStoreClient created using process user credentials
-  private final DrillHiveMetaStoreClient processUserMetastoreClient;
-  // MetasStoreClient created using SchemaConfig credentials
-  private final LoadingCache 
metaStoreClientLoadingCache;
+  // MetaStoreClient created using process user credentials. Null if client 
can't be instantiated
+  private DrillHiveMetaStoreClient processUserMetastoreClient;
 
 Review comment:
   It may not be initialized in the constructor. It was done to avoid `null` 
value in Hive storage plugin update window.
   But I don't like this approach anymore.
   
   I have described the issue here: 
[DRILL-6412](https://issues.apache.org/jira/browse/DRILL-6412?focusedCommentId=16528944=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16528944).
   I think until it will be solved as mentioned in DRILL-6412, the workaround 
could be used,: `"hive.metastore.schema.verification": "false"` property in 
Hive `bootstrap-storage.json`. It allows instantiate Hive client properly as in 
earlier 1.2 version of Drill Hive client.
   Also this properties should be documented for configuring Hive Embedded 
Metastore:
   
https://drill.apache.org/docs/hive-storage-plugin/#hive-embedded-metastore-configuration


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] vvysotskyi commented on issue #1324: DRILL-6310: limit batch size for hash aggregate

2018-07-01 Thread GitBox
vvysotskyi commented on issue #1324: DRILL-6310: limit batch size for hash 
aggregate
URL: https://github.com/apache/drill/pull/1324#issuecomment-401618980
 
 
   Unit tests in travis build for this PR failed, @ppadma, could you please 
take a look at this?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] ppadma commented on issue #1342: DRILL-6537:Limit the batch size for buffering operators based on how …

2018-07-01 Thread GitBox
ppadma commented on issue #1342: DRILL-6537:Limit the batch size for buffering 
operators based on how …
URL: https://github.com/apache/drill/pull/1342#issuecomment-401618524
 
 
   @vvysotskyi Done.
   I accidentally closed the PR. Reopened it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] ppadma opened a new pull request #1342: DRILL-6537:Limit the batch size for buffering operators based on how …

2018-07-01 Thread GitBox
ppadma opened a new pull request #1342: DRILL-6537:Limit the batch size for 
buffering operators based on how …
URL: https://github.com/apache/drill/pull/1342
 
 
   …much memory they get
   
   @Ben-Zvi please review.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-01 Thread GitBox
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r199338763
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
 ##
 @@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.exec.proto.BitData;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class RuntimeFilterWritable implements AutoCloseables.Closeable {
 
 Review comment:
   Pls add a javadoc for this class. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-01 Thread GitBox
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r199334516
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
 ##
 @@ -226,6 +244,96 @@ public IterOutcome next() {
 }
   }
 
+  private void applyRuntimeFilter() throws SchemaChangeException {
+RuntimeFilterWritable runtimeFilterWritable = context.getRuntimeFilter();
+if (runtimeFilterWritable == null) {
+  return;
+}
+if (recordCount <= 0) {
+  return;
+}
+List bloomFilters = runtimeFilterWritable.unwrap();
+if (hash64 == null) {
+  ValueVectorHashHelper hashHelper = new ValueVectorHashHelper(this, 
context);
+  try {
+//generate hash helper
+this.toFilterFields = 
runtimeFilterWritable.getRuntimeFilterBDef().getProbeFieldsList();
+List hashFieldExps = new ArrayList<>();
+List typedFieldIds = new ArrayList<>();
+for (String toFilterField : toFilterFields) {
+  SchemaPath schemaPath = new SchemaPath(new 
PathSegment.NameSegment(toFilterField), ExpressionPosition.UNKNOWN);
+  TypedFieldId typedFieldId = container.getValueVectorId(schemaPath);
+  this.field2id.put(toFilterField, typedFieldId.getFieldIds()[0]);
+  typedFieldIds.add(typedFieldId);
+  ValueVectorReadExpression toHashFieldExp = new 
ValueVectorReadExpression(typedFieldId);
+  hashFieldExps.add(toHashFieldExp);
+}
+hash64 = hashHelper.getHash64(hashFieldExps.toArray(new 
LogicalExpression[hashFieldExps.size()]), typedFieldIds.toArray(new 
TypedFieldId[typedFieldIds.size()]));
+  } catch (Exception e) {
+throw UserException.internalError(e).build(logger);
+  }
+}
+selectionVector2.allocateNew(recordCount);
+BitSet bitSet = new BitSet(recordCount);
+for (int i = 0; i < toFilterFields.size(); i++) {
+  BloomFilter bloomFilter = bloomFilters.get(i);
+  String fieldName = toFilterFields.get(i);
+  computeBitSet(field2id.get(fieldName), bloomFilter, bitSet);
+}
+int svIndex = 0;
+int tmpFilterRows = 0;
+for (int i = 0; i < recordCount; i++) {
+  boolean contain = bitSet.get(i);
+  if (contain) {
+selectionVector2.setIndex(svIndex, i);
+svIndex++;
+  } else {
+tmpFilterRows++;
+  }
+}
+selectionVector2.setRecordCount(svIndex);
+if (tmpFilterRows > 0 && tmpFilterRows == recordCount) {
+  recordCount = 0;
+  selectionVector2.clear();
+  logger.debug("filter {} rows by the RuntimeFilter", tmpFilterRows);
+  return;
+}
+if (tmpFilterRows > 0 && tmpFilterRows != recordCount ) {
+  totalFilterRows = totalFilterRows + tmpFilterRows;
+  recordCount = svIndex;
+  BatchSchema batchSchema = this.schema;
+  VectorContainer backUpContainer = new 
VectorContainer(this.oContext.getAllocator(), batchSchema);
+  int fieldCount = batchSchema.getFieldCount();
+  for (int i = 0; i < fieldCount; i++) {
+ValueVector from = 
this.getContainer().getValueVector(i).getValueVector();
+ValueVector to = backUpContainer.getValueVector(i).getValueVector();
+to.setInitialCapacity(svIndex);
+for (int r = 0; r < svIndex; r++) {
+  to.copyEntry(r, from, selectionVector2.getIndex(r));
+}
+  }
+  this.container.exchange(backUpContainer);
+  backUpContainer.clear();
+  selectionVector2.clear();
+  logger.debug("filter {} rows by the RuntimeFilter", tmpFilterRows);
+  return;
+}
+  }
+
+
+  private void computeBitSet(int fieldId, BloomFilter bloomFilter, BitSet 
bitSet) throws SchemaChangeException {
+for (int rowIndex = 0; rowIndex < recordCount; rowIndex++) {
+  long hash = hash64.hash64Code(rowIndex, 0, fieldId);
+  boolean contain = bloomFilter.find(hash);
+  if (contain) {
+bitSet.set(rowIndex, true);
+bitSet.set(rowIndex);
 
 Review comment:
   Why call set() second time ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-01 Thread GitBox
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r199336580
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 ##
 @@ -491,6 +508,50 @@ private void setupHashTable() throws 
SchemaChangeException {
 // Create the chained hash table
 baseHashTable =
   new ChainedHashTable(htConfig, context, allocator, buildBatch, 
probeBatch, null);
+if (enableRuntimeFilter) {
+  setupHash64(htConfig);
+}
+  }
+
+  private void setupHash64(HashTableConfig htConfig) throws 
SchemaChangeException {
+LogicalExpression[] keyExprsBuild = new 
LogicalExpression[htConfig.getKeyExprsBuild().size()];
+ErrorCollector collector = new ErrorCollectorImpl();
+int i = 0;
+for (NamedExpression ne : htConfig.getKeyExprsBuild()) {
+  final LogicalExpression expr = 
ExpressionTreeMaterializer.materialize(ne.getExpr(), buildBatch, collector, 
context.getFunctionRegistry());
+  if (collector.hasErrors()) {
+throw new SchemaChangeException("Failure while materializing 
expression. " + collector.toErrorString());
+  }
+  if (expr == null) {
+continue;
+  }
+  keyExprsBuild[i] = expr;
+  i++;
+}
+i = 0;
+boolean meetNotExistField = false;
+TypedFieldId[] buildSideTypeFieldIds = new 
TypedFieldId[keyExprsBuild.length];
+for (NamedExpression ne : htConfig.getKeyExprsBuild()) {
+  SchemaPath schemaPath = (SchemaPath) ne.getExpr();
+  TypedFieldId typedFieldId = buildBatch.getValueVectorId(schemaPath);
+  if (typedFieldId == null) {
+meetNotExistField = true;
+continue;
+  }
+  buildSideTypeFieldIds[i] = typedFieldId;
+  i++;
+}
+if (meetNotExistField) {
+  logger.info("as some build side key fileds not found, runtime filter was 
disabled");
 
 Review comment:
   mis-spelled 'fileds' => 'fields'. Pls use upper-case to begin the message: 
'As some build ...'


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-01 Thread GitBox
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r199338879
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
 ##
 @@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.exec.proto.BitData;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class RuntimeFilterWritable implements AutoCloseables.Closeable {
+
+  private BitData.RuntimeFilterBDef runtimeFilterBDef;
+
+  private DrillBuf[] data;
+
+  public RuntimeFilterWritable() {
+
+  }
+
+
+  public BitData.RuntimeFilterBDef getRuntimeFilterBDef() {
+return runtimeFilterBDef;
+  }
+
+  public void setRuntimeFilterBDef(BitData.RuntimeFilterBDef 
runtimeFilterBDef) {
+this.runtimeFilterBDef = runtimeFilterBDef;
+  }
+
+  public DrillBuf[] getData() {
+return data;
+  }
+
+  public void setData(DrillBuf... data) {
+this.data = data;
+  }
+
+
+  public List unwrap() {
+List sizeInBytes = 
runtimeFilterBDef.getBloomFilterSizeInBytesList();
+List bloomFilters = new ArrayList<>(sizeInBytes.size());
+for (int i = 0; i < sizeInBytes.size(); i++) {
+  DrillBuf byteBuf = data[i];
+  int offset = 0;
+  int size = sizeInBytes.get(i);
+  DrillBuf bloomFilterContent = byteBuf.slice(offset, size);
+  BloomFilter bloomFilter = new BloomFilter(bloomFilterContent);
+  bloomFilters.add(bloomFilter);
+}
+return bloomFilters;
+  }
+
+  public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
+List thisFilters = this.unwrap();
+List otherFilters = runtimeFilterWritable.unwrap();
+for (int i = 0; i < thisFilters.size(); i++) {
+  BloomFilter thisOne = thisFilters.get(0);
 
 Review comment:
   Shouldn't this be get(i), not get(0) ?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-01 Thread GitBox
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r199336665
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java
 ##
 @@ -0,0 +1,735 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
+import org.apache.drill.exec.ops.Consumer;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.ops.SendingAccountor;
+import org.apache.drill.exec.ops.StatusHandler;
+import org.apache.drill.exec.physical.PhysicalPlan;
+
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.Exchange;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Store;
+import org.apache.drill.exec.physical.config.BroadcastExchange;
+import org.apache.drill.exec.physical.config.HashAggregate;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.physical.config.StreamingAggregate;
+import org.apache.drill.exec.planner.fragment.Fragment;
+import org.apache.drill.exec.planner.fragment.Wrapper;
+import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.parquet.ParquetRGFilterEvaluator;
+import org.apache.drill.exec.util.Pointer;
+import org.apache.drill.exec.work.QueryWorkUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class traverses the physical operator tree to find the HashJoin 
operator
+ * which is JPDD (join predicate push down) possible. The prerequisite to do 
JPDD
 
 Review comment:
   I think you mean 'JPPD', not JPDD.  Also change it to '...for which JPPD is 
possble'


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-01 Thread GitBox
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r199334349
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
 ##
 @@ -32,35 +33,53 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.drill.exec.physical.base.AbstractJoinPop;
+import org.apache.drill.exec.work.filter.RuntimeFilterDef;
+
 
 @JsonTypeName("hash-join")
+@JsonIgnoreProperties(ignoreUnknown = true)
 
 Review comment:
   Instead of ignoring the property at global class level,  can you ignore 
specific one using `@JsonIgnore` ?  


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-01 Thread GitBox
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r199334575
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
 ##
 @@ -144,7 +144,7 @@ public HashTable createAndSetupHashTable(TypedFieldId[] 
outKeyFieldIds) throws C
 // Uncomment out this line to debug the generated code.
 // This code is called from generated code, so to step into this code,
 // persist the code generated in HashAggBatch also.
-// top.saveCodeForDebugging(true);
+//top.saveCodeForDebugging(true);
 
 Review comment:
   Pls keep original spacing (would be good if you maintain that code 
convention for comments). 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-01 Thread GitBox
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r199338885
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
 ##
 @@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.exec.proto.BitData;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class RuntimeFilterWritable implements AutoCloseables.Closeable {
+
+  private BitData.RuntimeFilterBDef runtimeFilterBDef;
+
+  private DrillBuf[] data;
+
+  public RuntimeFilterWritable() {
+
+  }
+
+
+  public BitData.RuntimeFilterBDef getRuntimeFilterBDef() {
+return runtimeFilterBDef;
+  }
+
+  public void setRuntimeFilterBDef(BitData.RuntimeFilterBDef 
runtimeFilterBDef) {
+this.runtimeFilterBDef = runtimeFilterBDef;
+  }
+
+  public DrillBuf[] getData() {
+return data;
+  }
+
+  public void setData(DrillBuf... data) {
+this.data = data;
+  }
+
+
+  public List unwrap() {
+List sizeInBytes = 
runtimeFilterBDef.getBloomFilterSizeInBytesList();
+List bloomFilters = new ArrayList<>(sizeInBytes.size());
+for (int i = 0; i < sizeInBytes.size(); i++) {
+  DrillBuf byteBuf = data[i];
+  int offset = 0;
+  int size = sizeInBytes.get(i);
+  DrillBuf bloomFilterContent = byteBuf.slice(offset, size);
+  BloomFilter bloomFilter = new BloomFilter(bloomFilterContent);
+  bloomFilters.add(bloomFilter);
+}
+return bloomFilters;
+  }
+
+  public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
+List thisFilters = this.unwrap();
+List otherFilters = runtimeFilterWritable.unwrap();
+for (int i = 0; i < thisFilters.size(); i++) {
+  BloomFilter thisOne = thisFilters.get(0);
+  BloomFilter otherOne = otherFilters.get(0);
 
 Review comment:
   Same as above..


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-01 Thread GitBox
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r199336779
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java
 ##
 @@ -0,0 +1,735 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
+import org.apache.drill.exec.ops.Consumer;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.ops.SendingAccountor;
+import org.apache.drill.exec.ops.StatusHandler;
+import org.apache.drill.exec.physical.PhysicalPlan;
+
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.Exchange;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Store;
+import org.apache.drill.exec.physical.config.BroadcastExchange;
+import org.apache.drill.exec.physical.config.HashAggregate;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.physical.config.StreamingAggregate;
+import org.apache.drill.exec.planner.fragment.Fragment;
+import org.apache.drill.exec.planner.fragment.Wrapper;
+import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.parquet.ParquetRGFilterEvaluator;
+import org.apache.drill.exec.util.Pointer;
+import org.apache.drill.exec.work.QueryWorkUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class traverses the physical operator tree to find the HashJoin 
operator
+ * which is JPDD (join predicate push down) possible. The prerequisite to do 
JPDD
+ * is:
+ * 1. The join condition is equality
+ * 2. The physical join node is a HashJoin one
+ * 3. The probe side children of the HashJoin node should not contain a 
blocked operator like HashAgg
+ */
+public class RuntimeFilterManager {
+
+  private Wrapper rootWrapper;
+
+  private Map> 
joinMjId2probdeScanEps = new HashMap<>();
+
+  private Map joinMjId2scanSize = new ConcurrentHashMap<>();
+
+
+  private Map joinMjId2ScanMjId = new HashMap<>();
+
+  private RuntimeFilterWritable aggregatedRuntimeFilter;
+
+  private DrillbitContext drillbitContext;
+
+  private QueryContext queryContext;
+
+  private SendingAccountor sendingAccountor = new SendingAccountor();
+
+  private String lineSeparator;
+
+
+
+  private static final Logger logger = 
LoggerFactory.getLogger(RuntimeFilterManager.class);
+
+  /**
+   * Here we leverage the root Wrapper to do the traverse which indirectly
 
 Review comment:
   'traverse' => 'traversal'.  Also, the comment should be re-phrased to 
something like: 'This class maintains context for the run-time join pushdown's 
filter management.  It does a traversal of the physical operators by leveraging 
the root wrapper which indirectly holds the global PhysicalOperator tree and 
contains the minor fragment endpoints. '


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 

[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-01 Thread GitBox
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r199337353
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 ##
 @@ -491,6 +508,50 @@ private void setupHashTable() throws 
SchemaChangeException {
 // Create the chained hash table
 baseHashTable =
   new ChainedHashTable(htConfig, context, allocator, buildBatch, 
probeBatch, null);
+if (enableRuntimeFilter) {
+  setupHash64(htConfig);
+}
+  }
+
+  private void setupHash64(HashTableConfig htConfig) throws 
SchemaChangeException {
+LogicalExpression[] keyExprsBuild = new 
LogicalExpression[htConfig.getKeyExprsBuild().size()];
+ErrorCollector collector = new ErrorCollectorImpl();
+int i = 0;
+for (NamedExpression ne : htConfig.getKeyExprsBuild()) {
+  final LogicalExpression expr = 
ExpressionTreeMaterializer.materialize(ne.getExpr(), buildBatch, collector, 
context.getFunctionRegistry());
+  if (collector.hasErrors()) {
+throw new SchemaChangeException("Failure while materializing 
expression. " + collector.toErrorString());
+  }
+  if (expr == null) {
+continue;
+  }
+  keyExprsBuild[i] = expr;
+  i++;
+}
+i = 0;
+boolean meetNotExistField = false;
+TypedFieldId[] buildSideTypeFieldIds = new 
TypedFieldId[keyExprsBuild.length];
+for (NamedExpression ne : htConfig.getKeyExprsBuild()) {
+  SchemaPath schemaPath = (SchemaPath) ne.getExpr();
+  TypedFieldId typedFieldId = buildBatch.getValueVectorId(schemaPath);
+  if (typedFieldId == null) {
+meetNotExistField = true;
+continue;
 
 Review comment:
   why not break from the loop if you encountered this condition since you are 
going to disable runtime filter and return if even 1 field was missing. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-01 Thread GitBox
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r199329850
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/ValueVectorHashHelper.java
 ##
 @@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.expr.fn.impl;
+
+import com.sun.codemodel.JBlock;
+import com.sun.codemodel.JExpr;
+import com.sun.codemodel.JFieldRef;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.compile.sig.GeneratorMapping;
+import org.apache.drill.exec.compile.sig.MappingSet;
+import org.apache.drill.exec.compile.sig.RuntimeOverridden;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.planner.physical.HashPrelUtil;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorAccessible;
+
+import javax.inject.Named;
+import java.io.IOException;
+
+public class ValueVectorHashHelper {
+
+  private RecordBatch recordBatch;
+
+  private FragmentContext context;
+
+  private TemplateClassDefinition TEMPLATE_DEFINITION = new 
TemplateClassDefinition(Hash64.class, Hash64Template.class);
+
+  private static final GeneratorMapping DO_SETUP_CONSTANT = 
GeneratorMapping.create("doSetup" /* setup method */, "doSetup" /* eval method 
*/, null /* reset */, null /* cleanup */);
+
+  private static final GeneratorMapping GET_HASH_BUILD_INNERE = 
GeneratorMapping.create("doSetup" /* setup method */, "hash64Code" /* eval 
method */, null /* reset */, null /* cleanup */);
 
 Review comment:
   'INNER' instead of INNERE


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-01 Thread GitBox
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r199336591
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 ##
 @@ -491,6 +508,50 @@ private void setupHashTable() throws 
SchemaChangeException {
 // Create the chained hash table
 baseHashTable =
   new ChainedHashTable(htConfig, context, allocator, buildBatch, 
probeBatch, null);
+if (enableRuntimeFilter) {
+  setupHash64(htConfig);
+}
+  }
+
+  private void setupHash64(HashTableConfig htConfig) throws 
SchemaChangeException {
+LogicalExpression[] keyExprsBuild = new 
LogicalExpression[htConfig.getKeyExprsBuild().size()];
+ErrorCollector collector = new ErrorCollectorImpl();
+int i = 0;
+for (NamedExpression ne : htConfig.getKeyExprsBuild()) {
+  final LogicalExpression expr = 
ExpressionTreeMaterializer.materialize(ne.getExpr(), buildBatch, collector, 
context.getFunctionRegistry());
+  if (collector.hasErrors()) {
+throw new SchemaChangeException("Failure while materializing 
expression. " + collector.toErrorString());
+  }
+  if (expr == null) {
+continue;
+  }
+  keyExprsBuild[i] = expr;
+  i++;
+}
+i = 0;
+boolean meetNotExistField = false;
+TypedFieldId[] buildSideTypeFieldIds = new 
TypedFieldId[keyExprsBuild.length];
+for (NamedExpression ne : htConfig.getKeyExprsBuild()) {
+  SchemaPath schemaPath = (SchemaPath) ne.getExpr();
+  TypedFieldId typedFieldId = buildBatch.getValueVectorId(schemaPath);
+  if (typedFieldId == null) {
+meetNotExistField = true;
+continue;
+  }
+  buildSideTypeFieldIds[i] = typedFieldId;
+  i++;
+}
+if (meetNotExistField) {
+  logger.info("as some build side key fileds not found, runtime filter was 
disabled");
+  enableRuntimeFilter = false;
+  return;
+}
+ValueVectorHashHelper hashHelper = new ValueVectorHashHelper(buildBatch, 
context);
+try {
+  hash64 = hashHelper.getHash64(keyExprsBuild, buildSideTypeFieldIds);
+} catch (Exception e) {
+  throw new SchemaChangeException("fail to construct a field's hash64 
dynamic codes", e);
 
 Review comment:
   'fail' => 'failed'


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-01 Thread GitBox
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r199354240
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
 ##
 @@ -226,6 +244,96 @@ public IterOutcome next() {
 }
   }
 
+  private void applyRuntimeFilter() throws SchemaChangeException {
+RuntimeFilterWritable runtimeFilterWritable = context.getRuntimeFilter();
+if (runtimeFilterWritable == null) {
+  return;
+}
+if (recordCount <= 0) {
+  return;
+}
+List bloomFilters = runtimeFilterWritable.unwrap();
+if (hash64 == null) {
+  ValueVectorHashHelper hashHelper = new ValueVectorHashHelper(this, 
context);
+  try {
+//generate hash helper
+this.toFilterFields = 
runtimeFilterWritable.getRuntimeFilterBDef().getProbeFieldsList();
+List hashFieldExps = new ArrayList<>();
+List typedFieldIds = new ArrayList<>();
+for (String toFilterField : toFilterFields) {
+  SchemaPath schemaPath = new SchemaPath(new 
PathSegment.NameSegment(toFilterField), ExpressionPosition.UNKNOWN);
+  TypedFieldId typedFieldId = container.getValueVectorId(schemaPath);
+  this.field2id.put(toFilterField, typedFieldId.getFieldIds()[0]);
+  typedFieldIds.add(typedFieldId);
+  ValueVectorReadExpression toHashFieldExp = new 
ValueVectorReadExpression(typedFieldId);
+  hashFieldExps.add(toHashFieldExp);
+}
+hash64 = hashHelper.getHash64(hashFieldExps.toArray(new 
LogicalExpression[hashFieldExps.size()]), typedFieldIds.toArray(new 
TypedFieldId[typedFieldIds.size()]));
+  } catch (Exception e) {
+throw UserException.internalError(e).build(logger);
+  }
+}
+selectionVector2.allocateNew(recordCount);
+BitSet bitSet = new BitSet(recordCount);
+for (int i = 0; i < toFilterFields.size(); i++) {
+  BloomFilter bloomFilter = bloomFilters.get(i);
+  String fieldName = toFilterFields.get(i);
+  computeBitSet(field2id.get(fieldName), bloomFilter, bitSet);
+}
+int svIndex = 0;
+int tmpFilterRows = 0;
+for (int i = 0; i < recordCount; i++) {
+  boolean contain = bitSet.get(i);
+  if (contain) {
+selectionVector2.setIndex(svIndex, i);
+svIndex++;
+  } else {
+tmpFilterRows++;
+  }
+}
+selectionVector2.setRecordCount(svIndex);
+if (tmpFilterRows > 0 && tmpFilterRows == recordCount) {
+  recordCount = 0;
+  selectionVector2.clear();
+  logger.debug("filter {} rows by the RuntimeFilter", tmpFilterRows);
+  return;
+}
+if (tmpFilterRows > 0 && tmpFilterRows != recordCount ) {
+  totalFilterRows = totalFilterRows + tmpFilterRows;
+  recordCount = svIndex;
+  BatchSchema batchSchema = this.schema;
+  VectorContainer backUpContainer = new 
VectorContainer(this.oContext.getAllocator(), batchSchema);
+  int fieldCount = batchSchema.getFieldCount();
+  for (int i = 0; i < fieldCount; i++) {
+ValueVector from = 
this.getContainer().getValueVector(i).getValueVector();
+ValueVector to = backUpContainer.getValueVector(i).getValueVector();
+to.setInitialCapacity(svIndex);
+for (int r = 0; r < svIndex; r++) {
+  to.copyEntry(r, from, selectionVector2.getIndex(r));
 
 Review comment:
   To summarize the way you have implemented this part:  suppose the original 
ScanBatch contained 100 rows and 10 of them qualified the bloom filter, you 
create an SV2 of size 10, set the original qualifying row's index in the SV2, 
then for each ValueVector you copy the qualifying row's data into the 
backupContainer, followed by exchanging it with the current output container.   
A couple of thoughts about this: 
 - Why not produce the SV2 in the output batch and let the downstream 
operator handle it ? Quite often there may be a Filter operator above the Scan  
which would be applying other filters (i.e not the run-time filters) and it can 
combine its own SV2 with the SV2 produced by the Scan.  That way you avoid the 
extra copying and let Filter handle it (note that Filter does code-gen so it is 
more efficient way to handle bulk filtering). 
- There is clearly trade-offs with the extra copy approach : it depends on 
selectivity, i.e how many rows actually get eliminated by the run-time filter.  
I suppose you have mentioned this as a TODO depending on the NDV statistics ? 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-01 Thread GitBox
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r199336934
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java
 ##
 @@ -0,0 +1,735 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
+import org.apache.drill.exec.ops.Consumer;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.ops.SendingAccountor;
+import org.apache.drill.exec.ops.StatusHandler;
+import org.apache.drill.exec.physical.PhysicalPlan;
+
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.Exchange;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Store;
+import org.apache.drill.exec.physical.config.BroadcastExchange;
+import org.apache.drill.exec.physical.config.HashAggregate;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.physical.config.StreamingAggregate;
+import org.apache.drill.exec.planner.fragment.Fragment;
+import org.apache.drill.exec.planner.fragment.Wrapper;
+import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.parquet.ParquetRGFilterEvaluator;
+import org.apache.drill.exec.util.Pointer;
+import org.apache.drill.exec.work.QueryWorkUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class traverses the physical operator tree to find the HashJoin 
operator
+ * which is JPDD (join predicate push down) possible. The prerequisite to do 
JPDD
+ * is:
+ * 1. The join condition is equality
+ * 2. The physical join node is a HashJoin one
+ * 3. The probe side children of the HashJoin node should not contain a 
blocked operator like HashAgg
+ */
+public class RuntimeFilterManager {
+
+  private Wrapper rootWrapper;
+
+  private Map> 
joinMjId2probdeScanEps = new HashMap<>();
+
+  private Map joinMjId2scanSize = new ConcurrentHashMap<>();
+
+
+  private Map joinMjId2ScanMjId = new HashMap<>();
+
+  private RuntimeFilterWritable aggregatedRuntimeFilter;
+
+  private DrillbitContext drillbitContext;
+
+  private QueryContext queryContext;
+
+  private SendingAccountor sendingAccountor = new SendingAccountor();
+
+  private String lineSeparator;
+
+
+
+  private static final Logger logger = 
LoggerFactory.getLogger(RuntimeFilterManager.class);
+
+  /**
+   * Here we leverage the root Wrapper to do the traverse which indirectly
+   * holds the whole global PhysicalOperator tree but also contains the 
endpoints
+   * of all the MinorFragments.
+   *
+   * @param workUnit
+   * @param queryContext
+   */
+  public RuntimeFilterManager(QueryWorkUnit workUnit, QueryContext 
queryContext, DrillbitContext drillbitContext) {
+this.rootWrapper = workUnit.getRootWrapper();
+this.queryContext = queryContext;
+this.drillbitContext = drillbitContext;
+lineSeparator = java.security.AccessController.doPrivileged(new 
sun.security.action.GetPropertyAction("line.separator"));
+  }
+
+  /**
+   * Apply runtime 

[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-01 Thread GitBox
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r199354604
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 ##
 @@ -696,6 +780,18 @@ public void executeBuildPhase() throws 
SchemaChangeException {
 if ( cycleNum > 0 ) {
   read_right_HV_vector = (IntVector) 
buildBatch.getContainer().getLast();
 }
+//create runtime filter
+if (cycleNum == 0 && enableRuntimeFilter) {
+  //create runtime filter and send out async
+  int condFieldIndex = 0;
+  for (BloomFilter bloomFilter : bloomFilters) {
+for (int ind = 0; ind < currentRecordCount; ind++) {
+  long hashCode = hash64.hash64Code(ind, 0, condFieldIndex);
+  bloomFilter.insert(hashCode);
 
 Review comment:
   Currently, the hash join relies on memory calculations (both build and probe 
sides) to do accounting for spilling purposes.  The bloom filter memory use 
should also be included in that calculation, although it would fine if you 
create an enhancement JIRA and address it separately. 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-01 Thread GitBox
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r199336769
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java
 ##
 @@ -0,0 +1,735 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
+import org.apache.drill.exec.ops.Consumer;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.ops.SendingAccountor;
+import org.apache.drill.exec.ops.StatusHandler;
+import org.apache.drill.exec.physical.PhysicalPlan;
+
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.Exchange;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Store;
+import org.apache.drill.exec.physical.config.BroadcastExchange;
+import org.apache.drill.exec.physical.config.HashAggregate;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.physical.config.StreamingAggregate;
+import org.apache.drill.exec.planner.fragment.Fragment;
+import org.apache.drill.exec.planner.fragment.Wrapper;
+import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.parquet.ParquetRGFilterEvaluator;
+import org.apache.drill.exec.util.Pointer;
+import org.apache.drill.exec.work.QueryWorkUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class traverses the physical operator tree to find the HashJoin 
operator
+ * which is JPDD (join predicate push down) possible. The prerequisite to do 
JPDD
+ * is:
+ * 1. The join condition is equality
+ * 2. The physical join node is a HashJoin one
+ * 3. The probe side children of the HashJoin node should not contain a 
blocked operator like HashAgg
+ */
+public class RuntimeFilterManager {
+
+  private Wrapper rootWrapper;
+
+  private Map> 
joinMjId2probdeScanEps = new HashMap<>();
 
 Review comment:
   Pls add a brief comment about what this data structure is intended for.  
Also for the Maps below.  


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-01 Thread GitBox
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r199334413
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashJoinPOP.java
 ##
 @@ -86,8 +105,17 @@ public void setMaxAllocation(long maxAllocation) {
*/
   @Override
   public boolean isBufferedOperator(QueryContext queryContext) {
-// In case forced to use a single partition - do not consider this a 
buffered op (when memory is divided)
-return queryContext == null ||
-  1 < 
(int)queryContext.getOptions().getOption(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR)
 ;
+  // In case forced to use a single partition - do not consider this a 
buffered op (when memory is divided)
+  return queryContext==null||
 
 Review comment:
   pls keep the original formatting (indentation and spaces etc.). 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-01 Thread GitBox
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r199336697
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java
 ##
 @@ -0,0 +1,735 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
+import org.apache.drill.exec.ops.Consumer;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.ops.SendingAccountor;
+import org.apache.drill.exec.ops.StatusHandler;
+import org.apache.drill.exec.physical.PhysicalPlan;
+
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.Exchange;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.Store;
+import org.apache.drill.exec.physical.config.BroadcastExchange;
+import org.apache.drill.exec.physical.config.HashAggregate;
+import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.physical.config.StreamingAggregate;
+import org.apache.drill.exec.planner.fragment.Fragment;
+import org.apache.drill.exec.planner.fragment.Wrapper;
+import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.parquet.ParquetRGFilterEvaluator;
+import org.apache.drill.exec.util.Pointer;
+import org.apache.drill.exec.work.QueryWorkUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This class traverses the physical operator tree to find the HashJoin 
operator
+ * which is JPDD (join predicate push down) possible. The prerequisite to do 
JPDD
+ * is:
+ * 1. The join condition is equality
+ * 2. The physical join node is a HashJoin one
+ * 3. The probe side children of the HashJoin node should not contain a 
blocked operator like HashAgg
 
 Review comment:
   'blocked operator' => 'blocking operator'


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] amansinha100 commented on a change in pull request #1334: DRILL-6385: Support JPPD feature

2018-07-01 Thread GitBox
amansinha100 commented on a change in pull request #1334: DRILL-6385: Support 
JPPD feature
URL: https://github.com/apache/drill/pull/1334#discussion_r199337337
 
 

 ##
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 ##
 @@ -491,6 +508,50 @@ private void setupHashTable() throws 
SchemaChangeException {
 // Create the chained hash table
 baseHashTable =
   new ChainedHashTable(htConfig, context, allocator, buildBatch, 
probeBatch, null);
+if (enableRuntimeFilter) {
+  setupHash64(htConfig);
+}
+  }
+
+  private void setupHash64(HashTableConfig htConfig) throws 
SchemaChangeException {
+LogicalExpression[] keyExprsBuild = new 
LogicalExpression[htConfig.getKeyExprsBuild().size()];
+ErrorCollector collector = new ErrorCollectorImpl();
+int i = 0;
+for (NamedExpression ne : htConfig.getKeyExprsBuild()) {
+  final LogicalExpression expr = 
ExpressionTreeMaterializer.materialize(ne.getExpr(), buildBatch, collector, 
context.getFunctionRegistry());
+  if (collector.hasErrors()) {
+throw new SchemaChangeException("Failure while materializing 
expression. " + collector.toErrorString());
+  }
+  if (expr == null) {
+continue;
+  }
+  keyExprsBuild[i] = expr;
+  i++;
+}
+i = 0;
+boolean meetNotExistField = false;
 
 Review comment:
   Better name would be  'missingField' .


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] ppadma closed pull request #1342: DRILL-6537:Limit the batch size for buffering operators based on how …

2018-07-01 Thread GitBox
ppadma closed pull request #1342: DRILL-6537:Limit the batch size for buffering 
operators based on how …
URL: https://github.com/apache/drill/pull/1342
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):



 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] sohami opened a new pull request #1356: DRILL-6561: Lateral excluding the columns from output container provided by projection push into rules

2018-07-01 Thread GitBox
sohami opened a new pull request #1356: DRILL-6561: Lateral excluding the 
columns from output container provided by projection push into rules
URL: https://github.com/apache/drill/pull/1356
 
 
   This PR is dependent upon DRILL-6545 and has 2 commits from it for 
compilation. Once that is merged in this PR has to be rebased on master and 
then checked-in.
   
   @parthchandra - Please review it.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] vvysotskyi commented on issue #1342: DRILL-6537:Limit the batch size for buffering operators based on how …

2018-07-01 Thread GitBox
vvysotskyi commented on issue #1342: DRILL-6537:Limit the batch size for 
buffering operators based on how …
URL: https://github.com/apache/drill/pull/1342#issuecomment-401614243
 
 
   @ppadma, could you please resolve merge conflicts?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] arina-ielchiieva commented on a change in pull request #1331: DRILL-6519: Add String Distance and Phonetic Functions

2018-07-01 Thread GitBox
arina-ielchiieva commented on a change in pull request #1331: DRILL-6519: Add 
String Distance and Phonetic Functions
URL: https://github.com/apache/drill/pull/1331#discussion_r199343698
 
 

 ##
 File path: 
exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestPhoneticFunctions.java
 ##
 @@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.fn.impl;
+
+import org.apache.drill.categories.SqlFunctionTest;
+import org.apache.drill.categories.UnlikelyTest;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryResultSet;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+
+@Category({UnlikelyTest.class, SqlFunctionTest.class})
+public class TestPhoneticFunctions extends ClusterTest {
+
+  private QueryResultSet result;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
+startCluster(builder);
+  }
+
+  @Test
+  public void testSoundex() throws Exception {
+String result = queryBuilder()
+.sql("select soundex('jaime') as soundex from (values(1))")
+.singletonString();
+assertEquals("J500", result);
+  }
+
+  @Test
+  public void testCaverphone1() throws Exception {
+String result = queryBuilder()
+.sql("SELECT caverphone1('jaime') as caverphone FROM (VALUES(1))")
+.singletonString();
+assertEquals("YM", result);
+  }
+
+  @Test
+  public void testCaverphone2() throws Exception {
+String result = queryBuilder()
+.sql("SELECT caverphone2('steve') as caverphone FROM (VALUES(1))")
+.singletonString();
+assertEquals("STF111", result);
+  }
+
+  @Test
+  public void testCologne() throws Exception {
+String result = queryBuilder()
+.sql("SELECT cologne_phonetic('steve') AS CP FROM (VALUES(1))")
+.singletonString();
+assertEquals("823", result);
+  }
+
+  @Test
+  public void testMatchRatingEncoder() throws Exception {
+String result = queryBuilder()
+.sql("SELECT match_rating_encoder('Boston') AS MR FROM (VALUES(1))")
+.singletonString();
+assertEquals("BSTN", result);
+  }
+
+  @Test
+  public void testNYSIIS() throws Exception {
+String result = queryBuilder()
+.sql("SELECT nysiis('Boston') AS ny FROM (VALUES(1))")
+.singletonString();
+assertEquals("BASTAN", result);
+  }
+
+  @Test
+  public void testRefinedSoundex() throws Exception {
+String result = queryBuilder()
+.sql("SELECT refined_soundex('Boston') AS rs FROM (VALUES(1))")
+.singletonString();
+assertEquals("B103608", result);
+  }
+
+  @Test
+  public void testSoundsLike() throws Exception {
+ int result = queryBuilder()
 
 Review comment:
   Sure, please squash.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services