[jira] [Commented] (DRILL-5941) Skip header / footer logic works incorrectly for Hive tables when file has several input splits

2017-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-5941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258863#comment-16258863
 ] 

ASF GitHub Bot commented on DRILL-5941:
---

Github user ppadma commented on the issue:

https://github.com/apache/drill/pull/1030
  
@arina-ielchiieva I am concerned about the performance impact of grouping all 
splits into a single reader (essentially, not parallelizing at all).
Wondering if it is possible to do it this way:
During planning, in HiveScan, if the table is a text file and has a header/footer, 
get the number of rows to skip. Read the header/footer rows and, based on that, 
adjust the first/last split and the offset within them. Splits that contain only 
header/footer rows can be removed from inputSplits. In HiveSubScan, change 
hiveReadEntry to be a list (one entry for each split). Add an entry in 
hiveReadEntry, numRowsToSkip (or offsetToStart), which can be passed to the 
recordReaders in getBatch for each subScan. This is fairly complicated and I am 
sure I am missing some details :-) A rough sketch of the bookkeeping is below.
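
For illustration, here is a minimal, hypothetical sketch of the planning-time 
bookkeeping this proposal implies. It assumes (optimistically) that per-split row 
counts are known at planning time, which is the hard part; names such as SplitPlan 
and plan() are made up for this sketch, not Drill's API:

{noformat}
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch: given the ordered input splits of one file and the
 * table's header/footer counts, drop splits fully covered by header/footer
 * rows and compute how many rows the first/last surviving readers must skip.
 */
public class HeaderFooterSplitPlanner {

  static class SplitPlan {
    final int splitIndex;
    final long skipFromStart;  // rows the reader must skip at the split's start
    final long skipFromEnd;    // rows the reader must drop at the split's end
    SplitPlan(int splitIndex, long skipFromStart, long skipFromEnd) {
      this.splitIndex = splitIndex;
      this.skipFromStart = skipFromStart;
      this.skipFromEnd = skipFromEnd;
    }
  }

  static List<SplitPlan> plan(long[] rowsPerSplit, long headerRows, long footerRows) {
    List<SplitPlan> plans = new ArrayList<>();
    long totalRows = 0;
    for (long rows : rowsPerSplit) {
      totalRows += rows;
    }
    long rowsBefore = 0;  // rows in all earlier splits of this file
    for (int i = 0; i < rowsPerSplit.length; i++) {
      long rows = rowsPerSplit[i];
      // Header rows that fall inside this split.
      long skipStart = Math.min(rows, Math.max(0, headerRows - rowsBefore));
      // Footer rows that fall inside this split.
      long rowsAfter = totalRows - rowsBefore - rows;
      long skipEnd = Math.min(rows, Math.max(0, footerRows - rowsAfter));
      if (skipStart + skipEnd < rows) {  // split still contains data rows
        plans.add(new SplitPlan(i, skipStart, skipEnd));
      }
      rowsBefore += rows;
    }
    return plans;
  }

  public static void main(String[] args) {
    // Two splits of one CSV file, 1 header row: only the first reader skips it.
    for (SplitPlan p : plan(new long[] {200, 129}, 1, 0)) {
      System.out.printf("split %d: skip %d at start, %d at end%n",
          p.splitIndex, p.skipFromStart, p.skipFromEnd);
    }
  }
}
{noformat}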


> Skip header / footer logic works incorrectly for Hive tables when file has 
> several input splits
> ---
>
> Key: DRILL-5941
> URL: https://issues.apache.org/jira/browse/DRILL-5941
> Project: Apache Drill
>  Issue Type: Bug
>  Components: Storage - Hive
>Affects Versions: 1.11.0
>Reporter: Arina Ielchiieva
>Assignee: Arina Ielchiieva
> Fix For: Future
>
>
> *To reproduce*
> 1. Create csv file with two columns (key, value) for 329 rows, where 
> first row is a header.
> The data file has size of should be greater than chunk size of 256 MB. Copy 
> file to the distributed file system.
> 2. Create table in Hive:
> {noformat}
> CREATE EXTERNAL TABLE `h_table`(
>   `key` bigint,
>   `value` string)
> ROW FORMAT DELIMITED
>   FIELDS TERMINATED BY ','
> STORED AS INPUTFORMAT
>   'org.apache.hadoop.mapred.TextInputFormat'
> OUTPUTFORMAT
>   'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
> LOCATION
>   'maprfs:/tmp/h_table'
> TBLPROPERTIES (
>  'skip.header.line.count'='1');
> {noformat}
> 3. Execute the query {{select * from hive.h_table}} in Drill (querying the data 
> through the Hive plugin). The result returns fewer rows than expected. The expected 
> result is 328 rows (the total count minus one header row).
> *The root cause*
> Since the file is greater than the default chunk size, it is split into several 
> fragments, known as input splits. For example:
> {noformat}
> maprfs:/tmp/h_table/h_table.csv:0+268435456
> maprfs:/tmp/h_table/h_table.csv:268435457+492782112
> {noformat}
> TextHiveReader is responsible for handling the skip header and/or footer logic.
> Currently Drill creates a reader [for each input 
> split|https://github.com/apache/drill/blob/master/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java#L84]
>  and the skip header/footer logic is applied to each input split separately, so 
> with {{skip.header.line.count=1}} every reader drops the first line of its own 
> split and data rows are silently lost. Ideally the above-mentioned input splits 
> should be read by one reader, so that the skip header/footer logic is applied 
> correctly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (DRILL-5975) Resource utilization

2017-11-19 Thread weijie.tong (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-5975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258824#comment-16258824
 ] 

weijie.tong commented on DRILL-5975:


Thanks for your detailed explanation of the current model and of what causes the 
CPU overload; it matches what I thought. As you pointed out, Drill's assumptions 
about resources do not hold in real scenarios. At Alibaba we have other teams 
using Flink and Spark, and both systems can reach roughly 70% CPU utilization. 
With Drill, the best score we have achieved is 40%, which as far as I know is 
also the best result at MapR. So Drill really has some work to do on scheduling. 
Here I have some different opinions on a few points:

* What is the entity to schedule? The entity that consumes the memory and CPU 
is the target; in Drill, that is the MinorFragment. When the scheduler schedules 
that entity, it can determine which nodes have enough resources to meet the 
requirement and assign work to them. This also answers the question of whether 
we need query-level scheduling: query-level scheduling is too coarse to play 
this role. I have investigated what Impala does, and I am not optimistic that it 
brings better improvements. MinorFragment-level scheduling lets the system work 
the way a CPU pipeline does.

* The RecordBatch will first be inserted into memory; if memory is insufficient, 
it will be flushed to disk. This is the common solution in Flink and Spark. The 
workers allocated for the next 'stage' will consume the RecordBatch soon, 
possibly straight from the in-memory destination. What's more, the data still 
flows as a stream: the RecordBatchManager takes over the Sender's role, 
contacting the receiver and sending out RecordBatches consecutively. By 
leveraging the DataTunnel's sendRecordBatch method, the throttling 
characteristic is also retained. To be clear, this only happens at the exchange 
stage (a rough sketch of this buffering follows below). I don't agree that we 
are reinventing Hive; this is also the model Flink and Spark follow.

   The current ZooKeeper-queue based query-level throttling is still needed to 
protect the system from overloaded queries. Some of what you summarized will 
help us build a good scheduler that estimates resource requirements more 
accurately.
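
To make the second point concrete, here is a minimal, hypothetical sketch of such 
a RecordBatchManager: serialized batches are buffered in memory up to a budget, 
spilled to temp files beyond it, and drained to the receiver under a credit-based 
throttle standing in for DataTunnel's sendRecordBatch. All names, sizes, and 
mechanisms here are illustrative, not Drill's actual API:

{noformat}
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Semaphore;

public class RecordBatchManagerSketch {
  private final long memoryBudgetBytes;
  private long inMemoryBytes;
  private final Deque<byte[]> memoryQueue = new ArrayDeque<>();
  private final Deque<Path> spillQueue = new ArrayDeque<>();
  private final Semaphore sendCredits;  // throttling, like the sender's credits

  public RecordBatchManagerSketch(long memoryBudgetBytes, int maxInFlightSends) {
    this.memoryBudgetBytes = memoryBudgetBytes;
    this.sendCredits = new Semaphore(maxInFlightSends);
  }

  /** Called by a local MinorFragment's sender with a serialized batch. */
  public synchronized void add(byte[] batch) throws IOException {
    if (inMemoryBytes + batch.length <= memoryBudgetBytes) {
      memoryQueue.addLast(batch);
      inMemoryBytes += batch.length;
    } else {
      // Memory budget exceeded: spill this batch to a temp file.
      Path spill = Files.createTempFile("rbm-spill-", ".batch");
      Files.write(spill, batch);
      spillQueue.addLast(spill);
    }
  }

  /** Drain batches to the receiver: memory first, then spilled ones. */
  public void sendAll(OutputStream receiver) throws IOException, InterruptedException {
    byte[] next;
    while ((next = poll()) != null) {
      sendCredits.acquire();       // block if too many sends are in flight
      try {
        receiver.write(next);      // stands in for DataTunnel.sendRecordBatch
      } finally {
        sendCredits.release();     // a real impl releases on send-complete ack
      }
    }
  }

  private synchronized byte[] poll() throws IOException {
    byte[] fromMemory = memoryQueue.pollFirst();
    if (fromMemory != null) {
      inMemoryBytes -= fromMemory.length;
      return fromMemory;
    }
    Path spilled = spillQueue.pollFirst();
    if (spilled == null) {
      return null;
    }
    byte[] bytes = Files.readAllBytes(spilled);
    Files.delete(spilled);
    return bytes;
  }

  public static void main(String[] args) throws Exception {
    RecordBatchManagerSketch manager = new RecordBatchManagerSketch(8, 2);
    manager.add(new byte[6]);    // fits in memory
    manager.add(new byte[6]);    // exceeds the budget: spilled to disk
    manager.sendAll(System.out); // drains memory first, then spill files
  }
}
{noformat}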



> Resource utilization
> 
>
> Key: DRILL-5975
> URL: https://issues.apache.org/jira/browse/DRILL-5975
> Project: Apache Drill
>  Issue Type: New Feature
>Affects Versions: 2.0.0
>Reporter: weijie.tong
>Assignee: weijie.tong
>
> h1. Motivation
> Currently the resource utilization ratio of Drill's cluster is not good; most 
> of the cluster's resources are wasted. We cannot afford many concurrent 
> queries: once the system accepts more queries, even at a modest CPU load, a 
> query that was originally very quick becomes slower and slower.
> The reason is that Drill does not supply a scheduler. It just assumes all the 
> nodes have enough compute resources. Once a query comes in, it schedules the 
> related fragments to random nodes without caring about each node's load, so 
> some nodes suffer more CPU context switches to satisfy the incoming query. The 
> deeper cause is that the runtime minor fragments construct a runtime tree 
> whose nodes are spread across different drillbits. The runtime tree is a 
> memory pipeline, that is, all the nodes stay alive for the whole lifecycle of 
> a query, sending data to upper nodes successively, even though some nodes 
> could finish quickly and quit immediately. What's more, the runtime tree is 
> constructed before actual execution, so the scheduling target for Drill 
> becomes the whole set of runtime tree nodes.
> h1. Design
> It will be hard to schedule the runtime tree nodes as a whole, so I try to 
> solve this by breaking up the cascade of runtime nodes. The graph below 
> describes the initial design. 
> !https://raw.githubusercontent.com/wiki/weijietong/drill/images/design.png!   
>  [graph 
> link|https://raw.githubusercontent.com/wiki/weijietong/drill/images/design.png]
> Every Drillbit instance will have a RecordBatchManager, which accepts all the 
> RecordBatches written by the senders of the local MinorFragments. The 
> RecordBatchManager holds the RecordBatches in memory first, then in disk 
> storage. Once the first RecordBatch of one query's MinorFragment sender 
> appears, it notifies the FragmentScheduler. The FragmentScheduler is 
> instantiated by the Foreman and holds the whole PlanFragment execution graph. 
> It allocates a new corresponding FragmentExecutor to process the generated 
> RecordBatch. The allocated FragmentExecutor then notifies the corresponding 
> FragmentManager to indicate that it is ready to receive data. The 
> FragmentManager then sends the RecordBatches one by one to the corresponding 
> FragmentExecutor's receiver, as the current Sender does, 
> by throttling 
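
A similarly hypothetical sketch of the lazy allocation step in this design: the 
scheduler starts a consuming fragment only when the RecordBatchManager reports 
the first batch from a sender, instead of pre-starting the whole runtime tree. 
All class and method names below are illustrative, not Drill's actual API:

{noformat}
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FragmentSchedulerSketch {
  private final ExecutorService pool = Executors.newCachedThreadPool();
  // Fragments already running, keyed by fragment id.
  private final Set<String> started = ConcurrentHashMap.newKeySet();
  // Static plan edge: which downstream fragment consumes each sender's output.
  private final Map<String, String> consumerOf;

  public FragmentSchedulerSketch(Map<String, String> consumerOf) {
    this.consumerOf = consumerOf;
  }

  /** Called by the RecordBatchManager when a sender emits its first batch. */
  public void onFirstBatch(String senderFragmentId) {
    String consumer = consumerOf.get(senderFragmentId);
    if (consumer != null && started.add(consumer)) {
      // Allocate the executor only now that there is data to consume.
      pool.submit(() -> System.out.println("allocating executor for " + consumer));
    }
  }

  public static void main(String[] args) {
    Map<String, String> edges = new HashMap<>();
    edges.put("scan-0", "agg-0");
    edges.put("scan-1", "agg-0");
    FragmentSchedulerSketch scheduler = new FragmentSchedulerSketch(edges);
    scheduler.onFirstBatch("scan-0");  // first batch: agg-0 gets an executor
    scheduler.onFirstBatch("scan-1");  // agg-0 already running: no new executor
    scheduler.pool.shutdown();
  }
}
{noformat}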

[jira] [Assigned] (DRILL-5977) predicate pushdown support kafkaMsgOffset

2017-11-19 Thread B Anil Kumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/DRILL-5977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

B Anil Kumar reassigned DRILL-5977:
---

Assignee: Bhallamudi Venkata Siva Kamesh

> predicate pushdown support kafkaMsgOffset
> -
>
> Key: DRILL-5977
> URL: https://issues.apache.org/jira/browse/DRILL-5977
> Project: Apache Drill
>  Issue Type: Improvement
>Reporter: B Anil Kumar
>Assignee: Bhallamudi Venkata Siva Kamesh
>
> As part of Kafka storage plugin review, below is the suggestion from Paul.
> {noformat}
> Does it make sense to provide a way to select a range of messages: a starting 
> point or a count? Perhaps I want to run my query every five minutes, scanning 
> only those messages since the previous scan. Or, I want to limit my take to, 
> say, the next 1000 messages. Could we use a pseudo-column such as 
> "kafkaMsgOffset" for that purpose? Maybe
> SELECT * FROM  WHERE kafkaMsgOffset > 12345
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (DRILL-4779) Kafka storage plugin support

2017-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-4779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258741#comment-16258741
 ] 

ASF GitHub Bot commented on DRILL-4779:
---

Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151890058
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReader.java
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.decoders;
+
+import java.io.Closeable;
+import java.util.List;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+import io.netty.buffer.DrillBuf;
+
+/**
+ * MessageReader interface provides a mechanism to handle various Kafka message
+ * formats like JSON, AVRO or custom message formats.
+ */
+public interface MessageReader extends Closeable {
+
+  public void init(DrillBuf buf, List<SchemaPath> columns, VectorContainerWriter writer, boolean allTextMode,
+      boolean skipOuterList, boolean readNumbersAsDouble);
--- End diff --

This is taken care of.


> Kafka storage plugin support
> 
>
> Key: DRILL-4779
> URL: https://issues.apache.org/jira/browse/DRILL-4779
> Project: Apache Drill
>  Issue Type: New Feature
>  Components: Storage - Other
>Affects Versions: 1.11.0
>Reporter: B Anil Kumar
>Assignee: B Anil Kumar
>  Labels: doc-impacting
> Fix For: 1.12.0
>
>
> Implementing a Kafka storage plugin will enable strong SQL support for Kafka.
> The initial implementation can target the JSON and Avro message types.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (DRILL-4779) Kafka storage plugin support

2017-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-4779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258740#comment-16258740
 ] 

ASF GitHub Bot commented on DRILL-4779:
---

Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151890267
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
 ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.kafka.schema.KafkaSchemaFactory;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.io.Closer;
+
+public class KafkaStoragePlugin extends AbstractStoragePlugin {
+
+  private static final Logger logger = LoggerFactory.getLogger(KafkaStoragePlugin.class);
+  private final KafkaSchemaFactory kafkaSchemaFactory;
+  private final KafkaStoragePluginConfig config;
+  private final DrillbitContext context;
+  private final Closer closer = Closer.create();
+
+  public KafkaStoragePlugin(KafkaStoragePluginConfig config, DrillbitContext context, String name)
+      throws ExecutionSetupException {
+    logger.debug("Initializing {}", KafkaStoragePlugin.class.getName());
+    this.config = config;
+    this.context = context;
+    this.kafkaSchemaFactory = new KafkaSchemaFactory(this, name);
+  }
+
+  public DrillbitContext getContext() {
+    return this.context;
+  }
+
+  @Override
+  public KafkaStoragePluginConfig getConfig() {
+    return this.config;
+  }
+
+  @Override
+  public boolean supportsRead() {
+    return Boolean.TRUE;
--- End diff --

This is taken care of.


> Kafka storage plugin support
> 
>
> Key: DRILL-4779
> URL: https://issues.apache.org/jira/browse/DRILL-4779
> Project: Apache Drill
>  Issue Type: New Feature
>  Components: Storage - Other
>Affects Versions: 1.11.0
>Reporter: B Anil Kumar
>Assignee: B Anil Kumar
>  Labels: doc-impacting
> Fix For: 1.12.0
>
>
> Implementing a Kafka storage plugin will enable strong SQL support for Kafka.
> The initial implementation can target the JSON and Avro message types.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (DRILL-4779) Kafka storage plugin support

2017-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-4779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258738#comment-16258738
 ] 

ASF GitHub Bot commented on DRILL-4779:
---

Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151890096
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("kafka-partition-scan")
+public class KafkaSubScan extends AbstractBase implements SubScan {
+
+  @JsonProperty
+  private final KafkaStoragePluginConfig kafkStoragePluginConfig;
+
+  @JsonIgnore
+  private final KafkaStoragePlugin kafkaStoragePlugin;
+  private final List<SchemaPath> coulmns;
+  private final List<KafkaSubScanSpec> partitionSubScanSpecList;
+
+  @JsonCreator
+  public KafkaSubScan(@JacksonInject StoragePluginRegistry registry, @JsonProperty("userName") String userName,
+      @JsonProperty("kafkStoragePluginConfig") KafkaStoragePluginConfig kafkStoragePluginConfig,
+      @JsonProperty("coulmns") List<SchemaPath> coulmns,
+      @JsonProperty("partitionSubScanSpecList") LinkedList<KafkaSubScanSpec> partitionSubScanSpecList)
--- End diff --

This is taken care of.


> Kafka storage plugin support
> 
>
> Key: DRILL-4779
> URL: https://issues.apache.org/jira/browse/DRILL-4779
> Project: Apache Drill
>  Issue Type: New Feature
>  Components: Storage - Other
>Affects Versions: 1.11.0
>Reporter: B Anil Kumar
>Assignee: B Anil Kumar
>  Labels: doc-impacting
> Fix For: 1.12.0
>
>
> Implementing a Kafka storage plugin will enable strong SQL support for Kafka.
> The initial implementation can target the JSON and Avro message types.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (DRILL-4779) Kafka storage plugin support

2017-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-4779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258736#comment-16258736
 ] 

ASF GitHub Bot commented on DRILL-4779:
---

Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151890085
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("kafka-partition-scan")
+public class KafkaSubScan extends AbstractBase implements SubScan {
+
+  @JsonProperty
+  private final KafkaStoragePluginConfig kafkStoragePluginConfig;
+
+  @JsonIgnore
+  private final KafkaStoragePlugin kafkaStoragePlugin;
+  private final List<SchemaPath> coulmns;
+  private final List<KafkaSubScanSpec> partitionSubScanSpecList;
+
+  @JsonCreator
+  public KafkaSubScan(@JacksonInject StoragePluginRegistry registry, @JsonProperty("userName") String userName,
+      @JsonProperty("kafkStoragePluginConfig") KafkaStoragePluginConfig kafkStoragePluginConfig,
+      @JsonProperty("coulmns") List<SchemaPath> coulmns,
+      @JsonProperty("partitionSubScanSpecList") LinkedList<KafkaSubScanSpec> partitionSubScanSpecList)
+      throws ExecutionSetupException {
+    super(userName);
+    this.kafkStoragePluginConfig = kafkStoragePluginConfig;
+    this.coulmns = coulmns;
+    this.partitionSubScanSpecList = partitionSubScanSpecList;
+    this.kafkaStoragePlugin = (KafkaStoragePlugin) registry.getPlugin(kafkStoragePluginConfig);
+  }
+
+  public KafkaSubScan(String userName, KafkaStoragePlugin plugin, KafkaStoragePluginConfig kafkStoragePluginConfig,
+      List<SchemaPath> coulmns, List<KafkaSubScanSpec> partitionSubScanSpecList) {
+    super(userName);
+    this.coulmns = coulmns;
+    this.kafkStoragePluginConfig = kafkStoragePluginConfig;
--- End diff --

This is taken care of.


> Kafka storage plugin support
> 
>
> Key: DRILL-4779
> URL: https://issues.apache.org/jira/browse/DRILL-4779
> Project: Apache Drill
>  Issue Type: New Feature
>  Components: Storage - Other
>Affects Versions: 1.11.0
>Reporter: B Anil Kumar
>Assignee: B Anil Kumar
>  Labels: doc-impacting
> Fix For: 1.12.0
>
>
> Implementing a Kafka storage plugin will enable strong SQL support for Kafka.
> The initial implementation can target the JSON and Avro message types.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (DRILL-4779) Kafka storage plugin support

2017-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-4779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258744#comment-16258744
 ] 

ASF GitHub Bot commented on DRILL-4779:
---

Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151890105
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("kafka-partition-scan")
+public class KafkaSubScan extends AbstractBase implements SubScan {
+
+  @JsonProperty
+  private final KafkaStoragePluginConfig kafkStoragePluginConfig;
+
+  @JsonIgnore
+  private final KafkaStoragePlugin kafkaStoragePlugin;
+  private final List<SchemaPath> coulmns;
+  private final List<KafkaSubScanSpec> partitionSubScanSpecList;
+
+  @JsonCreator
+  public KafkaSubScan(@JacksonInject StoragePluginRegistry registry, @JsonProperty("userName") String userName,
+      @JsonProperty("kafkStoragePluginConfig") KafkaStoragePluginConfig kafkStoragePluginConfig,
--- End diff --

This is taken care of.


> Kafka storage plugin support
> 
>
> Key: DRILL-4779
> URL: https://issues.apache.org/jira/browse/DRILL-4779
> Project: Apache Drill
>  Issue Type: New Feature
>  Components: Storage - Other
>Affects Versions: 1.11.0
>Reporter: B Anil Kumar
>Assignee: B Anil Kumar
>  Labels: doc-impacting
> Fix For: 1.12.0
>
>
> Implementing a Kafka storage plugin will enable strong SQL support for Kafka.
> The initial implementation can target the JSON and Avro message types.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (DRILL-4779) Kafka storage plugin support

2017-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-4779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258735#comment-16258735
 ] 

ASF GitHub Bot commented on DRILL-4779:
---

Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151890052
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactory.java
 ---
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.decoders;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class MessageReaderFactory {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(MessageReaderFactory.class);
+
+  /**
+   * Initialize the kafka message reader based on the store.kafka.record.reader session
+   * property
+   *
+   * @param messageReaderKlass
+   *  value of store.kafka.record.reader session property
+   * @return kafka message reader
+   * @throws UserException
+   *   in case of any message reader initialization
+   */
+  public static MessageReader getMessageReader(String messageReaderKlass) {
+    Preconditions.checkNotNull(messageReaderKlass, "Please set store.kafka.record.reader " + messageReaderKlass);
+    MessageReader messageReader = null;
+    try {
+      Class<?> klass = Class.forName(messageReaderKlass);
--- End diff --

As per your suggestion, I raised a JIRA for this: 
https://issues.apache.org/jira/browse/DRILL-5976


> Kafka storage plugin support
> 
>
> Key: DRILL-4779
> URL: https://issues.apache.org/jira/browse/DRILL-4779
> Project: Apache Drill
>  Issue Type: New Feature
>  Components: Storage - Other
>Affects Versions: 1.11.0
>Reporter: B Anil Kumar
>Assignee: B Anil Kumar
>  Labels: doc-impacting
> Fix For: 1.12.0
>
>
> Implementing a Kafka storage plugin will enable strong SQL support for Kafka.
> The initial implementation can target the JSON and Avro message types.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (DRILL-4779) Kafka storage plugin support

2017-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-4779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258745#comment-16258745
 ] 

ASF GitHub Bot commented on DRILL-4779:
---

Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151890077
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("kafka-partition-scan")
+public class KafkaSubScan extends AbstractBase implements SubScan {
+
+  @JsonProperty
+  private final KafkaStoragePluginConfig kafkStoragePluginConfig;
+
+  @JsonIgnore
+  private final KafkaStoragePlugin kafkaStoragePlugin;
+  private final List<SchemaPath> coulmns;
+  private final List<KafkaSubScanSpec> partitionSubScanSpecList;
+
+  @JsonCreator
+  public KafkaSubScan(@JacksonInject StoragePluginRegistry registry, @JsonProperty("userName") String userName,
+      @JsonProperty("kafkStoragePluginConfig") KafkaStoragePluginConfig kafkStoragePluginConfig,
+      @JsonProperty("coulmns") List<SchemaPath> coulmns,
+      @JsonProperty("partitionSubScanSpecList") LinkedList<KafkaSubScanSpec> partitionSubScanSpecList)
+      throws ExecutionSetupException {
+    super(userName);
+    this.kafkStoragePluginConfig = kafkStoragePluginConfig;
+    this.coulmns = coulmns;
+    this.partitionSubScanSpecList = partitionSubScanSpecList;
+    this.kafkaStoragePlugin = (KafkaStoragePlugin) registry.getPlugin(kafkStoragePluginConfig);
+  }
+
+  public KafkaSubScan(String userName, KafkaStoragePlugin plugin, KafkaStoragePluginConfig kafkStoragePluginConfig,
+      List<SchemaPath> coulmns, List<KafkaSubScanSpec> partitionSubScanSpecList) {
+    super(userName);
+    this.coulmns = coulmns;
+    this.kafkStoragePluginConfig = kafkStoragePluginConfig;
+    this.kafkaStoragePlugin = plugin;
+    this.partitionSubScanSpecList = partitionSubScanSpecList;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+    return physicalVisitor.visitSubScan(this, value);
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+    Preconditions.checkArgument(children.isEmpty());
+    return new KafkaSubScan(getUserName(), kafkaStoragePlugin, kafkStoragePluginConfig, coulmns,
+        partitionSubScanSpecList);
+  }
+
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+    return Collections.emptyIterator();
+  }
+
+  @JsonIgnore
+  public KafkaStoragePluginConfig getKafkStoragePluginConfig() {
+    return kafkStoragePluginConfig;
+  }
+
+  @JsonIgnore
+  public KafkaStoragePlugin getKafkaStoragePlugin() {
+    return kafkaStoragePlugin;
+  }
+
+  public List<SchemaPath> getCoulmns() {
+    return coulmns;
--- End diff --

This is taken care of.


> Kafka storage plugin 

[jira] [Commented] (DRILL-4779) Kafka storage plugin support

2017-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-4779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258734#comment-16258734
 ] 

ASF GitHub Bot commented on DRILL-4779:
---

Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151889218
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactory.java
 ---
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.decoders;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class MessageReaderFactory {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(MessageReaderFactory.class);
+
+  /**
+   * Initialize the kafka message reader based on the store.kafka.record.reader session
+   * property
+   *
+   * @param messageReaderKlass
+   *  value of store.kafka.record.reader session property
+   * @return kafka message reader
+   * @throws UserException
+   *   in case of any message reader initialization
+   */
+  public static MessageReader getMessageReader(String messageReaderKlass) {
+    Preconditions.checkNotNull(messageReaderKlass, "Please set store.kafka.record.reader " + messageReaderKlass);
+    MessageReader messageReader = null;
+    try {
+      Class<?> klass = Class.forName(messageReaderKlass);
--- End diff --

Thanks for this suggestion, Paul. Initially we had considered this as a plugin 
config.

But in Kafka, much of the time users need to implement their own custom 
MessageReader.

For example, Kafka messages can be encrypted. In other frameworks like 
Spark Streaming, Storm, or Camus, the user provides a Deserializer/Decoder; an 
illustrative example of that pattern follows below.

As you suggested, I created a separate JIRA, 
https://issues.apache.org/jira/browse/DRILL-5976, for this.
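
A hedged, minimal example of the Deserializer/Decoder pattern mentioned above, 
using Kafka's real org.apache.kafka.common.serialization.Deserializer interface. 
The Base64 decoding here is just a stand-in for whatever decryption or decoding a 
deployment actually needs:

{noformat}
import java.util.Base64;
import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;

/**
 * Illustrative custom deserializer: Base64-decodes message payloads before
 * they are handed to the downstream parser. A real deployment might decrypt
 * instead of decode.
 */
public class Base64PayloadDeserializer implements Deserializer<byte[]> {

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    // No configuration needed for this example.
  }

  @Override
  public byte[] deserialize(String topic, byte[] data) {
    return data == null ? null : Base64.getDecoder().decode(data);
  }

  @Override
  public void close() {
    // Nothing to release.
  }
}
{noformat}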


> Kafka storage plugin support
> 
>
> Key: DRILL-4779
> URL: https://issues.apache.org/jira/browse/DRILL-4779
> Project: Apache Drill
>  Issue Type: New Feature
>  Components: Storage - Other
>Affects Versions: 1.11.0
>Reporter: B Anil Kumar
>Assignee: B Anil Kumar
>  Labels: doc-impacting
> Fix For: 1.12.0
>
>
> Implementing a Kafka storage plugin will enable strong SQL support for Kafka.
> The initial implementation can target the JSON and Avro message types.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (DRILL-4779) Kafka storage plugin support

2017-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-4779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258746#comment-16258746
 ] 

ASF GitHub Bot commented on DRILL-4779:
---

Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151890205
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
 ---
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import static 
org.apache.drill.exec.store.kafka.DrillKafkaConfig.DRILL_KAFKA_POLL_TIMEOUT;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.drill.exec.store.kafka.decoders.MessageReader;
+import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
+import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+public class KafkaRecordReader extends AbstractRecordReader {
+  private static final Logger logger = LoggerFactory.getLogger(KafkaRecordReader.class);
+  public static final long DEFAULT_MESSAGES_PER_BATCH = 4000;
+
+  private VectorContainerWriter writer;
+  private MessageReader messageReader;
+
+  private boolean unionEnabled;
+  private KafkaConsumer<byte[], byte[]> kafkaConsumer;
+  private KafkaStoragePlugin plugin;
--- End diff --

This is taken care of.


> Kafka storage plugin support
> 
>
> Key: DRILL-4779
> URL: https://issues.apache.org/jira/browse/DRILL-4779
> Project: Apache Drill
>  Issue Type: New Feature
>  Components: Storage - Other
>Affects Versions: 1.11.0
>Reporter: B Anil Kumar
>Assignee: B Anil Kumar
>  Labels: doc-impacting
> Fix For: 1.12.0
>
>
> Implementing a Kafka storage plugin will enable strong SQL support for Kafka.
> The initial implementation can target the JSON and Avro message types.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (DRILL-4779) Kafka storage plugin support

2017-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-4779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258739#comment-16258739
 ] 

ASF GitHub Bot commented on DRILL-4779:
---

Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151890115
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePluginConfig.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName(KafkaStoragePluginConfig.NAME)
+public class KafkaStoragePluginConfig extends StoragePluginConfig {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(KafkaStoragePluginConfig.class);
+  public static final String NAME = "kafka";
+  private Properties kafkaConsumerProps;
+
+  @JsonCreator
+  public KafkaStoragePluginConfig(@JsonProperty("kafkaConsumerProps") 
Map kafkaConsumerProps) {
+this.kafkaConsumerProps = new Properties();
+this.kafkaConsumerProps.putAll(kafkaConsumerProps);
+logger.debug("Kafka Consumer Props {}", this.kafkaConsumerProps);
+  }
+
+  public Properties getKafkaConsumerProps() {
+return kafkaConsumerProps;
+  }
+
+  @Override
+  public int hashCode() {
+final int prime = 31;
+int result = 1;
+result = prime * result + ((kafkaConsumerProps == null) ? 0 : 
kafkaConsumerProps.hashCode());
+return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+if (this == obj) {
+  return true;
+}
+if (obj == null) {
+  return false;
+}
+if (getClass() != obj.getClass()) {
+  return false;
+}
+KafkaStoragePluginConfig other = (KafkaStoragePluginConfig) obj;
+if (kafkaConsumerProps == null) {
+  if (other.kafkaConsumerProps != null) {
+return false;
+  }
+} else if (!kafkaConsumerProps.equals(other.kafkaConsumerProps)) {
+  return false;
+}
--- End diff --

This is taken care of.


> Kafka storage plugin support
> 
>
> Key: DRILL-4779
> URL: https://issues.apache.org/jira/browse/DRILL-4779
> Project: Apache Drill
>  Issue Type: New Feature
>  Components: Storage - Other
>Affects Versions: 1.11.0
>Reporter: B Anil Kumar
>Assignee: B Anil Kumar
>  Labels: doc-impacting
> Fix For: 1.12.0
>
>
> Implementing a Kafka storage plugin will enable strong SQL support for Kafka.
> The initial implementation can target the JSON and Avro message types.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (DRILL-4779) Kafka storage plugin support

2017-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-4779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258743#comment-16258743
 ] 

ASF GitHub Bot commented on DRILL-4779:
---

Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151890087
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("kafka-partition-scan")
+public class KafkaSubScan extends AbstractBase implements SubScan {
+
+  @JsonProperty
+  private final KafkaStoragePluginConfig kafkStoragePluginConfig;
+
+  @JsonIgnore
+  private final KafkaStoragePlugin kafkaStoragePlugin;
+  private final List<SchemaPath> coulmns;
+  private final List<KafkaSubScanSpec> partitionSubScanSpecList;
+
+  @JsonCreator
+  public KafkaSubScan(@JacksonInject StoragePluginRegistry registry, @JsonProperty("userName") String userName,
+      @JsonProperty("kafkStoragePluginConfig") KafkaStoragePluginConfig kafkStoragePluginConfig,
+      @JsonProperty("coulmns") List<SchemaPath> coulmns,
+      @JsonProperty("partitionSubScanSpecList") LinkedList<KafkaSubScanSpec> partitionSubScanSpecList)
+      throws ExecutionSetupException {
+    super(userName);
+    this.kafkStoragePluginConfig = kafkStoragePluginConfig;
+    this.coulmns = coulmns;
+    this.partitionSubScanSpecList = partitionSubScanSpecList;
+    this.kafkaStoragePlugin = (KafkaStoragePlugin) registry.getPlugin(kafkStoragePluginConfig);
+  }
+
+  public KafkaSubScan(String userName, KafkaStoragePlugin plugin, KafkaStoragePluginConfig kafkStoragePluginConfig,
+      List<SchemaPath> coulmns, List<KafkaSubScanSpec> partitionSubScanSpecList) {
+    super(userName);
+    this.coulmns = coulmns;
--- End diff --

This is taken care of.


> Kafka storage plugin support
> 
>
> Key: DRILL-4779
> URL: https://issues.apache.org/jira/browse/DRILL-4779
> Project: Apache Drill
>  Issue Type: New Feature
>  Components: Storage - Other
>Affects Versions: 1.11.0
>Reporter: B Anil Kumar
>Assignee: B Anil Kumar
>  Labels: doc-impacting
> Fix For: 1.12.0
>
>
> Implementing a Kafka storage plugin will enable strong SQL support for Kafka.
> The initial implementation can target the JSON and Avro message types.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (DRILL-4779) Kafka storage plugin support

2017-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-4779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258742#comment-16258742
 ] 

ASF GitHub Bot commented on DRILL-4779:
---

Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151890101
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("kafka-partition-scan")
+public class KafkaSubScan extends AbstractBase implements SubScan {
+
+  @JsonProperty
+  private final KafkaStoragePluginConfig kafkStoragePluginConfig;
+
+  @JsonIgnore
+  private final KafkaStoragePlugin kafkaStoragePlugin;
+  private final List<SchemaPath> coulmns;
+  private final List<KafkaSubScanSpec> partitionSubScanSpecList;
+
+  @JsonCreator
+  public KafkaSubScan(@JacksonInject StoragePluginRegistry registry, @JsonProperty("userName") String userName,
+      @JsonProperty("kafkStoragePluginConfig") KafkaStoragePluginConfig kafkStoragePluginConfig,
+      @JsonProperty("coulmns") List<SchemaPath> coulmns,
--- End diff --

This is taken care of.


> Kafka storage plugin support
> 
>
> Key: DRILL-4779
> URL: https://issues.apache.org/jira/browse/DRILL-4779
> Project: Apache Drill
>  Issue Type: New Feature
>  Components: Storage - Other
>Affects Versions: 1.11.0
>Reporter: B Anil Kumar
>Assignee: B Anil Kumar
>  Labels: doc-impacting
> Fix For: 1.12.0
>
>
> Implementing a Kafka storage plugin will enable strong SQL support for Kafka.
> Initially, the implementation can target JSON and Avro message types.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (DRILL-5977) predicate pushdown support kafkaMsgOffset

2017-11-19 Thread B Anil Kumar (JIRA)
B Anil Kumar created DRILL-5977:
---

 Summary: predicate pushdown support kafkaMsgOffset
 Key: DRILL-5977
 URL: https://issues.apache.org/jira/browse/DRILL-5977
 Project: Apache Drill
  Issue Type: Improvement
Reporter: B Anil Kumar


As part of the Kafka storage plugin review, below is a suggestion from Paul.
{noformat}
Does it make sense to provide a way to select a range of messages: a starting 
point or a count? Perhaps I want to run my query every five minutes, scanning 
only those messages since the previous scan. Or, I want to limit my take to, 
say, the next 1000 messages. Could we use a pseudo-column such as 
"kafkaMsgOffset" for that purpose? Maybe

SELECT * FROM <topic> WHERE kafkaMsgOffset > 12345
{noformat}
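
For illustration, such a predicate could map onto the plain kafka-clients 
consumer API by seeking each partition to the pushed-down offset before 
reading. A minimal sketch (broker address, topic name, and partition are 
hypothetical; this is not the plugin's actual code):

{code}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class OffsetPushdownSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");   // hypothetical broker
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      // kafkaMsgOffset > 12345 means the first wanted offset is 12346.
      TopicPartition tp = new TopicPartition("my_topic", 0);
      consumer.assign(Collections.singletonList(tp));
      consumer.seek(tp, 12346L);
      ConsumerRecords<String, String> records = consumer.poll(1000L);
      for (ConsumerRecord<String, String> r : records) {
        System.out.println(r.offset() + ": " + r.value());
      }
    }
  }
}
{code}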



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (DRILL-5976) Kafka MessageReader config optimization

2017-11-19 Thread B Anil Kumar (JIRA)
B Anil Kumar created DRILL-5976:
---

 Summary: Kafka MessageReader config optimization
 Key: DRILL-5976
 URL: https://issues.apache.org/jira/browse/DRILL-5976
 Project: Apache Drill
  Issue Type: Improvement
Reporter: B Anil Kumar
Assignee: B Anil Kumar


The Kafka storage plugin currently supports the JSON message format, but going 
forward it will support Avro and other formats.

Also, there might be multiple scenarios where users have to implement their own 
MessageReader (corresponding to their custom Deserializer/Decoder).

So, this JIRA is to brainstorm on whether to go with MessageReader as a *plugin 
config* or a system/session option.

Paul's suggestion, from a review comment, is below.

{noformat}
Suppose this is two or three releases from now and we support other forms of 
Kafka messages. Different topics use different formats.

If the message format is a system/session option, then I need to switch the 
option before each query. Very cumbersome and error prone.

Instead, perhaps this information should be part of the storage plugin config. 
Then, I can define different plugins: one for each message format.

Further, can I have multiple Kafka servers? If so, would I need different 
plugin configs for each?

So, should we be thinking about encoding most properties as plugin config 
properties?

Now, the plugin might have a format property, one of which is json. The JSON 
config properties would be defined in the json format within the overall 
storage plugin config.
{noformat}
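
As a sketch of the plugin-config direction (every class and property name below 
is hypothetical, not the plugin's actual API), the message format could be a 
Jackson-mapped property on the config, and each configured plugin instance 
would then build its own reader:

{code}
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;

// Hypothetical reader contract; the real plugin's MessageReader may differ.
interface MessageReader {
  void readMessage(byte[] payload);
}

class JsonMessageReader implements MessageReader {
  public void readMessage(byte[] payload) { /* parse JSON into vectors */ }
}

@JsonTypeName("kafka-sketch")
class KafkaPluginConfigSketch {
  private final String bootstrapServers;
  private final String format;   // "json" today; "avro" etc. later

  @JsonCreator
  KafkaPluginConfigSketch(@JsonProperty("bootstrapServers") String bootstrapServers,
                          @JsonProperty("format") String format) {
    this.bootstrapServers = bootstrapServers;
    this.format = format == null ? "json" : format;   // default keeps old configs working
  }

  // Each plugin instance carries its own format, so no session option is needed.
  MessageReader createMessageReader() {
    if ("json".equals(this.format)) {
      return new JsonMessageReader();
    }
    throw new IllegalArgumentException("Unsupported message format: " + this.format);
  }
}
{code}

With this shape, two plugin definitions (say, one for JSON topics and one for 
Avro topics) could point at the same brokers with different formats, so no 
option needs to be toggled between queries.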



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (DRILL-5975) Resource utilization

2017-11-19 Thread Paul Rogers (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-5975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258585#comment-16258585
 ] 

Paul Rogers commented on DRILL-5975:


The design proposed may work for simple queries, but it is not sufficiently 
general for large queries. Let's start with some background. Here is Drill's 
current design. As you point out, there are many opportunities for improvement. 
But, this is what we have today.

This is an important topic, so I've taken the liberty of explaining our current 
understanding in some detail.

h4. Current Model

Drill's current resource model is based on symmetry: it assumes queries arrive 
randomly at each Drillbit. That is, each user connects to a random Foreman, 
each user has similar load, and the number of users is larger than the number 
of Drillbits, so that each Foreman sees roughly equal load.

Next, Drill assumes that users want maximum performance of each query. 
Therefore, it strives for maximum parallelization across both the CPUs in each 
node and all nodes in the cluster.

To do this, Drill assumes nodes are symmetrical: that all have the same number 
of available CPUs and memory. Drill also assumes that the cluster is dedicated 
to Drill, and so Drill attempts to saturate all CPUs on all hosts.

Next, Drill assumes that all queries are fast, so that query 1 will complete 
roughly before query 2 starts. This means that resource sharing is sequential: 
query 1 uses all memory and CPUs, then query 2 does so.

The above model may not be ideal, but it is the "simplest thing that could 
work." It has gotten Drill this far, but it does clearly have limits.

h4. Limitations of the Current Model

Problems occur, obviously, when the real world does not agree with Drill's 
assumptions. Typical problems:

* Drill does not "own" its cluster, and instead shared resources (such as CPU) 
with other workloads.
* Queries are not short, and instead of running sequentially, they end up 
running in parallel.
* Queries need more resources (CPU or memory) than the user has provided.

h4. CPU Overload

While our recent work focused on memory (with spilling and queue-based memory 
allocation), you correctly point out that we need to turn our attention to CPU.

Let's look a bit deeper at the cause of CPU usage. Drill makes the following 
assumptions:

* Create as many "major fragments" (work units) as possible for each query. 
(Shown as different colors in the query plan visualization UI.)
* Create as many "minor fragments" (slices) as possible for each major 
fragment. (Shown as numbered items in the query plan UI tables.)
* By default, create a number of minor fragments equal to 70% of the number of 
CPUs. (If your machine has 20 cores, Drill will create 14 slices per major 
fragment.)
* Every minor fragment is implemented as a Java thread.

A bit of math shows why this is a problem. Assume a typical query with, say, 5 
major fragments. Assume 24 CPUs. We will create a number of threads equal to:

24 CPUs * 70% * 5 major fragments = ~85 threads

If all these threads were busy at the same time, we'd overload our CPUs by a 
factor of 3, causing intense context switching (which pretty much defeats any 
attempts to optimize for internal CPU caches.) Larger queries can easily 
oversubscribe CPUs by 10 or more times.
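
A quick sketch of that arithmetic (the 70% factor and the fragment count come 
from the description above; the per-fragment rounding is an assumption):

{code}
public class FragmentThreadMath {
  public static void main(String[] args) {
    int cpus = 24;
    double widthFraction = 0.70;        // default: slices per major fragment = 70% of cores
    int majorFragments = 5;

    int slicesPerMajor = (int) Math.ceil(cpus * widthFraction);   // 17
    int totalThreads = slicesPerMajor * majorFragments;           // 85
    System.out.printf("%d threads on %d cores => ~%.1fx oversubscription%n",
        totalThreads, cpus, (double) totalThreads / cpus);        // ~3.5x
  }
}
{code}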

Note that each of these threads wants to make use of unlimited memory. It does 
not take too many such queries before Drill thrashes the CPU and runs out of 
memory.

The lesson is that when the workload exceeds the available resources, the 
"assume infinite resources" model no longer works. Some form of throttling is 
needed. Let's discuss that.

Suppose that Drill can make a query faster by using all CPUs. Then, there is, 
by design, no room in the cluster for another query. If we are already using 
300% of CPU, then adding another query simply causes more thrashing, puts more 
pressure on memory, and causes both queries to slow down. In an ideal case, 
both queries will take twice as long as if they ran separately. In extreme 
cases, the slow-down is worse than linear once the OS starts wasting time 
thrashing threads (as shown by the percent system time in, say, the "top" 
command.)

In this (simplified) model, we are better off running one query to completion, 
then starting the second. Both make full use of the cluster. Total run time is 
the same. Plus, memory pressure is halved.

In general, some queries need all resources, but many do not. In our own 
testing, we see that, with TPC-H queries, there is some advantage to running 
parallel queries up to maybe three or four concurrent queries. After that, we 
just see a linear slow-down. (We've pushed the system to 20, 30 or more queries 
-- it is like your local freeway at rush hour; everything becomes very slow.)

h4. Throttling

The first challenge is to accept that every cluster has limits. Once Drill 
saturates CPUs, 

[jira] [Commented] (DRILL-5261) Expose REST endpoint in zookeeper

2017-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-5261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258553#comment-16258553
 ] 

ASF GitHub Bot commented on DRILL-5261:
---

Github user xhochy commented on the issue:

https://github.com/apache/drill/pull/1042
  
This still needs the protobufs to be re-compiled for C++; I would do this 
once someone signals that I have taken the correct approach.


> Expose REST endpoint in zookeeper
> -
>
> Key: DRILL-5261
> URL: https://issues.apache.org/jira/browse/DRILL-5261
> Project: Apache Drill
>  Issue Type: New Feature
>Reporter: Uwe L. Korn
>
> It would be nice to also publish the REST API endpoint of each Drillbit in 
> Zookeeper. This would mean that we need an additional entry in 
> {{DrillbitEndpoint}}. While I know how to add the attribute to the ProtoBuf 
> definition and fill it with the correct information, I'm unsure whether 
> migration code is needed to support older {{DrillbitEndpoint}} 
> implementations that don't have this attribute.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (DRILL-5261) Expose REST endpoint in zookeeper

2017-11-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-5261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258552#comment-16258552
 ] 

ASF GitHub Bot commented on DRILL-5261:
---

GitHub user xhochy opened a pull request:

https://github.com/apache/drill/pull/1042

DRILL-5261: Expose REST endpoint in zookeeper



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xhochy/drill DRILL-5261

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/drill/pull/1042.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1042


commit 2df606590158afe6cb29e9f292eb33e945c26ba0
Author: Uwe L. Korn 
Date:   2017-11-19T09:36:32Z

DRILL-5261: Expose REST endpoint in zookeeper




> Expose REST endpoint in zookeeper
> -
>
> Key: DRILL-5261
> URL: https://issues.apache.org/jira/browse/DRILL-5261
> Project: Apache Drill
>  Issue Type: New Feature
>Reporter: Uwe L. Korn
>
> It would be nice to also publish the REST API endpoint of each Drillbit in 
> Zookeeper. This would mean that we need an additional entry in 
> {{DrillbitEndpoint}}. While I know how to add the attribute to the ProtoBuf 
> definition and fill it with the correct information, I'm unsure whether 
> migration code is needed to support older {{DrillbitEndpoint}} 
> implementations that don't have this attribute.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (DRILL-5974) Read JSON non-relational fields using text mode

2017-11-19 Thread Aman Sinha (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-5974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258550#comment-16258550
 ] 

Aman Sinha commented on DRILL-5974:
---

For the specific example of the 2-D array, Drill can currently query it using 
the array[i][j] syntax, as shown below (I took this example from the Drill 
docs page [1]):
{noformat}
SELECT features[0].geometry.coordinates[0][1] from 
dfs.`/Users/asinha/data/json/polygon`;
+---+
|EXPR$0 |
+---+
| [-122.42207601332528,37.808835019815085,0.0]  |
+---+
{noformat}

If we treat it as text mode, my understanding is that the above query will no 
longer work.
Independent of this example, I do agree that there are other types of JSON 
structures that Drill cannot process without setting all_text_mode, so those 
could benefit: for instance, a list with heterogeneous types, or a list with 
NULL values.

[1] https://drill.apache.org/docs/json-data-model/
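
For what it's worth, a client that receives a non-relational field as text can 
recover the structure with any JSON parser. A minimal sketch using Jackson 
(the string literal mirrors the points example from the issue description 
below):

{code}
import com.fasterxml.jackson.databind.ObjectMapper;

public class TextModePointsParse {
  public static void main(String[] args) throws Exception {
    // The "points" column as text mode would return it.
    String pointsJson = "[[0, 0], [0, 5], [5, 0], [5, 5]]";
    ObjectMapper mapper = new ObjectMapper();
    double[][] points = mapper.readValue(pointsJson, double[][].class);
    System.out.println("corners: " + points.length);   // 4
  }
}
{code}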

> Read JSON non-relational fields using text mode
> ---
>
> Key: DRILL-5974
> URL: https://issues.apache.org/jira/browse/DRILL-5974
> Project: Apache Drill
>  Issue Type: Improvement
>Affects Versions: 1.13.0
>Reporter: Paul Rogers
>Assignee: Paul Rogers
> Fix For: 1.13.0
>
>
> Proposed is a minor enhancement to the JSON reader to better handle 
> non-relational JSON structures.
> As background, Drill handles simple tuples:
> {code}
> {a: 10, b: “fred”}
> {code}
> Drill also handles arrays:
> {code}
> {name: “fred”, hobbies: [“bowling”, “golf”]}
> {code}
> Drill even handles arrays of tuples:
> {code}
> {name: “fred”, orders: [
>   {id: 1001, amount: 12.34},
>   {id: 1002, amount: 56.78}]}
> {code}
> The above are termed "relational" because there is a straightforward mapping 
> between tables and the above JSON structures.
> Things get interesting with non-relational types, such as 2-D arrays:
> {code}
> {id: 4, shape: “square”, points: [[0, 0], [0, 5], [5, 0], [5, 5]]}
> {code}
> Drill has two solutions:
> * Turn on the experimental list and union support.
> * Enable all-text mode to read all fields as JSON text.
> Proposed is a middle ground:
> * Read fields with relational types into vectors.
> * Read non-relational fields using text mode.
> Thus, the first three examples would all result in the JSON data parsed into 
> Drill vectors. But, the fourth, non-relational example would produce a row 
> that looks like this:
> {noformat}
> id, shape, points
> 4, “square”, “[[0, 0], [0, 5], [5, 0], [5, 5]]”
> {noformat}
> Although Drill can’t parse the 2-D array, Drill will pass the array along to 
> the client, which can use its favorite JSON parser to parse the array and do 
> something useful (like draw the square in this case.)
> Specifically, the proposal is to:
> * Apply this change only to the revised “batch size aware” JSON reader.
> * Use the above parsing model by default.
> * Use the experimental list-and-union support if the existing 
> {{exec.enable_union_type}} system/session option is set.
> Existing queries should “just work.” In fact, now JSON with non-relational 
> types will work “out-of-the-box” without all-text mode or the experimental 
> types.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (DRILL-5975) Resource utilization

2017-11-19 Thread weijie.tong (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-5975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258448#comment-16258448
 ] 

weijie.tong edited comment on DRILL-5975 at 11/19/17 12:45 PM:
---

Yes, we have already used the queue option for a long time. It is good at 
preventing the cluster from being overloaded, but too coarse to serve as a 
scheduler. I have noticed 
[DRILL-5716|https://issues.apache.org/jira/browse/DRILL-5716]. It is a good 
design for memory allocation, but I think it will do little to prevent a 
Drillbit from being assigned too much work in the current architecture.

We need a scheduler to do the fragment-level scheduling work; call it the 
first-level schedule. The YARN-like model schedules at the Drillbit node 
level; call it the second-level schedule. The first-level schedule can work on 
top of the second-level schedule, say, when we deploy Drill on YARN.

I propose this design having investigated the Flink and Spark projects. I 
prefer Flink's 
[design|https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks].
 I will look into Presto, per your advice. I have also discussed this with 
[~jni] privately; we share some common opinions.

I think it is necessary to introduce the RecordBatchManager role to break the 
current cascaded RPC dependence between two MajorFragments at the data 
exchange stage. If memory is sufficient, or the upper MajorFragment's 
computation is fast enough, the written RecordBatches will be pushed to the 
consumers quickly, with no chance of going to disk, giving the same 
performance as the current implementation. It will also make 
MinorFragment-level scheduling possible.

Welcome to discuss.
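
To make the proposed role concrete, a rough Java sketch of what a 
RecordBatchManager contract might look like; every name and signature here is 
hypothetical, beyond the roles named in the comment above:

{code}
// Hypothetical sketch only; not Drill's actual interfaces.
interface RecordBatchManagerSketch {

  // A local MinorFragment sender hands over a batch. The first batch of a
  // fragment triggers a notification to the Foreman's FragmentScheduler.
  void addBatch(String queryId, int senderFragmentId, Object recordBatch);

  // Called once the scheduler has allocated a FragmentExecutor and it has
  // signalled readiness; buffered batches (memory first, then disk) are
  // streamed out one by one, throttling the data stream.
  void streamTo(String queryId, int senderFragmentId, BatchReceiver receiver);

  interface BatchReceiver {
    void receive(Object recordBatch);
  }
}
{code}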



was (Author: weijie):
Yes , we already used the queue option for a long time. It's good to prevent 
the cluster from being overloaded, but too coarse as being a scheduler. I have 
noticed the [DRILL-5716|https://issues.apache.org/jira/browse/DRILL-5716]. It's 
a good design to the memory allocation. But I think it will give little help to 
prevent a Drillbit from being assigned too much work at current architecture.

We need a scheduler to do the fragment level scheduling work ,call it first 
level schedule. The YARN like model schedule works at the Drillbit node level 
,call it second level schedule. First level schedule can work upon the second 
level schedule, says when we deploy the Drill on the Yarn. 

I propose this design by having investigated the Flink and Spark projects. I 
prefer Flink's 
[design|https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks]
 . I will look into Presto as your advice. I also have discussed with [~jni] 
privately.  We have some common opinion at some point. 

I think it's necessary to introduce the RecordBatchManager role to break the 
current RPC cascaded dependence between two MajorFragments at the data exchange 
stage.If the memory is enough or the upper MajorFragment's calculation is fast 
enough, the written RecordBatchs will be pushed into the consumers quickly, no 
chance to go to disk, behaves the same performance as current implementation.   
It will also let MinorFragment level schedule become possible.

Welcome to discuss.


> Resource utilization
> 
>
> Key: DRILL-5975
> URL: https://issues.apache.org/jira/browse/DRILL-5975
> Project: Apache Drill
>  Issue Type: New Feature
>Affects Versions: 2.0.0
>Reporter: weijie.tong
>Assignee: weijie.tong
>
> h1. Motivation
> Right now the resource utilization ratio of a Drill cluster is not good. 
> Most of the cluster's resources are wasted. We cannot afford many concurrent 
> queries. Once the system accepts more queries, even with a low CPU load, a 
> query that was originally very quick becomes slower and slower.
> The reason is that Drill does not supply a scheduler. It just assumes all 
> nodes have enough computation resources. Once a query comes, it schedules 
> the related fragments to random nodes without caring about each node's load. 
> Some nodes will suffer more CPU context switches to satisfy the incoming 
> query. The deeper cause is that the runtime minor fragments construct a 
> runtime tree whose nodes are spread across different Drillbits. The runtime 
> tree is a memory pipeline; that is, all the nodes stay alive for the whole 
> lifecycle of a query, sending data to upper nodes successively, even though 
> some nodes could run quickly and quit immediately. What's more, the runtime 
> tree is constructed before actual execution, so the scheduling target for 
> Drill becomes the whole set of runtime tree nodes.
> h1. Design
> It will be hard to schedule the runtime tree nodes as a whole, so I try to 
> solve this by breaking up the cascaded runtime nodes. The graph below 
> describes the initial design. 
> 

[jira] [Comment Edited] (DRILL-5975) Resource utilization

2017-11-19 Thread weijie.tong (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-5975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258448#comment-16258448
 ] 

weijie.tong edited comment on DRILL-5975 at 11/19/17 11:16 AM:
---

Yes , we already used the queue option for a long time. It's good to prevent 
the cluster from being overloaded, but too coarse as being a scheduler. I have 
noticed the [DRILL-5716|https://issues.apache.org/jira/browse/DRILL-5716]. It's 
a good design to the memory allocation. But I think it will give little help to 
prevent a Drillbit from being assigned too much work at current architecture.

We need a scheduler to do the fragment level scheduling work ,call it first 
level schedule. The YARN like model schedule works at the Drillbit node level 
,call it second level schedule. First level schedule can work upon the second 
level schedule, says when we deploy the Drill on the Yarn. 

I propose this design by having investigated the Flink and Spark projects. I 
prefer Flink's 
[design|https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks]
 . I will look into Presto as your advice. I also have discussed with [~jni] 
privately.  We have some common opinion at some point. 

I think it's necessary to introduce the RecordBatchManager role to break the 
current RPC cascaded dependence between two MajorFragments at the data exchange 
stage.If the memory is enough or the upper MajorFragment's calculation is fast 
enough, the written RecordBatchs will be pushed into the consumers quickly, no 
chance to go to disk, behaves the same performance as current implementation.   
It will also let MinorFragment level schedule become possible.

Welcome to discuss.



was (Author: weijie):
Yes , we already used the queue option for a long time. It's good to prevent 
the cluster from being overloaded, but too coarse as being a scheduler. I have 
noticed the [DRILL-5716|https://issues.apache.org/jira/browse/DRILL-5716]. It's 
a good design to the memory allocation. But I think it will give little help to 
prevent a Drillbit from being assigned too much work at current architecture.

We need a scheduler to do the fragment level scheduling work ,call it first 
level schedule. The YARN like model schedule works at the Drillbit node level 
,call it second level schedule. First level schedule can work upon the second 
level schedule, says when we deploy the Drill on the Yarn. 

I propose this design by having investigated the Flink and Spark projects. I 
prefer Flink's design 
[design|https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks]
 . I will look into Presto as your advice. I also have discussed with [~jni] 
privately.  We have some common opinion at some point. 

I think it's necessary to introduce the RecordBatchManager role to break the 
current RPC cascaded dependence between two MajorFragments at the data exchange 
stage.If the memory is enough or the upper MajorFragment's calculation is fast 
enough, the written RecordBatchs will be pushed into the consumers quickly, no 
chance to go to disk, behaves the same performance as current implementation.   
It will also let MinorFragment level schedule become possible.

Welcome to discuss.


> Resource utilization
> 
>
> Key: DRILL-5975
> URL: https://issues.apache.org/jira/browse/DRILL-5975
> Project: Apache Drill
>  Issue Type: New Feature
>Affects Versions: 2.0.0
>Reporter: weijie.tong
>Assignee: weijie.tong
>
> h1. Motivation
> Right now the resource utilization ratio of a Drill cluster is not good. 
> Most of the cluster's resources are wasted. We cannot afford many concurrent 
> queries. Once the system accepts more queries, even with a low CPU load, a 
> query that was originally very quick becomes slower and slower.
> The reason is that Drill does not supply a scheduler. It just assumes all 
> nodes have enough computation resources. Once a query comes, it schedules 
> the related fragments to random nodes without caring about each node's load. 
> Some nodes will suffer more CPU context switches to satisfy the incoming 
> query. The deeper cause is that the runtime minor fragments construct a 
> runtime tree whose nodes are spread across different Drillbits. The runtime 
> tree is a memory pipeline; that is, all the nodes stay alive for the whole 
> lifecycle of a query, sending data to upper nodes successively, even though 
> some nodes could run quickly and quit immediately. What's more, the runtime 
> tree is constructed before actual execution, so the scheduling target for 
> Drill becomes the whole set of runtime tree nodes.
> h1. Design
> It will be hard to schedule the runtime tree nodes as a whole, so I try to 
> solve this by breaking up the cascaded runtime nodes. The graph below 
> describes the initial design. 
> 

[jira] [Comment Edited] (DRILL-5975) Resource utilization

2017-11-19 Thread weijie.tong (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-5975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258448#comment-16258448
 ] 

weijie.tong edited comment on DRILL-5975 at 11/19/17 11:15 AM:
---

Yes , we already used the queue option for a long time. It's good to prevent 
the cluster from being overloaded, but too coarse as being a scheduler. I have 
noticed the [DRILL-5716|https://issues.apache.org/jira/browse/DRILL-5716]. It's 
a good design to the memory allocation. But I think it will give little help to 
prevent a Drillbit from being assigned too much work at current architecture.

We need a scheduler to do the fragment level scheduling work ,call it first 
level schedule. The YARN like model schedule works at the Drillbit node level 
,call it second level schedule. First level schedule can work upon the second 
level schedule, says when we deploy the Drill on the Yarn. 

I propose this design by having investigated the Flink and Spark projects. I 
prefer Flink's design 
[design|https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks]
 . I will look into Presto as your advice. I also have discussed with [~jni] 
privately.  We have some common opinion at some point. 

I think it's necessary to introduce the RecordBatchManager role to break the 
current RPC cascaded dependence between two MajorFragments at the data exchange 
stage.If the memory is enough or the upper MajorFragment's calculation is fast 
enough, the written RecordBatchs will be pushed into the consumers quickly, no 
chance to go to disk, behaves the same performance as current implementation.   
It will also let MinorFragment level schedule become possible.

Welcome to discuss.



was (Author: weijie):
Yes , we already used the queue option for a long time. It's good to prevent 
the cluster from being overloaded, but too coarse as being a scheduler. I have 
noticed the [#https://issues.apache.org/jira/browse/DRILL-5716]. It's a good 
design to the memory allocation. But I think it will give little help to 
prevent a Drillbit from being assigned too much work at current architecture.

We need a scheduler to do the fragment level scheduling work ,call it first 
level schedule. The YARN like model schedule works at the Drillbit node level 
,call it second level schedule. First level schedule can work upon the second 
level schedule, says when we deploy the Drill on the Yarn. 

I propose this design by having investigated the Flink and Spark projects. I 
prefer Flink's design 
[#https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks]
   . I will look into Presto as your advice. I also have discussed with [~jni] 
privately.  We have some common opinion at some point. 

I think it's necessary to introduce the RecordBatchManager role to break the 
current RPC cascaded dependence between two MajorFragments at the data exchange 
stage.If the memory is enough or the upper MajorFragment's calculation is fast 
enough, the written RecordBatchs will be pushed into the consumers quickly, no 
chance to go to disk, behaves the same performance as current implementation.   
It will also let MinorFragment level schedule become possible.

Welcome to discuss.


> Resource utilization
> 
>
> Key: DRILL-5975
> URL: https://issues.apache.org/jira/browse/DRILL-5975
> Project: Apache Drill
>  Issue Type: New Feature
>Affects Versions: 2.0.0
>Reporter: weijie.tong
>Assignee: weijie.tong
>
> h1. Motivation
> Right now the resource utilization ratio of a Drill cluster is not good. 
> Most of the cluster's resources are wasted. We cannot afford many concurrent 
> queries. Once the system accepts more queries, even with a low CPU load, a 
> query that was originally very quick becomes slower and slower.
> The reason is that Drill does not supply a scheduler. It just assumes all 
> nodes have enough computation resources. Once a query comes, it schedules 
> the related fragments to random nodes without caring about each node's load. 
> Some nodes will suffer more CPU context switches to satisfy the incoming 
> query. The deeper cause is that the runtime minor fragments construct a 
> runtime tree whose nodes are spread across different Drillbits. The runtime 
> tree is a memory pipeline; that is, all the nodes stay alive for the whole 
> lifecycle of a query, sending data to upper nodes successively, even though 
> some nodes could run quickly and quit immediately. What's more, the runtime 
> tree is constructed before actual execution, so the scheduling target for 
> Drill becomes the whole set of runtime tree nodes.
> h1. Design
> It will be hard to schedule the runtime tree nodes as a whole, so I try to 
> solve this by breaking up the cascaded runtime nodes. The graph below 
> describes the initial design. 
> 

[jira] [Commented] (DRILL-5975) Resource utilization

2017-11-19 Thread weijie.tong (JIRA)

[ 
https://issues.apache.org/jira/browse/DRILL-5975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258448#comment-16258448
 ] 

weijie.tong commented on DRILL-5975:


Yes , we already used the queue option for a long time. It's good to prevent 
the cluster from being overloaded, but too coarse as being a scheduler. I have 
noticed the [#https://issues.apache.org/jira/browse/DRILL-5716]. It's a good 
design to the memory allocation. But I think it will give little help to 
prevent a Drillbit from being assigned too much work at current architecture.

We need a scheduler to do the fragment level scheduling work ,call it first 
level schedule. The YARN like model schedule works at the Drillbit node level 
,call it second level schedule. First level schedule can work upon the second 
level schedule, says when we deploy the Drill on the Yarn. 

I propose this design by having investigated the Flink and Spark projects. I 
prefer Flink's design 
[#https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks]
   . I will look into Presto as your advice. I also have discussed with [~jni] 
privately.  We have some common opinion at some point. 

I think it's necessary to introduce the RecordBatchManager role to break the 
current RPC cascaded dependence between two MajorFragments at the data exchange 
stage.If the memory is enough or the upper MajorFragment's calculation is fast 
enough, the written RecordBatchs will be pushed into the consumers quickly, no 
chance to go to disk, behaves the same performance as current implementation.   
It will also let MinorFragment level schedule become possible.

Welcome to discuss.


> Resource utilization
> 
>
> Key: DRILL-5975
> URL: https://issues.apache.org/jira/browse/DRILL-5975
> Project: Apache Drill
>  Issue Type: New Feature
>Affects Versions: 2.0.0
>Reporter: weijie.tong
>Assignee: weijie.tong
>
> h1. Motivation
> Right now the resource utilization ratio of a Drill cluster is not good. 
> Most of the cluster's resources are wasted. We cannot afford many concurrent 
> queries. Once the system accepts more queries, even with a low CPU load, a 
> query that was originally very quick becomes slower and slower.
> The reason is that Drill does not supply a scheduler. It just assumes all 
> nodes have enough computation resources. Once a query comes, it schedules 
> the related fragments to random nodes without caring about each node's load. 
> Some nodes will suffer more CPU context switches to satisfy the incoming 
> query. The deeper cause is that the runtime minor fragments construct a 
> runtime tree whose nodes are spread across different Drillbits. The runtime 
> tree is a memory pipeline; that is, all the nodes stay alive for the whole 
> lifecycle of a query, sending data to upper nodes successively, even though 
> some nodes could run quickly and quit immediately. What's more, the runtime 
> tree is constructed before actual execution, so the scheduling target for 
> Drill becomes the whole set of runtime tree nodes.
> h1. Design
> It will be hard to schedule the runtime tree nodes as a whole, so I try to 
> solve this by breaking up the cascaded runtime nodes. The graph below 
> describes the initial design. 
> !https://raw.githubusercontent.com/wiki/weijietong/drill/images/design.png!   
>  [graph 
> link|https://raw.githubusercontent.com/wiki/weijietong/drill/images/design.png]
> Every Drillbit instance will have a RecordBatchManager which accepts all the 
> RecordBatches written by the senders of the different local MinorFragments. 
> The RecordBatchManager will hold the RecordBatches in memory first, then in 
> disk storage. Once the first RecordBatch from a MinorFragment sender of a 
> query arrives, it will notify the FragmentScheduler. The FragmentScheduler 
> is instantiated by the Foreman. It holds the whole PlanFragment execution 
> graph and will allocate a new corresponding FragmentExecutor to consume the 
> generated RecordBatch. The allocated FragmentExecutor will then notify the 
> corresponding FragmentManager to indicate that it is ready to receive the 
> data. The FragmentManager will then send the RecordBatches one by one to the 
> corresponding FragmentExecutor's receiver, as the current Sender does, 
> throttling the data stream.
> What we can gain from this design:
> a. A computation leaf node does not have to wait on the consumer's speed to 
> end its life and release its resources.
> b. The data-sending logic is isolated from the computation nodes and shared 
> by different FragmentManagers.
> c. We can schedule MajorFragments according to each Drillbit's actual 
> resource capacity at runtime.
> d. Drill's pipelined data processing characteristic is retained.
> h1. Plan
> This will be a large PR, so I plan to divide it into smaller ones.
> a. to implement the