[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-06-22 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r443954743



##
File path: contrib/storage-druid/README.md
##
@@ -28,4 +31,28 @@ Following is the default registration configuration.
 
 ### Developer
 
-Building - `mvn install -pl contrib/storage-druid`
+* Building the plugin 
+
+`mvn install -pl contrib/storage-druid`
+
+* Building DRILL

Review comment:
   I updated the README to reflect that it is part of Drill.
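For context, the plugin is also built as part of the full Drill build; a typical full build (the exact flags are an assumption here and may differ) is `mvn clean install -DskipTests` run from the Drill source root, after which the plugin jar ends up under `contrib/storage-druid/target`.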





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-06-22 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r443497042



##
File path: contrib/storage-druid/README.md
##
@@ -28,4 +31,28 @@ Following is the default registration configuration.
 
 ### Developer
 
-Building - `mvn install -pl contrib/storage-druid`
+* Building the plugin 
+
+`mvn install -pl contrib/storage-druid`
+
+* Building DRILL

Review comment:
   I did this to keep notes for myself :) I can remove them.









[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-06-22 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r443496846



##
File path: 
contrib/storage-druid/src/test/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfigTest.java
##
@@ -31,7 +32,7 @@
 public class DruidStoragePluginConfigTest {
 
   @Test
-  public void testDruidStoragePluginConfigSuccessfullyParsed()
+  public void test_druid_storage_plugin_config_successfully_parsed()

Review comment:
   Fixed.









[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-06-22 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r443496786



##
File path: 
contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidGroupScan.java
##
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import org.apache.drill.common.PlanStringBuilder;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@JsonTypeName("druid-scan")
+public class DruidGroupScan extends AbstractGroupScan {
+
+  private static final Logger logger = LoggerFactory.getLogger(DruidGroupScan.class);
+  private static final long DEFAULT_TABLET_SIZE = 1000;
+  private final DruidScanSpec scanSpec;
+  private final DruidStoragePlugin storagePlugin;
+
+  private List<SchemaPath> columns;
+  private boolean filterPushedDown = false;
+  private List<DruidWork> druidWorkList = new ArrayList<>();
+  private ListMultimap<Integer, DruidWork> assignments;
+  private List<EndpointAffinity> affinities;
+
+  @JsonCreator
+  public DruidGroupScan(@JsonProperty("userName") String userName,
+@JsonProperty("scanSpec") DruidScanSpec scanSpec,
+@JsonProperty("storagePluginConfig") 
DruidStoragePluginConfig storagePluginConfig,
+@JsonProperty("columns") List columns,
+@JacksonInject StoragePluginRegistry pluginRegistry) {
+this(userName,
+pluginRegistry.resolve(storagePluginConfig, DruidStoragePlugin.class),
+scanSpec,
+columns);
+  }
+
+  public DruidGroupScan(String userName, DruidStoragePlugin storagePlugin, DruidScanSpec scanSpec,
+List<SchemaPath> columns) {
+super(userName);
+this.storagePlugin = storagePlugin;
+this.scanSpec = scanSpec;
+this.columns = columns == null || columns.size() == 0 ? ALL_COLUMNS : columns;
+init();
+  }
+
+  /**
+   * Private constructor, used for cloning.
+   * @param that The DruidGroupScan to clone
+   */
+  private DruidGroupScan(DruidGroupScan that) {
+super(that);
+this.columns = that.columns;
+this.scanSpec = that.scanSpec;
+this.storagePlugin = that.storagePlugin;
+this.filterPushedDown = that.filterPushedDown;
+this.druidWorkList = that.druidWorkList;
+this.assignments = that.assignments;
+  }
+
+  @Override
+  public GroupScan clone(List<SchemaPath> columns) {
+DruidGroupScan newScan = new DruidGroupScan(this);
+newScan.columns = columns;
+return newScan;
+  }
+
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+if (affinities == null) {
+  affinities = AffinityCreator.getAffinityMap(druidWorkList);
+}
+return affinities;
+  }
+
+  @Override
+  public boolean canPushdownProjects(List<SchemaPath> columns) {
+return true;
+  }
+
+  @JsonIgnore
+  public boolean isFilterPushedDown() {
+return filterPushedDown;
+  }
+
+  @JsonIgnore
+  public void 

[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-06-22 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r443496278



##
File path: 
contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidRecordReader.java
##
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.druid.common.DruidFilter;
+import org.apache.drill.exec.store.druid.druid.DruidSelectResponse;
+import org.apache.drill.exec.store.druid.druid.PagingIdentifier;
+import org.apache.drill.exec.store.druid.druid.PagingSpec;
+import org.apache.drill.exec.store.druid.druid.SelectQuery;
+import org.apache.drill.exec.store.druid.druid.SelectQueryBuilder;
+import org.apache.drill.exec.store.druid.rest.DruidQueryClient;
+import org.apache.drill.exec.vector.BaseValueVector;
+import org.apache.drill.exec.vector.complex.fn.JsonReader;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class DruidRecordReader extends AbstractRecordReader {
+
+  private static final Logger logger = LoggerFactory.getLogger(DruidRecordReader.class);
+  private static final ObjectMapper objectMapper = new ObjectMapper();
+  private final DruidStoragePlugin plugin;
+  private final DruidSubScan.DruidSubScanSpec scanSpec;
+  private final List<String> dimensions;
+  private final DruidFilter filter;
+  private ArrayList<PagingIdentifier> pagingIdentifiers = new ArrayList<>();
+  private int maxRecordsToRead = -1;
+
+  private JsonReader jsonReader;
+  private VectorContainerWriter writer;
+
+  private final FragmentContext fragmentContext;
+
+  public DruidRecordReader(DruidSubScan.DruidSubScanSpec subScanSpec,
+   List<SchemaPath> projectedColumns,
+   int maxRecordsToRead,
+   FragmentContext context,
+   DruidStoragePlugin plugin) {
+dimensions = new ArrayList<>();
+setColumns(projectedColumns);
+this.maxRecordsToRead = maxRecordsToRead;
+this.plugin = plugin;
+scanSpec = subScanSpec;
+fragmentContext = context;
+this.filter = subScanSpec.getFilter();
+  }
+
+  @Override
+  protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> projectedColumns) {
+Set<SchemaPath> transformed = Sets.newLinkedHashSet();
+if (isStarQuery()) {
+  transformed.add(SchemaPath.STAR_COLUMN);
+} else {
+  for (SchemaPath column : projectedColumns) {
+String fieldName = column.getRootSegment().getPath();
+transformed.add(column);
+this.dimensions.add(fieldName);
+  }
+}
+return transformed;
+  }
+
+  @Override
+  public void setup(OperatorContext context, OutputMutator output) {
+this.writer = new VectorContainerWriter(output);
+
+this.jsonReader =
+  new JsonReader.Builder(fragmentContext.getManagedBuffer())
+.schemaPathColumns(ImmutableList.copyOf(getColumns()))
+.skipOuterList(true)
+.build();
+  }
+
+  @Override
+  public int next() {
+writer.allocate();
+writer.reset();
+DruidQueryClient druidQueryClient = plugin.getDruidQueryClient();
+Stopwatch watch = Stopwatch.createStarted();
+try {
+  

[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-06-22 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r443496091



##
File path: 
contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidRecordReader.java
##
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.druid.common.DruidFilter;
+import org.apache.drill.exec.store.druid.druid.DruidSelectResponse;
+import org.apache.drill.exec.store.druid.druid.PagingIdentifier;
+import org.apache.drill.exec.store.druid.druid.PagingSpec;
+import org.apache.drill.exec.store.druid.druid.SelectQuery;
+import org.apache.drill.exec.store.druid.druid.SelectQueryBuilder;
+import org.apache.drill.exec.store.druid.rest.DruidQueryClient;
+import org.apache.drill.exec.vector.BaseValueVector;
+import org.apache.drill.exec.vector.complex.fn.JsonReader;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class DruidRecordReader extends AbstractRecordReader {
+
+  private static final Logger logger = LoggerFactory.getLogger(DruidRecordReader.class);
+  private static final ObjectMapper objectMapper = new ObjectMapper();
+  private final DruidStoragePlugin plugin;
+  private final DruidSubScan.DruidSubScanSpec scanSpec;
+  private final List<String> dimensions;
+  private final DruidFilter filter;
+  private ArrayList<PagingIdentifier> pagingIdentifiers = new ArrayList<>();
+  private int maxRecordsToRead = -1;
+
+  private JsonReader jsonReader;
+  private VectorContainerWriter writer;
+
+  private final FragmentContext fragmentContext;
+
+  public DruidRecordReader(DruidSubScan.DruidSubScanSpec subScanSpec,
+   List<SchemaPath> projectedColumns,
+   int maxRecordsToRead,
+   FragmentContext context,
+   DruidStoragePlugin plugin) {
+dimensions = new ArrayList<>();
+setColumns(projectedColumns);
+this.maxRecordsToRead = maxRecordsToRead;
+this.plugin = plugin;
+scanSpec = subScanSpec;
+fragmentContext = context;
+this.filter = subScanSpec.getFilter();
+  }
+
+  @Override
+  protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> projectedColumns) {
+Set<SchemaPath> transformed = Sets.newLinkedHashSet();
+if (isStarQuery()) {
+  transformed.add(SchemaPath.STAR_COLUMN);
+} else {
+  for (SchemaPath column : projectedColumns) {
+String fieldName = column.getRootSegment().getPath();
+transformed.add(column);
+this.dimensions.add(fieldName);
+  }
+}
+return transformed;
+  }
+
+  @Override
+  public void setup(OperatorContext context, OutputMutator output) {
+this.writer = new VectorContainerWriter(output);
+
+this.jsonReader =
+  new JsonReader.Builder(fragmentContext.getManagedBuffer())
+.schemaPathColumns(ImmutableList.copyOf(getColumns()))
+.skipOuterList(true)
+.build();
+  }
+
+  @Override
+  public int next() {
+writer.allocate();
+writer.reset();
+DruidQueryClient druidQueryClient = plugin.getDruidQueryClient();

Review comment:
   Fixed.





[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-06-21 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r443268109



##
File path: 
contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/RestClientWrapper.java
##
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.rest;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.DefaultHttpClient;
+
+import javax.ws.rs.core.HttpHeaders;
+import java.io.IOException;
+
+import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
+import static org.apache.http.protocol.HTTP.CONTENT_TYPE;
+
+public class RestClientWrapper implements RestClient {
+  private static final HttpClient httpClient = new DefaultHttpClient();

Review comment:
   We can do this in another PR. Moving to okhttp3 would be good.
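For reference, a minimal sketch of what an okhttp3-backed wrapper could look like in that follow-up PR. The class name and the get/post shape below are assumptions for illustration, not code from this PR:

```java
// A rough sketch, not the PR's code: an okhttp3-backed client with an assumed
// get/post shape similar to how the existing RestClient is used in this plugin.
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;

import java.io.IOException;

public class OkHttpRestClient {
  private static final MediaType JSON = MediaType.parse("application/json; charset=utf-8");
  private final OkHttpClient client = new OkHttpClient();

  // Issues a GET and returns the response body as a string.
  public String get(String url) throws IOException {
    Request request = new Request.Builder().url(url).get().build();
    try (Response response = client.newCall(request).execute()) {
      return response.body() == null ? null : response.body().string();
    }
  }

  // Posts a JSON body (e.g. a Druid select query) and returns the response body as a string.
  public String post(String url, String json) throws IOException {
    RequestBody body = RequestBody.create(JSON, json);
    Request request = new Request.Builder().url(url).post(body).build();
    try (Response response = client.newCall(request).execute()) {
      return response.body() == null ? null : response.body().string();
    }
  }
}
```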









[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-06-21 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r443268025



##
File path: 
contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanSpecBuilder.java
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.store.druid.common.DruidBoundFilter;
+import org.apache.drill.exec.store.druid.common.DruidIntervalFilter;
+import org.apache.drill.exec.store.druid.common.DruidNotFilter;
+import org.apache.drill.exec.store.druid.common.DruidRegexFilter;
+import org.apache.drill.exec.store.druid.common.DruidSearchFilter;
+import org.apache.drill.exec.store.druid.common.DruidSelectorFilter;
+import org.apache.drill.exec.store.druid.druid.SelectQuery;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+class DruidScanSpecBuilder {
+
+  private static final Logger logger = LoggerFactory.getLogger(DruidScanSpecBuilder.class);
+
+  private static final String REGEX_KEYWORD_HINT = "$regex$_";
+
+  DruidScanSpec build(String dataSourceName,
+  String functionName,
+  SchemaPath field,
+  Object fieldValue) throws IOException {
+// extract the field name
+
+String fieldName = field.getAsNamePart().getName(); //.getAsUnescapedPath();
+String filter;
+
+logger.debug("createDruidScanSpec called. FunctionName - "
+  + functionName + ", field - " + fieldName + ", fieldValue - " + 
fieldValue);
+
+switch (functionName) {

Review comment:
   Fixed by adding a dedicated function for translating a Drill filter function into the corresponding Druid filter.
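For illustration only, one possible shape for such a dedicated translation step is sketched below; the class and method names are assumptions, only two Drill function names are shown, and the PR's actual fix may simply extract the switch above into a method instead:

```java
// A rough sketch (assumed names) of a dedicated translation step from a Drill
// comparison function name to a Druid filter JSON fragment.
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

public class DruidFilterTranslator {

  // Each entry turns (fieldName, fieldValue) into a Druid filter JSON fragment.
  private static final Map<String, BiFunction<String, String, String>> TRANSLATORS = new HashMap<>();

  static {
    TRANSLATORS.put("equal", (field, value) ->
        "{\"type\":\"selector\",\"dimension\":\"" + field + "\",\"value\":\"" + value + "\"}");
    TRANSLATORS.put("greater_than", (field, value) ->
        "{\"type\":\"bound\",\"dimension\":\"" + field + "\",\"lower\":\"" + value + "\",\"lowerStrict\":true}");
    // The remaining Drill function names would be registered the same way.
  }

  public static String translate(String functionName, String fieldName, String fieldValue) {
    BiFunction<String, String, String> translator = TRANSLATORS.get(functionName);
    if (translator == null) {
      throw new UnsupportedOperationException("No Druid filter mapping for " + functionName);
    }
    return translator.apply(fieldName, fieldValue);
  }
}
```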









[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-06-21 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r443267977



##
File path: 
contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidRecordReader.java
##
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.druid.druid.DruidSelectResponse;
+import org.apache.drill.exec.store.druid.druid.PagingIdentifier;
+import org.apache.drill.exec.store.druid.druid.PagingSpec;
+import org.apache.drill.exec.store.druid.druid.SelectQuery;
+import org.apache.drill.exec.vector.complex.fn.JsonReader;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+public class DruidRecordReader extends AbstractRecordReader {
+
+private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DruidRecordReader.class);
+private DruidStoragePlugin plugin;
+private final DruidSubScan.DruidSubScanSpec scanSpec;
+private List<String> dimensions;
+private String filters;
+private ArrayList<PagingIdentifier> pagingIdentifiers = new ArrayList<>();
+
+private JsonReader jsonReader;
+private VectorContainerWriter writer;
+
+private OutputMutator output;
+private OperatorContext context;
+private final FragmentContext fragmentContext;
+
+private ObjectMapper objectMapper = new ObjectMapper();
+
+public DruidRecordReader(DruidSubScan.DruidSubScanSpec subScanSpec, List<SchemaPath> projectedColumns,
+ FragmentContext context, DruidStoragePlugin plugin) {
+dimensions = new ArrayList();
+setColumns(projectedColumns);
+this.plugin = plugin;
+scanSpec = subScanSpec;
+fragmentContext = context;
+this.filters = subScanSpec.getFilter();
+}
+
+@Override
+protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> projectedColumns) {
+Set<SchemaPath> transformed = Sets.newLinkedHashSet();
+if (isStarQuery()) {
+transformed.add(SchemaPath.STAR_COLUMN);
+} else {
+for (SchemaPath column : projectedColumns) {
+String fieldName = column.getRootSegment().getPath();
+transformed.add(column);
+this.dimensions.add(fieldName);
+}
+}
+return transformed;
+}
+
+@Override
+public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException {
+this.context = context;
+this.output = output;
+this.writer = new VectorContainerWriter(output);
+
+//Lists.newArrayList(getColumns()), true, false, false
+this.jsonReader =
+new JsonReader.Builder(fragmentContext.getManagedBuffer())
+.schemaPathColumns(ImmutableList.copyOf(getColumns()))
+.skipOuterList(true)
+.build();
+logger.debug(" Initialized JsonRecordReader. ");
+}
+
+@Override
+public int next() {

Review comment:
   Fixed this by adding a `maxRecordsToRead` property that can be configured for the plugin. It defaults to `BaseValueVector.INITIAL_VALUE_ALLOCATION`.
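A minimal sketch of the defaulting rule this implies, assuming (as the field initialization `maxRecordsToRead = -1` in the excerpts suggests) that a negative value means "not configured", and that `BaseValueVector.INITIAL_VALUE_ALLOCATION` is 4096:

```java
// Hypothetical helper illustrating the described defaulting rule; not the PR's code.
public class DruidBatchSizeExample {
  // Assumed value of Drill's BaseValueVector.INITIAL_VALUE_ALLOCATION.
  private static final int INITIAL_VALUE_ALLOCATION = 4096;

  // Returns the number of records to request per Druid select "page".
  static int effectiveBatchSize(int maxRecordsToRead) {
    return maxRecordsToRead < 0 ? INITIAL_VALUE_ALLOCATION : maxRecordsToRead;
  }

  public static void main(String[] args) {
    System.out.println(effectiveBatchSize(-1));   // 4096 (default)
    System.out.println(effectiveBatchSize(1000)); // 1000 (configured)
  }
}
```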






[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-06-21 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r443267848



##
File path: contrib/storage-druid/README.md
##
@@ -0,0 +1,24 @@
+# Drill Apache Druid Plugin
+
+Drill druid storage plugin allows you to perform SQL queries against Druid datasource(s).

Review comment:
   Fixed









[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-05-26 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r430105918



##
File path: 
contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanSpecBuilder.java
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.store.druid.common.DruidBoundFilter;
+import org.apache.drill.exec.store.druid.common.DruidIntervalFilter;
+import org.apache.drill.exec.store.druid.common.DruidNotFilter;
+import org.apache.drill.exec.store.druid.common.DruidRegexFilter;
+import org.apache.drill.exec.store.druid.common.DruidSearchFilter;
+import org.apache.drill.exec.store.druid.common.DruidSelectorFilter;
+import org.apache.drill.exec.store.druid.druid.SelectQuery;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+class DruidScanSpecBuilder {
+
+  private static final Logger logger = LoggerFactory.getLogger(DruidScanSpecBuilder.class);
+
+  private static final String REGEX_KEYWORD_HINT = "$regex$_";
+
+  DruidScanSpec build(String dataSourceName,
+  String functionName,
+  SchemaPath field,
+  Object fieldValue) throws IOException {
+// extract the field name
+
+String fieldName = field.getAsNamePart().getName(); //.getAsUnescapedPath();
+String filter;
+
+logger.debug("createDruidScanSpec called. FunctionName - "
+  + functionName + ", field - " + fieldName + ", fieldValue - " + 
fieldValue);
+
+switch (functionName) {
+  case "equal":
+  {
+if (fieldName.equalsIgnoreCase(SelectQuery.IntervalDimensionName)) {
+  DruidIntervalFilter druidIntervalFilter = new 
DruidIntervalFilter((String)fieldValue);
+  filter = druidIntervalFilter.toJson();
+  break;
+} else {
+  DruidSelectorFilter druidSelectorFilter = new 
DruidSelectorFilter(fieldName, (String) fieldValue);
+  filter = druidSelectorFilter.toJson();
+  break;
+}
+  }
+  case "not_equal":
+  {
+DruidSelectorFilter druidSelectorFilter = new 
DruidSelectorFilter(fieldName, String.valueOf(fieldValue));
+String selectorFilter = druidSelectorFilter.toJson();
+DruidNotFilter druidNotFilter = new DruidNotFilter(selectorFilter);
+filter = druidNotFilter.toJson();
+break;
+  }
+  case "greater_than_or_equal_to":
+  {
+DruidBoundFilter druidBoundFilter = new DruidBoundFilter(fieldName, 
String.valueOf(fieldValue), null);
+filter = druidBoundFilter.toJson();
+break;
+  }
+  case "greater_than":
+  {
+DruidBoundFilter druidBoundFilter = new DruidBoundFilter(fieldName, 
String.valueOf(fieldValue), null);
+druidBoundFilter.setLowerStrict(true);
+filter = druidBoundFilter.toJson();
+break;
+  }
+  case "less_than_or_equal_to":
+  {
+DruidBoundFilter druidBoundFilter = new DruidBoundFilter(fieldName, 
null, String.valueOf(fieldValue));
+filter = druidBoundFilter.toJson();
+break;
+  }
+  case "less_than":
+  {
+DruidBoundFilter druidBoundFilter = new DruidBoundFilter(fieldName, 
null, String.valueOf(fieldValue));
+druidBoundFilter.setUpperStrict(true);
+filter = druidBoundFilter.toJson();
+break;
+  }
+  case "isnull":
+  case "isNull":
+  case "is null":
+  {
+DruidSelectorFilter druidSelectorFilter = new 
DruidSelectorFilter(fieldName, null);
+filter = druidSelectorFilter.toJson();
+break;
+  }
+  case "isnotnull":
+  case "isNotNull":
+  case "is not null":
+  {
+DruidSelectorFilter druidSelectorFilter = new 
DruidSelectorFilter(fieldName, null);
+String selectorFilter = druidSelectorFilter.toJson();
+DruidNotFilter druidNotFilter = new DruidNotFilter(selectorFilter);
+filter = druidNotFilter.toJson();
+break;
+  }
+  case "like":
+  {
+String val = 

[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-05-25 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r429915738



##
File path: 
contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/SelectQuery.java
##
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.druid;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.lang3.StringUtils;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Objects;
+
+public class SelectQuery {
+
+  private static final ObjectMapper objectMapper = new ObjectMapper();
+  public static String IntervalDimensionName = "eventInterval";
+  private static final String ISO8601DateStringFormat = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";
+  private static final String NixStartTime = "1970-01-01T00:00:00.000Z";
+  private String queryType = "select";
+  private String dataSource;
+  private boolean descending = false;
+  private ArrayList<String> dimensions = new ArrayList<>();
+  private ArrayList<String> metrics = new ArrayList<>();
+  private String granularity = "all";
+  private List<String> intervals = new ArrayList<>();
+  private PagingSpec pagingSpec = new PagingSpec(null);
+  private String filter;
+
+  public SelectQuery(String dataSource, List<String> intervals) {
+this.dataSource = dataSource;
+this.intervals = intervals;
+  }
+
+  public SelectQuery(String dataSource) {
+this.dataSource = dataSource;
+
+//Note - Interval is always this by default because there is no way to provide an interval via SQL
+DateTime now = new DateTime();
+DateTime zulu = now.toDateTime( DateTimeZone.UTC );
+String interval = NixStartTime + "/" + zulu;
+this.intervals.add(interval);
+  }
+
+  public String getQueryType() {
+return queryType;
+  }
+
+  public void setQueryType(String queryType) {
+this.queryType = queryType;
+  }
+
+  public String getDataSource() {
+return dataSource;
+  }
+
+  public void setDataSource(String dataSource) {
+this.dataSource = dataSource;
+  }
+
+  public boolean isDescending() {
+return descending;
+  }
+
+  public void setDescending(boolean descending) {
+this.descending = descending;
+  }
+
+  public ArrayList getDimensions() {
+return dimensions;
+  }
+
+  public void setDimensions(List dimensions) {
+this.dimensions = (ArrayList) dimensions;
+  }
+
+  public ArrayList getMetrics() {
+return metrics;
+  }
+
+  public void setMetrics(ArrayList metrics) {
+this.metrics = metrics;
+  }
+
+  public String getGranularity() {
+return granularity;
+  }
+
+  public void setGranularity(String granularity) {
+this.granularity = granularity;
+  }
+
+  public List getIntervals() {
+return intervals;
+  }
+
+  public void setIntervals(ArrayList intervals) {
+this.intervals = intervals;
+  }
+
+  public PagingSpec getPagingSpec() {
+return pagingSpec;
+  }
+
+  public void setPagingSpec(PagingSpec pagingSpec) {
+this.pagingSpec = pagingSpec;
+  }
+
+  public String getFilter() {
+return filter;
+  }
+
+  public void setFilter(String filter) {
+this.filter = filter;
+  }
+
+  public String toJson() throws IOException {

Review comment:
   You are correct. I was lazy when I initially did this. Trying to fix it 
as we go along. Just fixed this file.









[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-05-18 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r426987466



##
File path: 
contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidRecordReader.java
##
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.druid.druid.DruidSelectResponse;
+import org.apache.drill.exec.store.druid.druid.PagingIdentifier;
+import org.apache.drill.exec.store.druid.druid.PagingSpec;
+import org.apache.drill.exec.store.druid.druid.SelectQuery;
+import org.apache.drill.exec.vector.complex.fn.JsonReader;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+public class DruidRecordReader extends AbstractRecordReader {
+
+private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DruidRecordReader.class);
+private DruidStoragePlugin plugin;
+private final DruidSubScan.DruidSubScanSpec scanSpec;
+private List dimensions;
+private String filters;
+private ArrayList pagingIdentifiers = new ArrayList<>();
+
+private JsonReader jsonReader;
+private VectorContainerWriter writer;
+
+private OutputMutator output;
+private OperatorContext context;
+private final FragmentContext fragmentContext;
+
+private ObjectMapper objectMapper = new ObjectMapper();
+
+public DruidRecordReader(DruidSubScan.DruidSubScanSpec subScanSpec, 
List projectedColumns,
+ FragmentContext context, DruidStoragePlugin 
plugin) {
+dimensions = new ArrayList();
+setColumns(projectedColumns);
+this.plugin = plugin;
+scanSpec = subScanSpec;
+fragmentContext = context;
+this.filters = subScanSpec.getFilter();
+}
+
+@Override
+protected Collection transformColumns(Collection 
projectedColumns) {
+Set transformed = Sets.newLinkedHashSet();
+if (isStarQuery()) {
+transformed.add(SchemaPath.STAR_COLUMN);
+} else {
+for (SchemaPath column : projectedColumns) {
+String fieldName = column.getRootSegment().getPath();
+transformed.add(column);
+this.dimensions.add(fieldName);
+}
+}
+return transformed;
+}
+
+@Override
+public void setup(OperatorContext context, OutputMutator output) throws 
ExecutionSetupException {
+this.context = context;
+this.output = output;
+this.writer = new VectorContainerWriter(output);
+
+//Lists.newArrayList(getColumns()), true, false, false
+this.jsonReader =
+new JsonReader.Builder(fragmentContext.getManagedBuffer())
+.schemaPathColumns(ImmutableList.copyOf(getColumns()))
+.skipOuterList(true)
+.build();
+logger.debug(" Initialized JsonRecordReader. ");
+}
+
+@Override
+public int next() {
+
+writer.allocate();
+writer.reset();
+SelectQuery selectQuery = new SelectQuery(scanSpec.dataSourceName);
+selectQuery.setDimensions(this.dimensions);
+selectQuery.setFilter(this.filters);
+
+ObjectNode paging = objectMapper.createObjectNode();
+if (this.pagingIdentifiers != null && 

[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-05-18 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r426615797



##
File path: 
contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/PagingIdentifier.java
##
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.druid;
+
+public class PagingIdentifier {
+
+  private String _segmentName;

Review comment:
   Fixed.









[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-05-18 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r426605366



##
File path: distribution/src/main/resources/storage-plugins-override-example.conf
##
@@ -66,3 +66,11 @@
 enabled: true
   }
 }
+"storage": {
+  druid: {
+type : "druid",
+brokerAddress : "http://localhost:8082",
+coordinatorAddress: "http://localhost:8081",
+enabled : true

Review comment:
   done









[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-05-18 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r426600487



##
File path: 
contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePlugin.java
##
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.druid.rest.DruidAdminClient;
+import org.apache.drill.exec.store.druid.rest.DruidQueryClient;
+import org.apache.drill.exec.store.druid.rest.RestClient;
+import org.apache.drill.exec.store.druid.rest.RestClientWrapper;
+import org.apache.drill.exec.store.druid.schema.DruidSchemaFactory;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Set;
+
+public class DruidStoragePlugin extends AbstractStoragePlugin {
+
+  static final Logger logger = LoggerFactory.getLogger(DruidStoragePlugin.class);
+
+  private final DrillbitContext context;
+  private final DruidStoragePluginConfig pluginConfig;
+  private final DruidAdminClient druidAdminClient;
+  private final DruidQueryClient druidQueryClient;
+  private final DruidSchemaFactory schemaFactory;
+
+  public DruidStoragePlugin(DruidStoragePluginConfig pluginConfig, DrillbitContext context, String name) {
+super(context, name);
+this.pluginConfig = pluginConfig;
+this.context = context;
+RestClient restClient = new RestClientWrapper();
+this.druidAdminClient = new DruidAdminClient(pluginConfig.GetCoordinatorURI(), restClient);
+this.druidQueryClient = new DruidQueryClient(pluginConfig.GetBrokerURI(), restClient);
+this.schemaFactory = new DruidSchemaFactory(this, name);
+  }
+
+  @Override
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {

Review comment:
   Probably just return the base type because of the base class. It is fine to return `DruidGroupScan` here.
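As a rough sketch of that narrowing (the `DruidScanSpec` deserialization below follows the pattern suggested by the `TypeReference`/`ObjectMapper` imports in this file, and is an assumption rather than the PR's exact method body):

```java
  @Override
  public DruidGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException {
    // Deserialize the scan spec carried in the logical plan selection and build the group scan from it.
    DruidScanSpec scanSpec = selection.getListWith(new ObjectMapper(), new TypeReference<DruidScanSpec>() {});
    return new DruidGroupScan(userName, this, scanSpec, null);
  }
```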









[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-05-18 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r426599812



##
File path: 
contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidSubScan.java
##
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import static java.util.Collections.emptyIterator;
+
+/**
+ * A Class containing information to read a single druid data source.
+ */
+@JsonTypeName("druid-datasource-scan")
+public class DruidSubScan extends AbstractBase implements SubScan {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DruidSubScan.class);
+
+  @JsonProperty
+  public final DruidStoragePluginConfig druidStoragePluginConfig;
+
+  @JsonIgnore
+  private final DruidStoragePlugin druidStoragePlugin;
+
+  private final List<DruidSubScanSpec> dataSourceScanSpecList;
+  private final List<SchemaPath> columns;
+
+  @JsonCreator
+  public DruidSubScan(@JacksonInject StoragePluginRegistry registry,
+  @JsonProperty("userName") String userName,
+  @JsonProperty("druidStoragePluginConfig") 
StoragePluginConfig druidStoragePluginConfig,
+  @JsonProperty("datasourceScanSpecList") 
LinkedList datasourceScanSpecList,
+  @JsonProperty("columns") List columns) 
throws ExecutionSetupException {
+super(userName);
+druidStoragePlugin = (DruidStoragePlugin) registry.getPlugin(druidStoragePluginConfig);
+this.dataSourceScanSpecList = datasourceScanSpecList;
+this.druidStoragePluginConfig = (DruidStoragePluginConfig) druidStoragePluginConfig;
+this.columns = columns;
+  }
+
+  public DruidSubScan(String userName, DruidStoragePlugin plugin, DruidStoragePluginConfig config,
+  List<DruidSubScanSpec> dataSourceInfoList, List<SchemaPath> columns) {
+super(userName);
+druidStoragePlugin = plugin;
+druidStoragePluginConfig = config;
+this.dataSourceScanSpecList = dataSourceInfoList;
+this.columns = columns;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+return physicalVisitor.visitSubScan(this, value);
+  }
+
+  public List<DruidSubScanSpec> getDataSourceScanSpecList() {
+return dataSourceScanSpecList;
+  }
+
+  @JsonIgnore
+  public DruidStoragePluginConfig getStorageConfig() {
+return druidStoragePluginConfig;
+  }
+
+  public List<SchemaPath> getColumns() {
+return columns;
+  }
+
+  @Override
+  public boolean isExecutable() {
+return false;
+  }
+
+  @JsonIgnore
+  public DruidStoragePlugin getStorageEngine(){
+return druidStoragePlugin;
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {

Review comment:
   Fixed.









[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-05-17 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r426312198



##
File path: 
contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/SelectQuery.java
##
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.druid;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.lang3.StringUtils;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Objects;
+
+public class SelectQuery {
+
+  private static final ObjectMapper objectMapper = new ObjectMapper();
+  public static String IntervalDimensionName = "eventInterval";

Review comment:
   Fixed.









[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-05-17 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r426311980



##
File path: 
contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidGroupScan.java
##
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@JsonTypeName("druid-scan")
+public class DruidGroupScan extends AbstractGroupScan {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(DruidGroupScan.class);
+  private static final long DEFAULT_TABLET_SIZE = 1000;
+  // assuming a 500 MB segment size with 5 million rows per segment
+  private static final int DEFAULT_ROW_SIZE = 100;
+  private final DruidScanSpec scanSpec;
+  private final DruidStoragePlugin storagePlugin;
+
+  private List<SchemaPath> columns;
+  private boolean filterPushedDown = false;
+  private List druidWorkList = new ArrayList<>();
+  private ListMultimap assignments;
+  private List<EndpointAffinity> affinities;
+
+  @JsonCreator
+  public DruidGroupScan(@JsonProperty("userName") String userName,
+@JsonProperty("scanSpec") DruidScanSpec scanSpec,
+@JsonProperty("storagePluginConfig") 
DruidStoragePluginConfig storagePluginConfig,
+@JsonProperty("columns") List columns,
+@JacksonInject StoragePluginRegistry pluginRegistry) {
+this(userName,
+pluginRegistry.resolve(storagePluginConfig, DruidStoragePlugin.class),
+scanSpec,
+columns);
+  }
+
+  public DruidGroupScan(String userName, DruidStoragePlugin storagePlugin, 
DruidScanSpec scanSpec,
+List<SchemaPath> columns) {
+super(userName);
+this.storagePlugin = storagePlugin;
+this.scanSpec = scanSpec;
+this.columns = columns == null || columns.size() == 0? ALL_COLUMNS : 
columns;
+init();
+  }
+
+  /**
+   * Private constructor, used for cloning.
+   * @param that The DruidGroupScan to clone
+   */
+  private DruidGroupScan(DruidGroupScan that) {
+super(that);
+this.columns = that.columns;
+this.scanSpec = that.scanSpec;
+this.storagePlugin = that.storagePlugin;
+this.filterPushedDown = that.filterPushedDown;
+this.druidWorkList = that.druidWorkList;
+this.assignments = that.assignments;
+  }
+
+  @Override
+  public GroupScan clone(List<SchemaPath> columns) {
+DruidGroupScan newScan = new DruidGroupScan(this);
+newScan.columns = columns;
+return newScan;
+  }
+
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+if (affinities == null) {
+  affinities = AffinityCreator.getAffinityMap(druidWorkList);
+}
+return affinities;
+  }
+
+  @Override
+  public boolean canPushdownProjects(List<SchemaPath> columns) {
+return true;
+  }
+
+  @JsonIgnore
+  public boolean 

[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-05-17 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r426312021



##
File path: 
contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidGroupScan.java
##
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@JsonTypeName("druid-scan")
+public class DruidGroupScan extends AbstractGroupScan {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(DruidGroupScan.class);
+  private static final long DEFAULT_TABLET_SIZE = 1000;
+  // assuming a 500 MB segment size with 5 million rows per segment
+  private static final int DEFAULT_ROW_SIZE = 100;
+  private final DruidScanSpec scanSpec;
+  private final DruidStoragePlugin storagePlugin;
+
+  private List<SchemaPath> columns;
+  private boolean filterPushedDown = false;
+  private List druidWorkList = new ArrayList<>();
+  private ListMultimap assignments;
+  private List<EndpointAffinity> affinities;
+
+  @JsonCreator
+  public DruidGroupScan(@JsonProperty("userName") String userName,
+@JsonProperty("scanSpec") DruidScanSpec scanSpec,
+@JsonProperty("storagePluginConfig") 
DruidStoragePluginConfig storagePluginConfig,
+@JsonProperty("columns") List columns,
+@JacksonInject StoragePluginRegistry pluginRegistry) {
+this(userName,
+pluginRegistry.resolve(storagePluginConfig, DruidStoragePlugin.class),
+scanSpec,
+columns);
+  }
+
+  public DruidGroupScan(String userName, DruidStoragePlugin storagePlugin, 
DruidScanSpec scanSpec,
+List<SchemaPath> columns) {
+super(userName);
+this.storagePlugin = storagePlugin;
+this.scanSpec = scanSpec;
+this.columns = columns == null || columns.size() == 0? ALL_COLUMNS : 
columns;
+init();
+  }
+
+  /**
+   * Private constructor, used for cloning.
+   * @param that The DruidGroupScan to clone
+   */
+  private DruidGroupScan(DruidGroupScan that) {
+super(that);
+this.columns = that.columns;
+this.scanSpec = that.scanSpec;
+this.storagePlugin = that.storagePlugin;
+this.filterPushedDown = that.filterPushedDown;
+this.druidWorkList = that.druidWorkList;
+this.assignments = that.assignments;
+  }
+
+  @Override
+  public GroupScan clone(List<SchemaPath> columns) {
+DruidGroupScan newScan = new DruidGroupScan(this);
+newScan.columns = columns;
+return newScan;
+  }
+
+  @Override
+  public List<EndpointAffinity> getOperatorAffinity() {
+if (affinities == null) {
+  affinities = AffinityCreator.getAffinityMap(druidWorkList);
+}
+return affinities;
+  }
+
+  @Override
+  public boolean canPushdownProjects(List<SchemaPath> columns) {
+return true;
+  }
+
+  @JsonIgnore
+  public boolean 

[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-05-17 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r426311913



##
File path: 
contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class DruidScanBatchCreator implements BatchCreator<DruidSubScan> {
+
+  static final Logger logger = 
LoggerFactory.getLogger(DruidScanBatchCreator.class);
+
+  @Override
+  public CloseableRecordBatch getBatch(ExecutorFragmentContext context, DruidSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
+Preconditions.checkArgument(children.isEmpty());
+List<RecordReader> readers = Lists.newArrayList();
+List<SchemaPath> columns;
+
+for (DruidSubScan.DruidSubScanSpec scanSpec : subScan.getScanSpec()) {
+  try {
+columns = subScan.getColumns();
+readers.add(new DruidRecordReader(scanSpec, columns, context, 
subScan.getStorageEngine()));
+  } catch (Exception e1) {
+throw new ExecutionSetupException(e1);
+  }
+}
+logger.debug("Number of record readers initialized : " + readers.size());

Review comment:
   Fixed.
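
   The usual resolution for this kind of note is SLF4J's parameterized logging, which skips string construction when debug logging is disabled; a minimal sketch, assuming that is what the fix did:

   ```java
   // Placeholder form: the argument is only rendered if debug logging is enabled.
   logger.debug("Number of record readers initialized: {}", readers.size());
   ```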





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-05-17 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r426311940



##
File path: 
contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidGroupScan.java
##
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@JsonTypeName("druid-scan")
+public class DruidGroupScan extends AbstractGroupScan {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(DruidGroupScan.class);
+  private static final long DEFAULT_TABLET_SIZE = 1000;
+  // assuming a 500 MB segment size with 5 million rows per segment
+  private static final int DEFAULT_ROW_SIZE = 100;

Review comment:
   Fixed.
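
   For context, the arithmetic implied by the comment is 500 MB per segment divided by 5 million rows per segment, i.e. roughly 100 bytes per row; a hypothetical way to make that intent explicit (names and derivation are illustrative, not the merged code):

   ```java
   // ~500 MB segment / ~5 million rows per segment => ~100 bytes per row.
   private static final int ESTIMATED_ROW_SIZE_BYTES = 500_000_000 / 5_000_000; // = 100
   ```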





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-05-17 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r426311795



##
File path: 
contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/druid/PagingIdentifier.java
##
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.druid;
+
+public class PagingIdentifier {
+
+  private String _segmentName;

Review comment:
   fixed.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-05-17 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r426311818



##
File path: 
contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidSubScan.java
##
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.druid.common.DruidFilter;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import static java.util.Collections.emptyIterator;
+
+/**
+ * A Class containing information to read a single druid data source.
+ */
+@JsonTypeName("druid-datasource-scan")
+public class DruidSubScan extends AbstractBase implements SubScan {
+
+  static final Logger logger = LoggerFactory.getLogger(DruidSubScan.class);
+
+  @JsonIgnore
+  private final DruidStoragePlugin druidStoragePlugin;
+
+  private final List<DruidSubScanSpec> scanSpec;
+  private final List<SchemaPath> columns;
+
+  @JsonCreator
+  public DruidSubScan(@JacksonInject StoragePluginRegistry registry,
+  @JsonProperty("userName") String userName,
+  @JsonProperty("config") StoragePluginConfig config,
+  @JsonProperty("scanSpec") LinkedList 
datasourceScanSpecList,
+  @JsonProperty("columns") List columns) {
+super(userName);
+druidStoragePlugin = registry.resolve(config, DruidStoragePlugin.class);
+this.scanSpec = datasourceScanSpecList;
+this.columns = columns;
+  }
+
+  public DruidSubScan(String userName, DruidStoragePlugin plugin,
+  List<DruidSubScanSpec> dataSourceInfoList, List<SchemaPath> columns) {
+super(userName);
+this.druidStoragePlugin = plugin;
+this.scanSpec = dataSourceInfoList;
+this.columns = columns;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+return physicalVisitor.visitSubScan(this, value);
+  }
+
+  @JsonIgnore
+  public List<DruidSubScanSpec> getScanSpec() {
+return scanSpec;
+  }
+
+  public List<SchemaPath> getColumns() {
+return columns;
+  }
+
+  @JsonIgnore
+  @Override
+  public boolean isExecutable() {
+return false;
+  }
+
+  @JsonIgnore
+  public DruidStoragePlugin getStorageEngine(){
+return druidStoragePlugin;
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
+Preconditions.checkArgument(children.isEmpty());
+return new DruidSubScan(getUserName(), druidStoragePlugin, scanSpec, 
columns);
+  }
+
+  @JsonIgnore
+  @Override
+  public int getOperatorType() {
+return CoreOperatorType.DRUID_SUB_SCAN_VALUE;
+  }
+
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+return emptyIterator();
+  }
+
+  public static class DruidSubScanSpec {
+
+protected final String dataSourceName;
+protected final DruidFilter filter;
+
+@JsonCreator
+public DruidSubScanSpec(@JsonProperty("dataSourceName") String 
dataSourceName,
+@JsonProperty("filter") DruidFilter filter) {
+  this.dataSourceName = dataSourceName;
+  this.filter = filter;
+}
+
+public String getDataSourceName() {
+  return dataSourceName;
+}
+
+public DruidFilter getFilter() { return filter; }
+
+@Override
+public String toString() {
+  return "DruidSubScanSpec [dataSourceName=" + dataSourceName  + 

[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-05-17 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r426311857



##
File path: 
contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanBatchCreator.java
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class DruidScanBatchCreator implements BatchCreator<DruidSubScan> {
+
+  static final Logger logger = 
LoggerFactory.getLogger(DruidScanBatchCreator.class);

Review comment:
   Fixed





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-05-17 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r426311836



##
File path: 
contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanSpec.java
##
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.exec.store.druid.common.DruidFilter;
+
+public class DruidScanSpec {
+
+  private final String dataSourceName;
+  private DruidFilter filter;
+
+  @JsonCreator
+  public DruidScanSpec(@JsonProperty("dataSourceName") String dataSourceName) {
+this.dataSourceName = dataSourceName;
+  }
+
+  public DruidScanSpec(String dataSourceName, DruidFilter filter) {
+this.dataSourceName = dataSourceName;
+this.filter = filter;
+  }
+
+  public String getDataSourceName() {
+return this.dataSourceName;
+  }
+
+  public DruidFilter getFilter() {
+return this.filter;
+  }
+
+  @Override
+  public String toString() {
+String filter = this.filter == null ? "" : this.filter.toJson();

Review comment:
   Fixed.
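
   A minimal sketch of the likely change, renaming the local so it no longer shadows the filter field (illustrative; the merged code may instead use a plan-string helper):

   ```java
   @Override
   public String toString() {
     // Local renamed to avoid shadowing the 'filter' field.
     String filterJson = filter == null ? "" : filter.toJson();
     return "DruidScanSpec [dataSourceName=" + dataSourceName + ", filter=" + filterJson + "]";
   }
   ```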





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-05-17 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r426308172



##
File path: 
contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanSpecBuilder.java
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.store.druid.common.DruidBoundFilter;
+import org.apache.drill.exec.store.druid.common.DruidIntervalFilter;
+import org.apache.drill.exec.store.druid.common.DruidNotFilter;
+import org.apache.drill.exec.store.druid.common.DruidRegexFilter;
+import org.apache.drill.exec.store.druid.common.DruidSearchFilter;
+import org.apache.drill.exec.store.druid.common.DruidSelectorFilter;
+import org.apache.drill.exec.store.druid.druid.SelectQuery;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+class DruidScanSpecBuilder {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(DruidScanSpecBuilder.class);
+
+  private static final String REGEX_KEYWORD_HINT = "$regex$_";
+
+  DruidScanSpec build(String dataSourceName,
+  String functionName,
+  SchemaPath field,
+  Object fieldValue) throws IOException {
+// extract the field name
+
+String fieldName = field.getAsNamePart().getName(); 
//.getAsUnescapedPath();
+String filter;
+
+logger.debug("createDruidScanSpec called. FunctionName - "
+  + functionName + ", field - " + fieldName + ", fieldValue - " + 
fieldValue);
+
+switch (functionName) {
+  case "equal":

Review comment:
   This was fixed to use `FunctionNames`
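
   For readers outside the thread: Drill keeps the comparison-function names used during filter pushdown in a shared FunctionNames class, so the switch can dispatch on those constants instead of hard-coded strings. A sketch of the idea (the class location, constant names such as EQ/GT, and the Druid filter-type mapping shown here are assumptions, not the exact merged code):

   ```java
   import org.apache.drill.common.FunctionNames;

   // Hypothetical helper: map Drill function names to Druid native filter types.
   static String druidFilterType(String functionName) {
     switch (functionName) {
       case FunctionNames.EQ:
         return "selector";
       case FunctionNames.GT:
       case FunctionNames.GE:
       case FunctionNames.LT:
       case FunctionNames.LE:
         return "bound";
       default:
         throw new UnsupportedOperationException("Cannot push down: " + functionName);
     }
   }
   ```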





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-05-17 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r426307659



##
File path: 
contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfig.java
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.StoragePluginConfigBase;
+
+@JsonTypeName(DruidStoragePluginConfig.NAME)
+public class DruidStoragePluginConfig extends StoragePluginConfigBase {
+
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(DruidStoragePluginConfig.class);
+
+  public static final String NAME = "druid";
+
+  @JsonProperty
+  private final String brokerAddress;
+
+  @JsonProperty
+  private final String coordinatorAddress;
+
+  @JsonCreator
+  public DruidStoragePluginConfig(
+@JsonProperty("brokerAddress") String brokerAddress,
+@JsonProperty("coordinatorAddress") String coordinatorAddress) {
+
+this.brokerAddress = brokerAddress;
+this.coordinatorAddress = coordinatorAddress;
+logger.info("Broker Address - {}, Coordinator Address - {}", 
brokerAddress, coordinatorAddress);
+//TODO Make this configurable.
+  }
+
+  @Override
+  public boolean equals(Object that) {
+if (this == that) {
+  return true;
+} else if (that == null || getClass() != that.getClass()) {
+  return false;
+}
+DruidStoragePluginConfig thatConfig = (DruidStoragePluginConfig) that;
+return
+  (this.brokerAddress.equals(thatConfig.brokerAddress)
+&& this.coordinatorAddress.equals(thatConfig.coordinatorAddress));
+  }
+
+  @Override
+  public int hashCode() {

Review comment:
   Fixed.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-05-17 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r426307614



##
File path: 
contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfig.java
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.StoragePluginConfigBase;
+
+@JsonTypeName(DruidStoragePluginConfig.NAME)
+public class DruidStoragePluginConfig extends StoragePluginConfigBase {
+
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(DruidStoragePluginConfig.class);
+
+  public static final String NAME = "druid";
+
+  @JsonProperty
+  private final String brokerAddress;
+
+  @JsonProperty
+  private final String coordinatorAddress;
+
+  @JsonCreator
+  public DruidStoragePluginConfig(
+@JsonProperty("brokerAddress") String brokerAddress,
+@JsonProperty("coordinatorAddress") String coordinatorAddress) {
+
+this.brokerAddress = brokerAddress;
+this.coordinatorAddress = coordinatorAddress;
+logger.info("Broker Address - {}, Coordinator Address - {}", 
brokerAddress, coordinatorAddress);
+//TODO Make this configurable.

Review comment:
   This was removed earlier.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-05-17 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r426307531



##
File path: 
contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidSubScan.java
##
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import static java.util.Collections.emptyIterator;
+
+/**
+ * A Class containing information to read a single druid data source.
+ */
+@JsonTypeName("druid-datasource-scan")
+public class DruidSubScan extends AbstractBase implements SubScan {
+
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(DruidSubScan.class);
+
+  @JsonProperty
+  public final DruidStoragePluginConfig druidStoragePluginConfig;
+
+  @JsonIgnore
+  private final DruidStoragePlugin druidStoragePlugin;
+
+  private final List<DruidSubScanSpec> dataSourceScanSpecList;
+  private final List<SchemaPath> columns;
+
+  @JsonCreator
+  public DruidSubScan(@JacksonInject StoragePluginRegistry registry,
+  @JsonProperty("userName") String userName,
+  @JsonProperty("druidStoragePluginConfig") 
StoragePluginConfig druidStoragePluginConfig,
+  @JsonProperty("datasourceScanSpecList") 
LinkedList datasourceScanSpecList,
+  @JsonProperty("columns") List columns) 
throws ExecutionSetupException {
+super(userName);
+druidStoragePlugin = (DruidStoragePlugin) 
registry.getPlugin(druidStoragePluginConfig);
+this.dataSourceScanSpecList = datasourceScanSpecList;
+this.druidStoragePluginConfig = (DruidStoragePluginConfig) 
druidStoragePluginConfig;
+this.columns = columns;
+  }
+
+  public DruidSubScan(String userName, DruidStoragePlugin plugin, 
DruidStoragePluginConfig config,
+  List<DruidSubScanSpec> dataSourceInfoList, List<SchemaPath> columns) {
+super(userName);
+druidStoragePlugin = plugin;
+druidStoragePluginConfig = config;
+this.dataSourceScanSpecList = dataSourceInfoList;
+this.columns = columns;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+return physicalVisitor.visitSubScan(this, value);
+  }
+
+  public List<DruidSubScanSpec> getDataSourceScanSpecList() {
+return dataSourceScanSpecList;
+  }
+
+  @JsonIgnore
+  public DruidStoragePluginConfig getStorageConfig() {
+return druidStoragePluginConfig;
+  }
+
+  public List<SchemaPath> getColumns() {
+return columns;
+  }
+
+  @Override
+  public boolean isExecutable() {
+return false;
+  }
+
+  @JsonIgnore
+  public DruidStoragePlugin getStorageEngine(){
+return druidStoragePlugin;
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+Preconditions.checkArgument(children.isEmpty());
+return new DruidSubScan(getUserName(), druidStoragePlugin, 
druidStoragePluginConfig, dataSourceScanSpecList, columns);
+  }
+
+  @Override
+  public int getOperatorType() {
+return CoreOperatorType.DRUID_SUB_SCAN_VALUE;
+  }
+
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+return emptyIterator();
+  }
+
+  public static class DruidSubScanSpec {
+
+protected String dataSourceName;
+protected String filter;

Review 

[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-05-17 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r426307574



##
File path: 
contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfig.java
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.StoragePluginConfigBase;
+
+@JsonTypeName(DruidStoragePluginConfig.NAME)
+public class DruidStoragePluginConfig extends StoragePluginConfigBase {
+
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(DruidStoragePluginConfig.class);
+
+  public static final String NAME = "druid";
+
+  @JsonProperty
+  private final String brokerAddress;
+
+  @JsonProperty
+  private final String coordinatorAddress;
+
+  @JsonCreator
+  public DruidStoragePluginConfig(
+@JsonProperty("brokerAddress") String brokerAddress,
+@JsonProperty("coordinatorAddress") String coordinatorAddress) {
+
+this.brokerAddress = brokerAddress;
+this.coordinatorAddress = coordinatorAddress;
+logger.info("Broker Address - {}, Coordinator Address - {}", 
brokerAddress, coordinatorAddress);

Review comment:
   Fixed





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-05-17 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r426307330



##
File path: 
contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfig.java
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.StoragePluginConfigBase;
+
+@JsonTypeName(DruidStoragePluginConfig.NAME)
+public class DruidStoragePluginConfig extends StoragePluginConfigBase {
+
+private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(DruidStoragePluginConfig.class);
+
+public static final String NAME = "druid";
+
+@JsonProperty
+private final String brokerAddress;
+
+@JsonProperty
+private final String coordinatorAddress;
+
+@JsonCreator
+public DruidStoragePluginConfig(
+@JsonProperty("brokerAddress") String brokerAddress,
+@JsonProperty("coordinatorAddress") String coordinatorAddress) {
+
+this.brokerAddress = brokerAddress;
+this.coordinatorAddress = coordinatorAddress;
+logger.info("Broker Address - {}, Coordinator Address - {}", 
brokerAddress, coordinatorAddress);
+//TODO Make this configurable.
+}
+
+@Override
+public boolean equals(Object that) {
+if (this == that) {
+return true;
+} else if (that == null || getClass() != that.getClass()) {
+return false;
+}
+DruidStoragePluginConfig thatConfig = (DruidStoragePluginConfig) that;
+return
+(this.brokerAddress.equals(thatConfig.brokerAddress)
+&& 
this.coordinatorAddress.equals(thatConfig.coordinatorAddress));
+}
+
+@Override
+public int hashCode() {
+int brokerAddressHashCode = this.brokerAddress != null ? 
this.brokerAddress.hashCode() : 0;
+int coordinatorAddressHashCode = this.coordinatorAddress != null ? 
this.coordinatorAddress.hashCode() : 0;
+return brokerAddressHashCode ^ coordinatorAddressHashCode;

Review comment:
   This was fixed earlier
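
   The conventional null-safe form that such a review note points toward looks like the sketch below (illustrative; the committed code may differ). java.util.Objects is assumed to be imported.

   ```java
   @Override
   public int hashCode() {
     // Objects.hash() is null-safe and mixes the fields, so swapping the two
     // addresses no longer yields the same hash as the XOR version did.
     return Objects.hash(brokerAddress, coordinatorAddress);
   }

   @Override
   public boolean equals(Object that) {
     if (this == that) {
       return true;
     }
     if (that == null || getClass() != that.getClass()) {
       return false;
     }
     DruidStoragePluginConfig other = (DruidStoragePluginConfig) that;
     return Objects.equals(brokerAddress, other.brokerAddress)
         && Objects.equals(coordinatorAddress, other.coordinatorAddress);
   }
   ```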





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-05-17 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r426307438



##
File path: contrib/storage-druid/README.md
##
@@ -0,0 +1,24 @@
+# Drill Apache Druid Plugin
+
+Drill druid storage plugin allows you to perform SQL queries against Druid 
datasource(s).
+
+### Tested with Druid version
+[0.16.0-incubating](https://github.com/apache/incubator-druid/releases/tag/druid-0.16.0-incubating)
+
+### Supported Druid Native Query Types
+
+1. [Select](https://druid.apache.org/docs/latest/querying/select-query.html)

Review comment:
   Made some updates to the README. Let me know if it explains things 
better.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-05-17 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r426307184



##
File path: 
contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePluginConfig.java
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.StoragePluginConfigBase;
+
+@JsonTypeName(DruidStoragePluginConfig.NAME)
+public class DruidStoragePluginConfig extends StoragePluginConfigBase {
+
+private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(DruidStoragePluginConfig.class);

Review comment:
   fixed.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-05-17 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r426307152



##
File path: 
contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidStoragePlugin.java
##
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.druid.schema.DruidSchemaFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class DruidStoragePlugin extends AbstractStoragePlugin {
+
+static final Logger logger = 
LoggerFactory.getLogger(DruidStoragePlugin.class);
+
+private final DrillbitContext context;
+private final DruidStoragePluginConfig pluginConfig;
+private final DruidAdminClient druidAdminClient;
+private final DruidQueryClient druidQueryClient;
+private final DruidSchemaFactory schemaFactory;
+
+public DruidStoragePlugin(DruidStoragePluginConfig pluginConfig, 
DrillbitContext context, String name) {
+super(context, name);
+this.pluginConfig = pluginConfig;
+this.context = context;
+this.druidAdminClient = new 
DruidAdminClient(pluginConfig.GetCoordinatorURI());
+this.druidQueryClient = new 
DruidQueryClient(pluginConfig.GetBrokerURI());
+this.schemaFactory = new DruidSchemaFactory(this, name);
+}
+
+@Override
+public AbstractGroupScan getPhysicalScan(String userName, JSONOptions 
selection) throws IOException {
+DruidScanSpec scanSpec = selection.getListWith(new ObjectMapper(), new TypeReference<DruidScanSpec>() {});
+return new DruidGroupScan(this, scanSpec, null);
+}
+
+@Override
+public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) 
throws IOException {
+schemaFactory.registerSchemas(schemaConfig, parent);
+}
+
+/*@Override
+public Set 
getPhysicalOptimizerRules(OptimizerRulesContext optimizerRulesContext) {
+return ImmutableSet.of(DruidPushDownFilterForScan.INSTANCE);
+}*/

Review comment:
   This was fixed earlier





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-05-17 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r426307085



##
File path: 
contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/rest/DruidQueryClient.java
##
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.rest;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.drill.exec.store.druid.druid.DruidSelectResponse;
+import org.apache.drill.exec.store.druid.druid.PagingIdentifier;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+public class DruidQueryClient {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(DruidQueryClient.class);
+
+  private static final String QUERY_BASE_URI = "/druid/v2";
+  private static final ObjectMapper mapper = new ObjectMapper();
+
+  private RestClient restClient;

Review comment:
   fixed.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-05-17 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r426307040



##
File path: 
contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/schema/DruidSchemaFactory.java
##
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.schema;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.AbstractSchemaFactory;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.druid.DruidScanSpec;
+import org.apache.drill.exec.store.druid.DruidStoragePlugin;
+import org.apache.drill.exec.store.druid.DruidStoragePluginConfig;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+public class DruidSchemaFactory extends AbstractSchemaFactory {
+
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(DruidSchemaFactory.class);
+  final DruidStoragePlugin plugin;
+
+  public DruidSchemaFactory(DruidStoragePlugin plugin, String schemaName) {
+super(schemaName);
+this.plugin = plugin;
+  }
+
+  @Override
+  public void registerSchemas(SchemaConfig schemaConfig, SchemaPlus parent) {
+DruidDataSources schema = new DruidDataSources(getName());
+SchemaPlus hPlus = parent.add(getName(), schema);
+schema.setHolder(hPlus);
+  }
+
+  public class DruidDataSources extends AbstractSchema {
+
+private final Set<String> tableNames;
+private final Map<String, DrillTable> drillTables = Maps.newHashMap();
+
+public DruidDataSources(String name) {
+  super(ImmutableList.of(), name);

Review comment:
   This was fixed earlier.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-05-17 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r426306942



##
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidPushDownFilterForScan.java
##
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.exec.planner.logical.DrillOptiq;
+import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.apache.drill.exec.planner.physical.FilterPrel;
+import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.planner.physical.ScanPrel;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DruidPushDownFilterForScan extends StoragePluginOptimizerRule {
+
+  private static final Logger logger = LoggerFactory.getLogger(DruidPushDownFilterForScan.class);
+
+  public static final StoragePluginOptimizerRule INSTANCE = new DruidPushDownFilterForScan();
+
+  private DruidPushDownFilterForScan() {
+super(
+  RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)),
+  "DruidPushDownFilterForScan");
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall relOptRuleCall) {
+final ScanPrel scan = (ScanPrel) relOptRuleCall.rel(1);
+final FilterPrel filter = (FilterPrel) relOptRuleCall.rel(0);
+final RexNode condition = filter.getCondition();
+
+DruidGroupScan groupScan = (DruidGroupScan) scan.getGroupScan();
+if (groupScan.isFilterPushedDown()) {
+  return;
+}
+
+LogicalExpression conditionExp =
+  DrillOptiq.toDrill(
+new DrillParseContext(PrelUtil.getPlannerSettings(relOptRuleCall.getPlanner())),
+scan,
+condition);
+
+DruidFilterBuilder druidFilterBuilder =
+  new DruidFilterBuilder(groupScan, conditionExp);
+
+DruidScanSpec newScanSpec = null;
+try {
+  newScanSpec = druidFilterBuilder.parseTree();
+} catch (JsonProcessingException e) {
+  logger.error("Error in onMatch. Exception - " + e.getMessage());
+}
+if (newScanSpec == null) {
+  return; // no filter pushdown so nothing to apply.
+}
+
+DruidGroupScan newGroupsScan =
+new DruidGroupScan(
+groupScan.getUserName(),
+groupScan.getStoragePlugin(),
+newScanSpec,
+groupScan.getColumns());
+newGroupsScan.setFilterPushedDown(true);
+
+final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(),
+  newGroupsScan, scan.getRowType());
+if (druidFilterBuilder.isAllExpressionsConverted()) {
+  /*
+   * Since we could convert the entire filter condition expression into a
+   * Druid filter, we can eliminate the filter operator altogether.
+   */
+  relOptRuleCall.transformTo(newScanPrel);
+} else {
+  relOptRuleCall.transformTo(filter.copy(filter.getTraitSet(),
+ImmutableList.of((RelNode) newScanPrel)));
+}
+  }
+
+  @Override
+  public boolean matches(RelOptRuleCall call) {
+final ScanPrel scan = (ScanPrel) call.rel(1);

Review comment:
   Fixed.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-05-17 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r426306954



##
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidFilterBuilder.java
##
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.drill.common.FunctionNames;
+import org.apache.drill.common.expression.BooleanOperator;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.drill.exec.store.druid.common.DruidFilter;
+import org.apache.drill.exec.store.druid.common.DruidUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class DruidFilterBuilder extends
+  AbstractExprVisitor<DruidScanSpec, Void, RuntimeException> {
+
+  private static final Logger logger = LoggerFactory.getLogger(DruidFilterBuilder.class);
+
+  private final DruidGroupScan groupScan;
+  private final LogicalExpression le;
+  private final DruidScanSpecBuilder druidScanSpecBuilder;
+  private boolean allExpressionsConverted = true;
+
+  public DruidFilterBuilder(DruidGroupScan groupScan,
+LogicalExpression conditionExp) {
+this.groupScan = groupScan;
+this.le = conditionExp;
+this.druidScanSpecBuilder = new DruidScanSpecBuilder();
+  }
+
+  public DruidScanSpec parseTree() throws JsonProcessingException {

Review comment:
   Fixed.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-05-17 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r426306561



##
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidGroupScan.java
##
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+@JsonTypeName("druid-scan")
+public class DruidGroupScan extends AbstractGroupScan {
+
+private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DruidGroupScan.class);

Review comment:
   Fixed.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-05-17 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r426306592



##
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidQueryClient.java
##
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.drill.exec.store.druid.druid.DruidSelectResponse;
+import org.apache.drill.exec.store.druid.druid.PagingIdentifier;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.util.EntityUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.apache.http.protocol.HTTP.CONTENT_TYPE;
+import static javax.ws.rs.core.MediaType.APPLICATION_JSON;
+
+public class DruidQueryClient {
+
+private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DruidQueryClient.class);

Review comment:
   Fixed.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-05-17 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r426306620



##
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidRecordReader.java
##
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.druid.druid.DruidSelectResponse;
+import org.apache.drill.exec.store.druid.druid.PagingIdentifier;
+import org.apache.drill.exec.store.druid.druid.PagingSpec;
+import org.apache.drill.exec.store.druid.druid.SelectQuery;
+import org.apache.drill.exec.vector.complex.fn.JsonReader;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+public class DruidRecordReader extends AbstractRecordReader {
+
+private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DruidRecordReader.class);

Review comment:
   Fixed.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-05-03 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r419190071



##
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/common/DruidBoundFilter.java
##
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid.common;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.HashMap;
+
+public class DruidBoundFilter {
+
+  private static final ObjectMapper objectMapper = new ObjectMapper();
+  private String type = DruidCompareOp.TYPE_BOUND.getCompareOp();
+  private String dimension;
+  private String lower;
+  private String upper;
+  private Boolean lowerStrict =  false;
+  private Boolean upperStrict = false;
+
+  @JsonCreator
+  public DruidBoundFilter(@JsonProperty("dimension") String dimension,
+  @JsonProperty("lower") String lower,
+  @JsonProperty("upper") String upper) {
+this.dimension = dimension;
+this.lower = lower;
+this.upper= upper;
+  }
+
+  @JsonCreator
+  public DruidBoundFilter(@JsonProperty("dimension") String dimension,
+  @JsonProperty("lower") String lower,
+  @JsonProperty("upper") String upper,
+  @JsonProperty("lowerStrict") Boolean lowerStrict,
+  @JsonProperty("upperStrict") Boolean upperStrict) {
+this.dimension = dimension;
+this.lower = lower;
+this.upper= upper;
+this.lowerStrict = lowerStrict;
+this.upperStrict = upperStrict;
+  }
+
+  public String getType() {
+return type;
+  }
+
+  public void setType(String type) {
+this.type = type;
+  }
+
+  public String getDimension() {
+return dimension;
+  }
+
+  public void setDimension(String dimension) {
+this.dimension = dimension;
+  }
+
+  public String getLower() {
+return lower;
+  }
+
+  public void setLower(String lower) {
+this.lower = lower;
+  }
+
+  public String getUpper() {
+return upper;
+  }
+
+  public void setUpper(String upper) {
+this.upper = upper;
+  }
+
+  public Boolean getLowerStrict() {
+return lowerStrict;
+  }
+
+  public void setLowerStrict(Boolean lowerStrict) {
+this.lowerStrict = lowerStrict;
+  }
+
+  public Boolean getUpperStrict() {
+return upperStrict;
+  }
+
+  public void setUpperStrict(Boolean upperStrict) {
+this.upperStrict = upperStrict;
+  }
+
+  public String toJson() throws JsonProcessingException {

Review comment:
   Fixed. I think I was eating bourbon burgers at times.
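
   For context, a bound filter like the one above could be exercised along these lines. This is an illustrative sketch only, not part of the PR: the dimension name and bounds are made up, and the exact JSON produced depends on how toJson() is implemented.

import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.drill.exec.store.druid.common.DruidBoundFilter;

public class BoundFilterSketch {
  public static void main(String[] args) throws JsonProcessingException {
    // Hypothetical range predicate: 10 < someDimension < 20 (both bounds exclusive)
    DruidBoundFilter filter = new DruidBoundFilter("someDimension", "10", "20", true, true);
    // Expected to print roughly (field order may differ):
    // {"type":"bound","dimension":"someDimension","lower":"10","upper":"20","lowerStrict":true,"upperStrict":true}
    System.out.println(filter.toJson());
  }
}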





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-05-02 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r419029352



##
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanner.java
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+public class DruidScanner {

Review comment:
   For the time being this file has been removed; I will provide another PR to implement scan queries.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-05-02 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r419025233



##
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidScanner.java
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+
+import org.apache.drill.exec.store.druid.rest.DruidQueryClient;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+public class DruidScanner {
+
+  final DruidQueryClient client;
+  final String dataSource;
+  int batchSizeBytes;
+  long limit;
+  List<String> projectedColumnNames;
+  long scanRequestTimeout;
+
+  public DruidScanner(DruidQueryClient client, String dataSource) {
+
+this.client = client;
+this.dataSource = dataSource;
+this.batchSizeBytes = 1048576;
+this.limit = 9223372036854775807L;
+this.projectedColumnNames = null;
+  }
+
+  public DruidScanner setProjectedColumnNames(List<String> columnNames) {
+if(columnNames != null) {
+  this.projectedColumnNames = ImmutableList.copyOf(columnNames);
+} else {
+  this.projectedColumnNames = null;
+}
+
+return this;
+  }
+
+  public DruidScanner batchSizeBytes(int batchSizeBytes) {
+this.batchSizeBytes = batchSizeBytes;
+return this;
+  }
+
+  public DruidScanner limit(long limit) {
+this.limit = limit;
+return this;
+  }
+
+  public void GetDruidScannerIterator() {
+
+  }

Review comment:
   I was going to implement the [Druid Scan Query](https://druid.apache.org/docs/latest/querying/scan-query.html), but maybe I will do it in another PR; I am deleting this file for the time being.
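
   For reference, a minimal sketch of what a scan query request body could look like when built with Jackson. This is illustrative only and not part of this PR; the datasource, interval, and column names below are made up.

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class ScanQuerySketch {
  public static void main(String[] args) throws JsonProcessingException {
    ObjectMapper mapper = new ObjectMapper();
    // Druid scan queries are plain JSON documents POSTed to the broker.
    ObjectNode query = mapper.createObjectNode();
    query.put("queryType", "scan");
    query.put("dataSource", "wikipedia");          // made-up datasource name
    query.put("resultFormat", "compactedList");
    query.put("limit", 100);
    ArrayNode intervals = query.putArray("intervals");
    intervals.add("2016-01-01/2030-01-01");        // deliberately broad interval
    ArrayNode columns = query.putArray("columns");
    columns.add("__time");
    columns.add("channel");
    System.out.println(mapper.writerWithDefaultPrettyPrinter().writeValueAsString(query));
  }
}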





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-04-26 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r415393574



##
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidGroupScan.java
##
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+@JsonTypeName("druid-scan")
+public class DruidGroupScan extends AbstractGroupScan {
+
+private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DruidGroupScan.class);
+private static final long DEFAULT_TABLET_SIZE = 1000;
+
+private DruidStoragePluginConfig storagePluginConfig;
+private List<SchemaPath> columns;
+private DruidScanSpec scanSpec;
+private DruidStoragePlugin storagePlugin;
+private boolean filterPushedDown = false;
+private List druidWorkList = new ArrayList<>();
+private ListMultimap assignments;
+private List affinities;
+private String objectName;
+
+@JsonCreator
+public DruidGroupScan(@JsonProperty("scanSpec") DruidScanSpec scanSpec,
+ @JsonProperty("storagePluginConfig") DruidStoragePluginConfig storagePluginConfig,
+ @JsonProperty("columns") List<SchemaPath> columns,
+ @JacksonInject StoragePluginRegistry pluginRegistry)
+throws IOException, ExecutionSetupException {
+this((DruidStoragePlugin) pluginRegistry.getPlugin(storagePluginConfig), scanSpec, columns);
+int columnSize = (columns == null) ? 0 : columns.size();
+}
+
+public DruidGroupScan(DruidStoragePlugin storagePlugin, DruidScanSpec scanSpec,
+ List<SchemaPath> columns) {
+super("someuser");
+objectName = UUID.randomUUID().toString();
+this.storagePlugin = storagePlugin;
+this.storagePluginConfig = storagePlugin.getConfig();
+this.scanSpec = scanSpec;
+this.columns = columns == null || columns.size() == 0? ALL_COLUMNS : columns;
+init();
+}
+
+/**
+ * Private constructor, used for cloning.
+ * @param that The DruidGroupScan to clone
+ */
+private DruidGroupScan(DruidGroupScan that) {
+super(that);
+this.columns = that.columns;
+this.scanSpec = that.scanSpec;
+this.storagePlugin = that.storagePlugin;
+this.storagePluginConfig = that.storagePluginConfig;
+this.filterPushedDown = that.filterPushedDown;
+this.druidWorkList = that.druidWorkList;
+this.assignments = that.assignments;
+this.objectName = that.objectName;
+}
+
+@Override
+public GroupScan clone(List columns) {
+DruidGroupScan newScan = new DruidGroupScan(this);
+newScan.columns = 

[GitHub] [drill] akkapur commented on a change in pull request #1888: DRILL-5956: Add Storage Plugin for Apache Druid

2020-04-26 Thread GitBox


akkapur commented on a change in pull request #1888:
URL: https://github.com/apache/drill/pull/1888#discussion_r415372488



##
File path: contrib/storage-druid/src/main/java/org/apache/drill/exec/store/druid/DruidCompareFunctionProcessor.java
##
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.druid;
+
+import org.apache.drill.common.expression.CastExpression;
+import org.apache.drill.common.expression.ConvertExpression;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.visitors.AbstractExprVisitor;
+import org.apache.drill.common.expression.ValueExpressions.BooleanExpression;
+import org.apache.drill.common.expression.ValueExpressions.DateExpression;
+import org.apache.drill.common.expression.ValueExpressions.DoubleExpression;
+import org.apache.drill.common.expression.ValueExpressions.FloatExpression;
+import org.apache.drill.common.expression.ValueExpressions.IntExpression;
+import org.apache.drill.common.expression.ValueExpressions.LongExpression;
+import org.apache.drill.common.expression.ValueExpressions.QuotedString;
+import org.apache.drill.common.expression.ValueExpressions.TimeExpression;
+
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+
+public class DruidCompareFunctionProcessor extends
+AbstractExprVisitor<Boolean, LogicalExpression, RuntimeException> {
+
+private Object value;
+private boolean success;
+private boolean isEqualityFn;
+private SchemaPath path;
+private String functionName;
+
+public static boolean isCompareFunction(String functionName) {
+return COMPARE_FUNCTIONS_TRANSPOSE_MAP.keySet().contains(functionName);
+}
+
+public static DruidCompareFunctionProcessor process(FunctionCall call) {
+String functionName = call.getName();
+LogicalExpression nameArg = call.args.get(0);
+LogicalExpression valueArg = call.args.size() == 2 ? call.args.get(1)
+: null;
+DruidCompareFunctionProcessor evaluator = new DruidCompareFunctionProcessor(
+functionName);
+
+if (valueArg != null) { // binary function
+if (VALUE_EXPRESSION_CLASSES.contains(nameArg.getClass())) {
+LogicalExpression swapArg = valueArg;
+valueArg = nameArg;
+nameArg = swapArg;
+evaluator.functionName = COMPARE_FUNCTIONS_TRANSPOSE_MAP
+.get(functionName);
+}
+evaluator.success = nameArg.accept(evaluator, valueArg);
+} else if (call.args.get(0) instanceof SchemaPath) {
+evaluator.success = true;
+evaluator.path = (SchemaPath) nameArg;
+}
+
+return evaluator;
+}
+
+public DruidCompareFunctionProcessor(String functionName) {
+this.success = false;
+this.functionName = functionName;
+this.isEqualityFn = COMPARE_FUNCTIONS_TRANSPOSE_MAP
+.containsKey(functionName)
+&& COMPARE_FUNCTIONS_TRANSPOSE_MAP.get(functionName).equals(
+functionName);
+}
+
+public Object getValue() {
+return value;
+}
+
+public boolean isSuccess() {
+return success;
+}
+
+public SchemaPath getPath() {
+return path;
+}
+
+public String getFunctionName() {
+return functionName;
+}
+
+@Override
+public Boolean visitCastExpression(CastExpression e,
+   LogicalExpression valueArg) throws RuntimeException {
+if (e.getInput() instanceof CastExpression
+|| e.getInput() instanceof SchemaPath) {
+return e.getInput().accept(this, valueArg);
+}
+return false;
+}
+
+@Override
+public Boolean visitConvertExpression(ConvertExpression e,
+  LogicalExpression valueArg) throws RuntimeException {
+if (e.getConvertFunction() ==