[jira] [Commented] (DRILL-5941) Skip header / footer logic works incorrectly for Hive tables when file has several input splits
[ https://issues.apache.org/jira/browse/DRILL-5941?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258863#comment-16258863 ] ASF GitHub Bot commented on DRILL-5941: --- Github user ppadma commented on the issue: https://github.com/apache/drill/pull/1030 @arina-ielchiieva I am concerned about the performance impact of grouping all splits into a single reader (essentially, not parallelizing at all). I wonder whether it is possible to do it this way: During planning, in HiveScan, if it is a text file and has a header/footer, get the number of rows to skip. Read the header/footer rows and, based on that, adjust the first/last split and the offset within them. The splits that contain only header/footer rows can be removed from inputSplits. In HiveSubScan, change hiveReadEntry to be a list (one entry for each split). Add an entry to hiveReadEntry, numRowsToSkip (or offsetToStart), which can be passed to the recordReaders in getBatch for each subScan. This is fairly complicated and I may well be missing some details :-) > Skip header / footer logic works incorrectly for Hive tables when file has > several input splits > --- > > Key: DRILL-5941 > URL: https://issues.apache.org/jira/browse/DRILL-5941 > Project: Apache Drill > Issue Type: Bug > Components: Storage - Hive >Affects Versions: 1.11.0 >Reporter: Arina Ielchiieva >Assignee: Arina Ielchiieva > Fix For: Future > > > *To reproduce* > 1. Create a CSV file with two columns (key, value) and 329 rows, where the > first row is a header. > The data file size should be greater than the chunk size of 256 MB. Copy the > file to the distributed file system. > 2.
Create a table in Hive: > {noformat} > CREATE EXTERNAL TABLE `h_table`( > `key` bigint, > `value` string) > ROW FORMAT DELIMITED > FIELDS TERMINATED BY ',' > STORED AS INPUTFORMAT > 'org.apache.hadoop.mapred.TextInputFormat' > OUTPUTFORMAT > 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' > LOCATION > 'maprfs:/tmp/h_table' > TBLPROPERTIES ( > 'skip.header.line.count'='1'); > {noformat} > 3. Execute the query {{select * from hive.h_table}} in Drill (querying the data through the > Hive plugin). The query returns fewer rows than expected. The expected result > is 328 (the total count minus the header row). > *The root cause* > Since the file is larger than the default chunk size, it is split into several > fragments, known as input splits. For example: > {noformat} > maprfs:/tmp/h_table/h_table.csv:0+268435456 > maprfs:/tmp/h_table/h_table.csv:268435457+492782112 > {noformat} > TextHiveReader is responsible for handling the skip header and / or footer logic. > Currently Drill creates a reader [for each input > split|https://github.com/apache/drill/blob/master/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java#L84] > and the skip header and / or footer logic is applied to each input split, > though ideally the above-mentioned input splits should be read by one > reader so that the skip header / footer logic is applied correctly. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
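ppadma's planning-time alternative can be illustrated with a small model. This is a minimal sketch, not Drill code. `HeaderFooterSkipPlanner` and `SplitPlan` are hypothetical names. The sketch also assumes the row count of each split is already known; Drill itself would have to derive the adjustment from byte offsets by reading the header/footer rows. The planner consumes the header from the front of the file and the footer from the back, so only the boundary splits skip rows. It also drops any split that contains nothing but header/footer rows. By contrast, applying a one-row header skip in every split, as the per-split readers do today, loses one data row for each additional split.

```java
import java.util.ArrayList;
import java.util.List;

public class HeaderFooterSkipPlanner {

  /** Hypothetical per-split plan: which split to read and how many rows to drop at each end. */
  public static final class SplitPlan {
    public final int splitIndex;
    public final long skipAtStart;
    public final long skipAtEnd;

    SplitPlan(int splitIndex, long skipAtStart, long skipAtEnd) {
      this.splitIndex = splitIndex;
      this.skipAtStart = skipAtStart;
      this.skipAtEnd = skipAtEnd;
    }
  }

  /**
   * Distributes a file-level header/footer skip over the file's splits.
   * rowsPerSplit[i] is the (assumed known) row count of split i, in file order.
   * Splits left with no data rows are omitted from the result.
   */
  public static List<SplitPlan> plan(long[] rowsPerSplit, long headerCount, long footerCount) {
    int n = rowsPerSplit.length;
    long[] skipStart = new long[n];
    long[] skipEnd = new long[n];

    // Consume the header from the front; it may span more than one split.
    long remaining = headerCount;
    for (int i = 0; i < n && remaining > 0; i++) {
      skipStart[i] = Math.min(remaining, rowsPerSplit[i]);
      remaining -= skipStart[i];
    }

    // Consume the footer from the back, never re-counting rows already skipped as header.
    remaining = footerCount;
    for (int i = n - 1; i >= 0 && remaining > 0; i--) {
      skipEnd[i] = Math.min(remaining, rowsPerSplit[i] - skipStart[i]);
      remaining -= skipEnd[i];
    }

    List<SplitPlan> plans = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      if (rowsPerSplit[i] - skipStart[i] - skipEnd[i] > 0) {
        plans.add(new SplitPlan(i, skipStart[i], skipEnd[i]));
      }
    }
    return plans;
  }

  /** Number of data rows a reader would return for one planned split. */
  public static long rowsReturned(long[] rowsPerSplit, SplitPlan p) {
    return rowsPerSplit[p.splitIndex] - p.skipAtStart - p.skipAtEnd;
  }
}
```

For example, with two splits holding 200 and 129 rows and a one-row header, the plan keeps both splits and skips one row only in the first, so readers return 328 rows in total.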
[jira] [Comment Edited] (DRILL-5975) Resource utilization
[ https://issues.apache.org/jira/browse/DRILL-5975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258824#comment-16258824 ] weijie.tong edited comment on DRILL-5975 at 11/20/17 5:58 AM: -- Thanks for your detailed explanation of the current model and of what causes the CPU overload. That matches what I thought. As you pointed out, Drill's assumptions about resources do not hold in real-world scenarios. At Alibaba, other teams use Flink and Spark, and both systems can reach 70% CPU utilization. With Drill, the best we have achieved is 40%. As far as I know, this is also the best result MapR has achieved. So Drill really has some work to do on scheduling. I have a different opinion on some points: * What is the entity to schedule? The target is the entity that consumes memory and CPU; in Drill, that is the MinorFragment. When the scheduler schedules this entity, it can determine which nodes have enough resources to meet the requirement and assign work to them. This also answers the question of whether we need query-level scheduling: query-level scheduling is too coarse to take this role. I have investigated what Impala does, and I am not optimistic that it would bring better improvements. MinorFragment-level scheduling lets the system work like a CPU pipeline. * A RecordBatch is first inserted into memory; if there is not enough memory, it is flushed to disk. This is the common solution in Flink and Spark. The workers allocated for the next 'stage' will consume the RecordBatch soon, possibly straight from memory. What's more, the data still flows in a streaming model. The RecordBatchManager takes the Sender's role: it contacts the receiver and sends out the RecordBatches consecutively. By leveraging the DataTunnel's sendRecordBatch method, the throttling characteristic is also retained. To be clear, this only happens at the exchange stage. 
I don't agree that we are reinventing Hive; this is also the model that Flink and Spark follow. The current zk-queue-based query-level throttling is still needed to protect the system from being overloaded by queries. Some of what you summarized will help us build a good scheduler that estimates resource requirements more accurately before allocating. > Resource utilization > > > Key: DRILL-5975 > URL: https://issues.apache.org/jira/browse/DRILL-5975 > Project: Apache Drill > Issue Type: New Feature >Affects Versions: 2.0.0 >Reporter: weijie.tong >Assignee: weijie.tong > > h1. Motivation > Currently the resource utilization ratio of a Drill cluster is poor. Most > of the cluster's resources are wasted, yet we cannot afford many concurrent > queries. Once the system accepted more queries with
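The MinorFragment-level scheduling argued for in the comment can be sketched as a placement function. Given an estimated CPU/memory demand for one minor fragment, it chooses a Drillbit that can absorb the demand, or holds the fragment back if none can. This is a minimal illustration under stated assumptions, not Drill's scheduler. `FragmentPlacementSketch`, `Node` and `FragmentDemand` are hypothetical. Resource estimates are taken as given, and the "most spare cores" rule is only one possible policy.

```java
import java.util.List;
import java.util.Optional;

public class FragmentPlacementSketch {

  /** Hypothetical snapshot of one Drillbit's spare capacity. */
  public static final class Node {
    public final String host;
    public double freeCores;
    public long freeMemoryBytes;

    public Node(String host, double freeCores, long freeMemoryBytes) {
      this.host = host;
      this.freeCores = freeCores;
      this.freeMemoryBytes = freeMemoryBytes;
    }
  }

  /** Hypothetical resource estimate for a single minor fragment. */
  public static final class FragmentDemand {
    public final double cores;
    public final long memoryBytes;

    public FragmentDemand(double cores, long memoryBytes) {
      this.cores = cores;
      this.memoryBytes = memoryBytes;
    }
  }

  /**
   * Places one minor fragment on the node with the most spare cores that can
   * satisfy its demand, and reserves those resources on that node. Returns
   * empty when no node fits, so the fragment waits instead of overloading a Drillbit.
   */
  public static Optional<Node> place(List<Node> nodes, FragmentDemand demand) {
    Node best = null;
    for (Node n : nodes) {
      boolean fits = n.freeCores >= demand.cores && n.freeMemoryBytes >= demand.memoryBytes;
      if (fits && (best == null || n.freeCores > best.freeCores)) {
        best = n;
      }
    }
    if (best != null) {
      best.freeCores -= demand.cores;
      best.freeMemoryBytes -= demand.memoryBytes;
    }
    return Optional.ofNullable(best);
  }
}
```

Because resources are reserved at placement time, repeated calls naturally stop assigning work to a node once it is saturated.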
[jira] [Commented] (DRILL-5975) Resource utilization
[ https://issues.apache.org/jira/browse/DRILL-5975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258824#comment-16258824 ] weijie.tong commented on DRILL-5975: > Resource utilization > > > Key: DRILL-5975 > URL: https://issues.apache.org/jira/browse/DRILL-5975 > Project: Apache Drill > Issue Type: New Feature >Affects Versions: 2.0.0 >Reporter: weijie.tong >Assignee: weijie.tong > > h1. Motivation > Currently the resource utilization ratio of a Drill cluster is poor. Most > of the cluster's resources are wasted, yet we cannot afford many concurrent > queries. Once the system accepts more queries, even when CPU load is not high, > queries that were originally very fast become slower and slower. > The reason is that Drill does not provide a scheduler. It simply assumes that all > nodes have enough compute resources. When a query arrives, its related > fragments are scheduled onto random nodes without regard to each node's load. Some > nodes then suffer more CPU context switches to serve the incoming query. The > underlying cause is that the runtime minor fragments form a > runtime tree whose nodes are spread across different Drillbits. The runtime tree is a > memory pipeline: all of its nodes stay alive for the whole lifecycle of > a query, successively sending data to the nodes above them, even though some > nodes could run quickly and exit immediately. What's more, the runtime tree is > constructed before execution starts, so the scheduling target in Drill becomes > all the nodes of the runtime tree at once. > h1. Design > It is hard to schedule the runtime tree nodes as a whole, so I try to > solve this by breaking up the cascade of runtime nodes. The graph below describes > the initial design. > !https://raw.githubusercontent.com/wiki/weijietong/drill/images/design.png!
> [graph > link|https://raw.githubusercontent.com/wiki/weijietong/drill/images/design.png] > Every Drillbit instance will have a RecordBatchManager that accepts all > the RecordBatches written by the senders of the local MinorFragments. > The RecordBatchManager will hold the RecordBatches in memory first, then in disk > storage. When the first RecordBatch from a MinorFragment sender of a query > arrives, it will notify the FragmentScheduler. The FragmentScheduler is > instantiated by the Foreman. It holds the whole PlanFragment execution graph and > will allocate a new corresponding FragmentExecutor to process the generated > RecordBatch. The allocated FragmentExecutor will then notify the > corresponding FragmentManager that it is ready to receive the > data. The FragmentManager will then send out the RecordBatches one by one to the > corresponding FragmentExecutor's receiver, as the current Sender does, > by throttling
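The RecordBatchManager's "memory first, then disk" buffering can be illustrated with a FIFO buffer. It keeps serialized batches in memory up to a budget, spills the rest to temporary files, and still hands batches out in arrival order. This is a hypothetical sketch, not the proposed implementation. `SpillingBatchBuffer` is not a Drill class, and batches are modeled as plain `byte[]` rather than RecordBatches.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayDeque;
import java.util.Deque;

public class SpillingBatchBuffer implements AutoCloseable {

  /** One buffered batch: held either in memory (data) or on disk (spilled). */
  private static final class Entry {
    final byte[] data;
    final Path spilled;

    Entry(byte[] data, Path spilled) {
      this.data = data;
      this.spilled = spilled;
    }
  }

  private final Deque<Entry> queue = new ArrayDeque<>(); // arrival order
  private final long memoryLimitBytes;
  private final Path spillDir;
  private long memoryUsedBytes;
  private int spillCounter;

  public SpillingBatchBuffer(long memoryLimitBytes) throws IOException {
    this.memoryLimitBytes = memoryLimitBytes;
    this.spillDir = Files.createTempDirectory("batch-spill");
  }

  /** Keeps the batch in memory if it fits the budget; otherwise writes it to a temp file. */
  public void add(byte[] batch) throws IOException {
    if (memoryUsedBytes + batch.length <= memoryLimitBytes) {
      queue.addLast(new Entry(batch, null));
      memoryUsedBytes += batch.length;
    } else {
      Path file = spillDir.resolve("batch-" + spillCounter++);
      Files.write(file, batch);
      queue.addLast(new Entry(null, file));
    }
  }

  /** Returns the oldest buffered batch, reading it back from disk if needed; null if empty. */
  public byte[] poll() throws IOException {
    Entry e = queue.pollFirst();
    if (e == null) {
      return null;
    }
    if (e.data != null) {
      memoryUsedBytes -= e.data.length;
      return e.data;
    }
    byte[] data = Files.readAllBytes(e.spilled);
    Files.delete(e.spilled);
    return data;
  }

  /** Number of currently buffered batches that live on disk. */
  public int spilledCount() {
    int count = 0;
    for (Entry e : queue) {
      if (e.spilled != null) {
        count++;
      }
    }
    return count;
  }

  /** Deletes any remaining spill files and the spill directory. */
  @Override
  public void close() throws IOException {
    for (Entry e : queue) {
      if (e.spilled != null) {
        Files.deleteIfExists(e.spilled);
      }
    }
    queue.clear();
    Files.deleteIfExists(spillDir);
  }
}
```

A single deque holds both kinds of entries, which keeps the delivery order intact even when a later, smaller batch fits in memory after an earlier one was spilled.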
[jira] [Assigned] (DRILL-5977) predicate pushdown support kafkaMsgOffset
[ https://issues.apache.org/jira/browse/DRILL-5977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] B Anil Kumar reassigned DRILL-5977: --- Assignee: Bhallamudi Venkata Siva Kamesh > predicate pushdown support kafkaMsgOffset > - > > Key: DRILL-5977 > URL: https://issues.apache.org/jira/browse/DRILL-5977 > Project: Apache Drill > Issue Type: Improvement >Reporter: B Anil Kumar >Assignee: Bhallamudi Venkata Siva Kamesh > > As part of the Kafka storage plugin review, below is Paul's suggestion. > {noformat} > Does it make sense to provide a way to select a range of messages: a starting > point or a count? Perhaps I want to run my query every five minutes, scanning > only those messages since the previous scan. Or, I want to limit my take to, > say, the next 1000 messages. Could we use a pseudo-column such as > "kafkaMsgOffset" for that purpose? Maybe > SELECT * FROM WHERE kafkaMsgOffset > 12345 > {noformat}
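Paul's pseudo-column idea amounts to translating a filter such as `kafkaMsgOffset > 12345` into a narrower offset range per partition at planning time. The reader can then start from that offset instead of reading and discarding earlier messages. This is a minimal sketch under assumptions. `OffsetRangePushdownSketch`, `Op` and `Range` are hypothetical names. Each partition's scan is modeled as a half-open range [start, end), in the spirit of Kafka's end offset being the offset after the last available message. Only a single comparison is handled; combining predicates would intersect the resulting ranges.

```java
public class OffsetRangePushdownSketch {

  /** Hypothetical comparison operators a filter on kafkaMsgOffset might use. */
  public enum Op { GT, GE, LT, LE, EQ }

  /** Half-open range of offsets [start, end) to scan in one partition. */
  public static final class Range {
    public final long start;
    public final long end;

    public Range(long start, long end) {
      this.start = start;
      this.end = end;
    }

    public boolean isEmpty() {
      return start >= end;
    }
  }

  // value + 1 without overflowing past Long.MAX_VALUE.
  private static long next(long value) {
    return value == Long.MAX_VALUE ? Long.MAX_VALUE : value + 1;
  }

  /** Narrows a partition's scan range using the predicate "kafkaMsgOffset <op> value". */
  public static Range narrow(Range partition, Op op, long value) {
    long start = partition.start;
    long end = partition.end;
    switch (op) {
      case GT: start = Math.max(start, next(value)); break;
      case GE: start = Math.max(start, value); break;
      case LT: end = Math.min(end, value); break;
      case LE: end = Math.min(end, next(value)); break;
      case EQ: start = Math.max(start, value); end = Math.min(end, next(value)); break;
      default: throw new IllegalArgumentException("Unsupported operator: " + op);
    }
    return new Range(start, end);
  }
}
```

An empty result means the partition can be skipped entirely, which is where most of the saving would come from for "only new messages since the last run" queries.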
[jira] [Commented] (DRILL-4779) Kafka storage plugin support
[ https://issues.apache.org/jira/browse/DRILL-4779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258741#comment-16258741 ] ASF GitHub Bot commented on DRILL-4779: --- Github user akumarb2010 commented on a diff in the pull request: https://github.com/apache/drill/pull/1027#discussion_r151890058 --- Diff: contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReader.java --- @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.kafka.decoders; + +import java.io.Closeable; +import java.util.List; + +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.store.kafka.KafkaStoragePlugin; +import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; + +import io.netty.buffer.DrillBuf; + +/** + * MessageReader interface provides mechanism to handle various Kafka Message + * Formats like JSON, AVRO or custom message formats. 
+ */ +public interface MessageReader extends Closeable { + + public void init(DrillBuf buf, List columns, VectorContainerWriter writer, boolean allTextMode, + boolean skipOuterList, boolean readNumbersAsDouble); --- End diff -- This has been taken care of. > Kafka storage plugin support > > > Key: DRILL-4779 > URL: https://issues.apache.org/jira/browse/DRILL-4779 > Project: Apache Drill > Issue Type: New Feature > Components: Storage - Other >Affects Versions: 1.11.0 >Reporter: B Anil Kumar >Assignee: B Anil Kumar > Labels: doc-impacting > Fix For: 1.12.0 > > > Implementing a Kafka storage plugin will enable strong SQL support for Kafka. > The initial implementation can target JSON and Avro message types.
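The `MessageReader` interface lets each Kafka message format (JSON, Avro or a custom format) plug in its own decoding behind one contract, with `Closeable` covering cleanup. The sketch below shows that strategy pattern stripped of Drill types. `MessageReaderSketch`, `SimpleMessageReader` and `Utf8TextMessageReader` are hypothetical. The real interface's `init(...)` parameters (`DrillBuf`, `VectorContainerWriter`, and so on) are deliberately omitted.

```java
import java.io.Closeable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class MessageReaderSketch {

  /** Hypothetical, Drill-free reduction of a pluggable per-format message decoder. */
  public interface SimpleMessageReader extends Closeable {
    /** Decodes one Kafka record value and appends the result to the reader's output. */
    void readMessage(byte[] value);

    /** Decoded rows so far. */
    List<String> output();

    /** Narrowed from Closeable.close(): readers in this sketch do not throw IOException. */
    @Override
    void close();
  }

  /** Example format: treats each record value as one UTF-8 text row. */
  public static class Utf8TextMessageReader implements SimpleMessageReader {
    private final List<String> rows = new ArrayList<>();
    private boolean closed;

    @Override
    public void readMessage(byte[] value) {
      if (closed) {
        throw new IllegalStateException("reader is closed");
      }
      rows.add(new String(value, StandardCharsets.UTF_8));
    }

    @Override
    public List<String> output() {
      return rows;
    }

    @Override
    public void close() {
      closed = true;
    }
  }
}
```

A JSON or Avro reader would implement the same interface, so the scan code only depends on the contract and the concrete class can be chosen by configuration.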
[jira] [Commented] (DRILL-4779) Kafka storage plugin support
[ https://issues.apache.org/jira/browse/DRILL-4779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258740#comment-16258740 ] ASF GitHub Bot commented on DRILL-4779: --- Github user akumarb2010 commented on a diff in the pull request: https://github.com/apache/drill/pull/1027#discussion_r151890267 --- Diff: contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java --- @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.kafka; + +import java.io.IOException; +import java.util.Set; + +import org.apache.calcite.schema.SchemaPlus; +import org.apache.drill.common.JSONOptions; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.ops.OptimizerRulesContext; +import org.apache.drill.exec.physical.base.AbstractGroupScan; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.store.AbstractStoragePlugin; +import org.apache.drill.exec.store.SchemaConfig; +import org.apache.drill.exec.store.StoragePluginOptimizerRule; +import org.apache.drill.exec.store.kafka.schema.KafkaSchemaFactory; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableSet; +import com.google.common.io.Closer; + +public class KafkaStoragePlugin extends AbstractStoragePlugin { + + private static final Logger logger = LoggerFactory.getLogger(KafkaStoragePlugin.class); + private final KafkaSchemaFactory kafkaSchemaFactory; + private final KafkaStoragePluginConfig config; + private final DrillbitContext context; + private final Closer closer = Closer.create(); + + public KafkaStoragePlugin(KafkaStoragePluginConfig config, DrillbitContext context, String name) + throws ExecutionSetupException { +logger.debug("Initializing {}", KafkaStoragePlugin.class.getName()); +this.config = config; +this.context = context; +this.kafkaSchemaFactory = new KafkaSchemaFactory(this, name); + } + + public DrillbitContext getContext() { +return this.context; + } + + @Override + public KafkaStoragePluginConfig getConfig() { +return this.config; + } + + @Override + public boolean supportsRead() { +return Boolean.TRUE; --- End diff -- This is taken care > Kafka storage plugin support > > > Key: DRILL-4779 > URL: 
https://issues.apache.org/jira/browse/DRILL-4779 > Project: Apache Drill > Issue Type: New Feature > Components: Storage - Other >Affects Versions: 1.11.0 >Reporter: B Anil Kumar >Assignee: B Anil Kumar > Labels: doc-impacting > Fix For: 1.12.0 > > > Implementing a Kafka storage plugin will enable strong SQL support for Kafka. > The initial implementation can target JSON and Avro message types.
[jira] [Commented] (DRILL-4779) Kafka storage plugin support
[ https://issues.apache.org/jira/browse/DRILL-4779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258738#comment-16258738 ] ASF GitHub Bot commented on DRILL-4779: --- Github user akumarb2010 commented on a diff in the pull request: https://github.com/apache/drill/pull/1027#discussion_r151890096 --- Diff: contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.kafka; + +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.physical.base.AbstractBase; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.PhysicalVisitor; +import org.apache.drill.exec.physical.base.SubScan; +import org.apache.drill.exec.store.StoragePluginRegistry; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; + +@JsonTypeName("kafka-partition-scan") +public class KafkaSubScan extends AbstractBase implements SubScan { + + @JsonProperty + private final KafkaStoragePluginConfig kafkStoragePluginConfig; + + @JsonIgnore + private final KafkaStoragePlugin kafkaStoragePlugin; + private final List coulmns; + private final List partitionSubScanSpecList; + + @JsonCreator + public KafkaSubScan(@JacksonInject StoragePluginRegistry registry, @JsonProperty("userName") String userName, + @JsonProperty("kafkStoragePluginConfig") KafkaStoragePluginConfig kafkStoragePluginConfig, + @JsonProperty("coulmns") List coulmns, + @JsonProperty("partitionSubScanSpecList") LinkedList partitionSubScanSpecList) --- End diff -- This is taken care. 
> Kafka storage plugin support > > > Key: DRILL-4779 > URL: https://issues.apache.org/jira/browse/DRILL-4779 > Project: Apache Drill > Issue Type: New Feature > Components: Storage - Other >Affects Versions: 1.11.0 >Reporter: B Anil Kumar >Assignee: B Anil Kumar > Labels: doc-impacting > Fix For: 1.12.0 > > > Implementing a Kafka storage plugin will enable strong SQL support for Kafka. > The initial implementation can target JSON and Avro message types.
[jira] [Commented] (DRILL-4779) Kafka storage plugin support
[ https://issues.apache.org/jira/browse/DRILL-4779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258736#comment-16258736 ] ASF GitHub Bot commented on DRILL-4779: --- Github user akumarb2010 commented on a diff in the pull request: https://github.com/apache/drill/pull/1027#discussion_r151890085 --- Diff: contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.kafka; + +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.physical.base.AbstractBase; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.PhysicalVisitor; +import org.apache.drill.exec.physical.base.SubScan; +import org.apache.drill.exec.store.StoragePluginRegistry; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; + +@JsonTypeName("kafka-partition-scan") +public class KafkaSubScan extends AbstractBase implements SubScan { + + @JsonProperty + private final KafkaStoragePluginConfig kafkStoragePluginConfig; + + @JsonIgnore + private final KafkaStoragePlugin kafkaStoragePlugin; + private final List coulmns; + private final List partitionSubScanSpecList; + + @JsonCreator + public KafkaSubScan(@JacksonInject StoragePluginRegistry registry, @JsonProperty("userName") String userName, + @JsonProperty("kafkStoragePluginConfig") KafkaStoragePluginConfig kafkStoragePluginConfig, + @JsonProperty("coulmns") List coulmns, + @JsonProperty("partitionSubScanSpecList") LinkedList partitionSubScanSpecList) + throws ExecutionSetupException { +super(userName); +this.kafkStoragePluginConfig = kafkStoragePluginConfig; +this.coulmns = coulmns; +this.partitionSubScanSpecList = partitionSubScanSpecList; +this.kafkaStoragePlugin = (KafkaStoragePlugin) registry.getPlugin(kafkStoragePluginConfig); + } + + public KafkaSubScan(String userName, KafkaStoragePlugin plugin, KafkaStoragePluginConfig kafkStoragePluginConfig, + 
List coulmns, List partitionSubScanSpecList) { +super(userName); +this.coulmns = coulmns; +this.kafkStoragePluginConfig = kafkStoragePluginConfig; --- End diff -- This has been taken care of. > Kafka storage plugin support > > > Key: DRILL-4779 > URL: https://issues.apache.org/jira/browse/DRILL-4779 > Project: Apache Drill > Issue Type: New Feature > Components: Storage - Other >Affects Versions: 1.11.0 >Reporter: B Anil Kumar >Assignee: B Anil Kumar > Labels: doc-impacting > Fix For: 1.12.0 > > > Implementing a Kafka storage plugin will enable strong SQL support for Kafka. > The initial implementation can target JSON and Avro message types.
[jira] [Commented] (DRILL-4779) Kafka storage plugin support
[ https://issues.apache.org/jira/browse/DRILL-4779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258744#comment-16258744 ] ASF GitHub Bot commented on DRILL-4779: --- Github user akumarb2010 commented on a diff in the pull request: https://github.com/apache/drill/pull/1027#discussion_r151890105 --- Diff: contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.kafka; + +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.physical.base.AbstractBase; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.PhysicalVisitor; +import org.apache.drill.exec.physical.base.SubScan; +import org.apache.drill.exec.store.StoragePluginRegistry; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; + +@JsonTypeName("kafka-partition-scan") +public class KafkaSubScan extends AbstractBase implements SubScan { + + @JsonProperty + private final KafkaStoragePluginConfig kafkStoragePluginConfig; + + @JsonIgnore + private final KafkaStoragePlugin kafkaStoragePlugin; + private final List coulmns; + private final List partitionSubScanSpecList; + + @JsonCreator + public KafkaSubScan(@JacksonInject StoragePluginRegistry registry, @JsonProperty("userName") String userName, + @JsonProperty("kafkStoragePluginConfig") KafkaStoragePluginConfig kafkStoragePluginConfig, --- End diff -- This is taken care. > Kafka storage plugin support > > > Key: DRILL-4779 > URL: https://issues.apache.org/jira/browse/DRILL-4779 > Project: Apache Drill > Issue Type: New Feature > Components: Storage - Other >Affects Versions: 1.11.0 >Reporter: B Anil Kumar >Assignee: B Anil Kumar > Labels: doc-impacting > Fix For: 1.12.0 > > > Implement Kafka storage plugin will enable the strong SQL support for Kafka. 
> The initial implementation can target JSON and Avro message types.
[jira] [Commented] (DRILL-4779) Kafka storage plugin support
[ https://issues.apache.org/jira/browse/DRILL-4779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258735#comment-16258735 ] ASF GitHub Bot commented on DRILL-4779: --- Github user akumarb2010 commented on a diff in the pull request: https://github.com/apache/drill/pull/1027#discussion_r151890052 --- Diff: contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactory.java --- @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.kafka.decoders; + +import org.apache.drill.common.exceptions.UserException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +public class MessageReaderFactory { + + private static final Logger logger = LoggerFactory.getLogger(MessageReaderFactory.class); + + /** + * Initialize kafka message reader beased on store.kafka.record.reader session + * property + * + * @param messageReaderKlass + * value of store.kafka.record.reader session property + * @return kafka message reader + * @throws UserException + * in case of any message reader initialization + */ + public static MessageReader getMessageReader(String messageReaderKlass) { +Preconditions.checkNotNull(messageReaderKlass, "Please set store.kafka.record.reader " + messageReaderKlass); +MessageReader messageReader = null; +try { + Class klass = Class.forName(messageReaderKlass); --- End diff -- As you suggested, I raised a JIRA for this: https://issues.apache.org/jira/browse/DRILL-5976
[jira] [Commented] (DRILL-4779) Kafka storage plugin support
[ https://issues.apache.org/jira/browse/DRILL-4779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258745#comment-16258745 ] ASF GitHub Bot commented on DRILL-4779: --- Github user akumarb2010 commented on a diff in the pull request: https://github.com/apache/drill/pull/1027#discussion_r151890077 --- Diff: contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.kafka; + +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.physical.base.AbstractBase; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.PhysicalVisitor; +import org.apache.drill.exec.physical.base.SubScan; +import org.apache.drill.exec.store.StoragePluginRegistry; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; + +@JsonTypeName("kafka-partition-scan") +public class KafkaSubScan extends AbstractBase implements SubScan { + + @JsonProperty + private final KafkaStoragePluginConfig kafkStoragePluginConfig; + + @JsonIgnore + private final KafkaStoragePlugin kafkaStoragePlugin; + private final List coulmns; + private final List partitionSubScanSpecList; + + @JsonCreator + public KafkaSubScan(@JacksonInject StoragePluginRegistry registry, @JsonProperty("userName") String userName, + @JsonProperty("kafkStoragePluginConfig") KafkaStoragePluginConfig kafkStoragePluginConfig, + @JsonProperty("coulmns") List coulmns, + @JsonProperty("partitionSubScanSpecList") LinkedList partitionSubScanSpecList) + throws ExecutionSetupException { +super(userName); +this.kafkStoragePluginConfig = kafkStoragePluginConfig; +this.coulmns = coulmns; +this.partitionSubScanSpecList = partitionSubScanSpecList; +this.kafkaStoragePlugin = (KafkaStoragePlugin) registry.getPlugin(kafkStoragePluginConfig); + } + + public KafkaSubScan(String userName, KafkaStoragePlugin plugin, KafkaStoragePluginConfig kafkStoragePluginConfig, + 
List coulmns, List partitionSubScanSpecList) { +super(userName); +this.coulmns = coulmns; +this.kafkStoragePluginConfig = kafkStoragePluginConfig; +this.kafkaStoragePlugin = plugin; +this.partitionSubScanSpecList = partitionSubScanSpecList; + } + + @Override + public T accept(PhysicalVisitor physicalVisitor, X value) throws E { +return physicalVisitor.visitSubScan(this, value); + } + + @Override + public PhysicalOperator getNewWithChildren(List children) throws ExecutionSetupException { +Preconditions.checkArgument(children.isEmpty()); +return new KafkaSubScan(getUserName(), kafkaStoragePlugin, kafkStoragePluginConfig, coulmns, +partitionSubScanSpecList); + } + + @Override + public Iterator iterator() { +return Collections.emptyIterator(); + } + + @JsonIgnore + public KafkaStoragePluginConfig getKafkStoragePluginConfig() { +return kafkStoragePluginConfig; + } + + @JsonIgnore + public KafkaStoragePlugin getKafkaStoragePlugin() { +return kafkaStoragePlugin; + } + + public List getCoulmns() { +return coulmns; --- End diff -- This has been taken care of.
[jira] [Commented] (DRILL-4779) Kafka storage plugin support
[ https://issues.apache.org/jira/browse/DRILL-4779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258734#comment-16258734 ] ASF GitHub Bot commented on DRILL-4779: --- Github user akumarb2010 commented on a diff in the pull request: https://github.com/apache/drill/pull/1027#discussion_r151889218 --- Diff: contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactory.java --- @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.kafka.decoders; + +import org.apache.drill.common.exceptions.UserException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; + +public class MessageReaderFactory { + + private static final Logger logger = LoggerFactory.getLogger(MessageReaderFactory.class); + + /** + * Initialize kafka message reader beased on store.kafka.record.reader session + * property + * + * @param messageReaderKlass + * value of store.kafka.record.reader session property + * @return kafka message reader + * @throws UserException + * in case of any message reader initialization + */ + public static MessageReader getMessageReader(String messageReaderKlass) { +Preconditions.checkNotNull(messageReaderKlass, "Please set store.kafka.record.reader " + messageReaderKlass); +MessageReader messageReader = null; +try { + Class klass = Class.forName(messageReaderKlass); --- End diff -- Thanks for this suggestion, Paul. Initially we considered making this a plugin config. However, with Kafka, users will often need to implement their own custom MessageReader. For example, Kafka messages may be encrypted. In other frameworks, such as Spark Streaming, Storm, or Camus, the user provides the Deserializer/Decoder. As you suggested, I created a separate JIRA for this: https://issues.apache.org/jira/browse/DRILL-5976
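The factory under review builds the reader by reflection from the `store.kafka.record.reader` class name. The variant below is a rough sketch of one way to make that step stricter. `asSubclass` rejects classes that do not implement the reader interface, and each failure mode gets its own message. `MessageReader` and `JsonMessageReader` are simplified stand-ins defined here so the example is self-contained; they are not Drill's actual types. The sketch also assumes a public no-argument constructor.

```java
import java.util.Objects;

/** Illustrative sketch of a stricter reflective MessageReader factory. */
public class MessageReaderFactorySketch {

  /** Hypothetical stand-in for Drill's MessageReader interface. */
  public interface MessageReader {
    String name();
  }

  /** Hypothetical reader used only to exercise the factory. */
  public static class JsonMessageReader implements MessageReader {
    public JsonMessageReader() { }

    @Override
    public String name() {
      return "json";
    }
  }

  /**
   * Creates a MessageReader from a fully qualified class name.
   *
   * @throws IllegalArgumentException if the class cannot be found, does not
   *         implement MessageReader, or has no usable no-arg constructor
   */
  public static MessageReader getMessageReader(String className) {
    Objects.requireNonNull(className, "store.kafka.record.reader is not set");
    try {
      // asSubclass throws ClassCastException for classes that are not MessageReaders.
      Class<? extends MessageReader> klass =
          Class.forName(className).asSubclass(MessageReader.class);
      return klass.getDeclaredConstructor().newInstance();
    } catch (ClassNotFoundException e) {
      throw new IllegalArgumentException("Message reader class not found: " + className, e);
    } catch (ClassCastException e) {
      throw new IllegalArgumentException(className + " does not implement MessageReader", e);
    } catch (ReflectiveOperationException e) {
      // NoSuchMethodException, InstantiationException, IllegalAccessException, InvocationTargetException
      throw new IllegalArgumentException("Cannot instantiate message reader " + className, e);
    }
  }

  public static void main(String[] args) {
    MessageReader reader = getMessageReader(JsonMessageReader.class.getName());
    System.out.println(reader.name()); // prints "json"
  }
}
```

The class name arrives as a plain string, so the same method would work unchanged whether that string comes from a session option or from a per-plugin config property.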
[jira] [Commented] (DRILL-4779) Kafka storage plugin support
[ https://issues.apache.org/jira/browse/DRILL-4779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258746#comment-16258746 ] ASF GitHub Bot commented on DRILL-4779: --- Github user akumarb2010 commented on a diff in the pull request: https://github.com/apache/drill/pull/1027#discussion_r151890205 --- Diff: contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java --- @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.kafka; + +import static org.apache.drill.exec.store.kafka.DrillKafkaConfig.DRILL_KAFKA_POLL_TIMEOUT; + +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.impl.OutputMutator; +import org.apache.drill.exec.store.AbstractRecordReader; +import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec; +import org.apache.drill.exec.store.kafka.decoders.MessageReader; +import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory; +import org.apache.drill.exec.util.Utilities; +import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +public class KafkaRecordReader extends AbstractRecordReader { + private static final Logger logger = LoggerFactory.getLogger(KafkaRecordReader.class); + public static final long DEFAULT_MESSAGES_PER_BATCH = 4000; + + private VectorContainerWriter writer; + private MessageReader messageReader; + + private boolean unionEnabled; + private KafkaConsumer kafkaConsumer; + private KafkaStoragePlugin plugin; --- End diff -- This has been taken care of.
[jira] [Commented] (DRILL-4779) Kafka storage plugin support
[ https://issues.apache.org/jira/browse/DRILL-4779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258739#comment-16258739 ] ASF GitHub Bot commented on DRILL-4779: --- Github user akumarb2010 commented on a diff in the pull request: https://github.com/apache/drill/pull/1027#discussion_r151890115 --- Diff: contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePluginConfig.java --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.kafka; + +import java.util.Map; +import java.util.Properties; + +import org.apache.drill.common.logical.StoragePluginConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; + +@JsonTypeName(KafkaStoragePluginConfig.NAME) +public class KafkaStoragePluginConfig extends StoragePluginConfig { + + private static final Logger logger = LoggerFactory.getLogger(KafkaStoragePluginConfig.class); + public static final String NAME = "kafka"; + private Properties kafkaConsumerProps; + + @JsonCreator + public KafkaStoragePluginConfig(@JsonProperty("kafkaConsumerProps") Map kafkaConsumerProps) { +this.kafkaConsumerProps = new Properties(); +this.kafkaConsumerProps.putAll(kafkaConsumerProps); +logger.debug("Kafka Consumer Props {}", this.kafkaConsumerProps); + } + + public Properties getKafkaConsumerProps() { +return kafkaConsumerProps; + } + + @Override + public int hashCode() { +final int prime = 31; +int result = 1; +result = prime * result + ((kafkaConsumerProps == null) ? 0 : kafkaConsumerProps.hashCode()); +return result; + } + + @Override + public boolean equals(Object obj) { +if (this == obj) { + return true; +} +if (obj == null) { + return false; +} +if (getClass() != obj.getClass()) { + return false; +} +KafkaStoragePluginConfig other = (KafkaStoragePluginConfig) obj; +if (kafkaConsumerProps == null) { + if (other.kafkaConsumerProps != null) { +return false; + } +} else if (!kafkaConsumerProps.equals(other.kafkaConsumerProps)) { + return false; +} --- End diff -- This has been taken care of.
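The hand-written `equals`/`hashCode` pair in `KafkaStoragePluginConfig` spells out every null check. The review thread does not show what the final change was, so the following is only an illustrative sketch of a more compact form that uses `java.util.Objects`. `KafkaConfigSketch` is a hypothetical, trimmed-down stand-in for the real config class. The null guard in its constructor is an added assumption; the original `putAll` would throw on a null map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;

/** Hypothetical, trimmed-down stand-in for KafkaStoragePluginConfig. */
public class KafkaConfigSketch {

  private final Properties kafkaConsumerProps = new Properties();

  public KafkaConfigSketch(Map<String, String> kafkaConsumerProps) {
    // Assumption: treat a missing map as "no properties" instead of throwing.
    if (kafkaConsumerProps != null) {
      this.kafkaConsumerProps.putAll(kafkaConsumerProps);
    }
  }

  public Properties getKafkaConsumerProps() {
    return kafkaConsumerProps;
  }

  @Override
  public int hashCode() {
    // Objects.hash(x) computes 31 + x.hashCode(), the same value as the prime-based version.
    return Objects.hash(kafkaConsumerProps);
  }

  @Override
  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }
    if (obj == null || getClass() != obj.getClass()) {
      return false;
    }
    KafkaConfigSketch other = (KafkaConfigSketch) obj;
    // Objects.equals is null-safe, replacing the explicit null branches.
    return Objects.equals(kafkaConsumerProps, other.kafkaConsumerProps);
  }

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    props.put("bootstrap.servers", "localhost:9092");
    KafkaConfigSketch a = new KafkaConfigSketch(props);
    KafkaConfigSketch b = new KafkaConfigSketch(new HashMap<>(props));
    System.out.println(a.equals(b) && a.hashCode() == b.hashCode()); // prints "true"
  }
}
```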
[jira] [Commented] (DRILL-4779) Kafka storage plugin support
[ https://issues.apache.org/jira/browse/DRILL-4779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258743#comment-16258743 ] ASF GitHub Bot commented on DRILL-4779: --- Github user akumarb2010 commented on a diff in the pull request: https://github.com/apache/drill/pull/1027#discussion_r151890087 --- Diff: contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.kafka; + +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.physical.base.AbstractBase; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.PhysicalVisitor; +import org.apache.drill.exec.physical.base.SubScan; +import org.apache.drill.exec.store.StoragePluginRegistry; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; + +@JsonTypeName("kafka-partition-scan") +public class KafkaSubScan extends AbstractBase implements SubScan { + + @JsonProperty + private final KafkaStoragePluginConfig kafkStoragePluginConfig; + + @JsonIgnore + private final KafkaStoragePlugin kafkaStoragePlugin; + private final List coulmns; + private final List partitionSubScanSpecList; + + @JsonCreator + public KafkaSubScan(@JacksonInject StoragePluginRegistry registry, @JsonProperty("userName") String userName, + @JsonProperty("kafkStoragePluginConfig") KafkaStoragePluginConfig kafkStoragePluginConfig, + @JsonProperty("coulmns") List coulmns, + @JsonProperty("partitionSubScanSpecList") LinkedList partitionSubScanSpecList) + throws ExecutionSetupException { +super(userName); +this.kafkStoragePluginConfig = kafkStoragePluginConfig; +this.coulmns = coulmns; +this.partitionSubScanSpecList = partitionSubScanSpecList; +this.kafkaStoragePlugin = (KafkaStoragePlugin) registry.getPlugin(kafkStoragePluginConfig); + } + + public KafkaSubScan(String userName, KafkaStoragePlugin plugin, KafkaStoragePluginConfig kafkStoragePluginConfig, + 
List coulmns, List partitionSubScanSpecList) { +super(userName); +this.coulmns = coulmns; --- End diff -- This has been taken care of.
[jira] [Commented] (DRILL-4779) Kafka storage plugin support
[ https://issues.apache.org/jira/browse/DRILL-4779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258742#comment-16258742 ] ASF GitHub Bot commented on DRILL-4779: --- Github user akumarb2010 commented on a diff in the pull request: https://github.com/apache/drill/pull/1027#discussion_r151890101 --- Diff: contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.kafka; + +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.physical.base.AbstractBase; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.PhysicalVisitor; +import org.apache.drill.exec.physical.base.SubScan; +import org.apache.drill.exec.store.StoragePluginRegistry; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; + +@JsonTypeName("kafka-partition-scan") +public class KafkaSubScan extends AbstractBase implements SubScan { + + @JsonProperty + private final KafkaStoragePluginConfig kafkStoragePluginConfig; + + @JsonIgnore + private final KafkaStoragePlugin kafkaStoragePlugin; + private final List coulmns; + private final List partitionSubScanSpecList; + + @JsonCreator + public KafkaSubScan(@JacksonInject StoragePluginRegistry registry, @JsonProperty("userName") String userName, + @JsonProperty("kafkStoragePluginConfig") KafkaStoragePluginConfig kafkStoragePluginConfig, + @JsonProperty("coulmns") List coulmns, --- End diff -- This has been taken care of.
[jira] [Created] (DRILL-5977) predicate pushdown support kafkaMsgOffset
B Anil Kumar created DRILL-5977: --- Summary: predicate pushdown support kafkaMsgOffset Key: DRILL-5977 URL: https://issues.apache.org/jira/browse/DRILL-5977 Project: Apache Drill Issue Type: Improvement Reporter: B Anil Kumar As part of the Kafka storage plugin review, Paul made the suggestion below. {noformat} Does it make sense to provide a way to select a range of messages: a starting point or a count? Perhaps I want to run my query every five minutes, scanning only those messages since the previous scan. Or, I want to limit my take to, say, the next 1000 messages. Could we use a pseudo-column such as "kafkaMsgOffset" for that purpose? Maybe SELECT * FROM WHERE kafkaMsgOffset > 12345 {noformat}
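The pseudo-column idea amounts to narrowing each partition's offset range before reading. The following is a minimal sketch that treats the predicate as a strict lower bound (`kafkaMsgOffset > N`) combined with an optional message limit. `OffsetRangeSketch` and `narrow` are hypothetical names. A real reader would then position its consumer at `start` (for example, with `KafkaConsumer.seek`) and stop before `end`.

```java
import java.util.Optional;

/**
 * Hypothetical helper: narrows one partition's scan range for a
 * "kafkaMsgOffset > lowerBound" predicate with an optional message limit.
 * Offsets follow Kafka's convention: beginningOffset is the first available
 * offset and endOffset is one past the last message.
 */
public class OffsetRangeSketch {

  /** Half-open range [start, end) of offsets to read. */
  public static final class OffsetRange {
    public final long start;
    public final long end;

    OffsetRange(long start, long end) {
      this.start = start;
      this.end = end;
    }

    @Override
    public String toString() {
      return "[" + start + ", " + end + ")";
    }
  }

  /** Returns the range to scan, or empty if the predicate excludes the whole partition. */
  public static Optional<OffsetRange> narrow(long beginningOffset, long endOffset,
                                             long lowerBoundExclusive, long maxMessages) {
    long start = Math.max(beginningOffset, lowerBoundExclusive + 1);
    // Compare before adding so that maxMessages = Long.MAX_VALUE ("no limit") cannot overflow.
    long end = (maxMessages >= endOffset - start) ? endOffset : start + maxMessages;
    return start < end ? Optional.of(new OffsetRange(start, end)) : Optional.empty();
  }

  public static void main(String[] args) {
    // Partition holds offsets 10000..19999; predicate kafkaMsgOffset > 12345, limit 1000.
    System.out.println(narrow(10000, 20000, 12345, 1000).get()); // prints "[12346, 13346)"
  }
}
```

Partitions whose range comes back empty could be dropped from the scan entirely, which is where most of the savings of a pushed-down predicate would come from.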
[jira] [Created] (DRILL-5976) Kafka MessageReader config optimization
B Anil Kumar created DRILL-5976: --- Summary: Kafka MessageReader config optimization Key: DRILL-5976 URL: https://issues.apache.org/jira/browse/DRILL-5976 Project: Apache Drill Issue Type: Improvement Reporter: B Anil Kumar Assignee: B Anil Kumar The Kafka storage plugin currently supports the JSON message format; going forward it will support Avro and other formats. There may also be scenarios where users have to implement their own MessageReader (corresponding to their custom Deserializer/Decoder). This JIRA is to brainstorm whether to configure the MessageReader as a *plugin config* or as a system/session option. Paul's suggestion from the review is below. {noformat} Suppose this is two or three releases from now and we support other forms of Kafka messages. Different topics use different formats. If the message format is a system/session option, then I need to switch the option before each query. Very cumbersome and error-prone. Instead, perhaps this information should be part of the storage plugin config. Then, I can define different plugins: one for each message format. Further, can I have multiple Kafka servers? If so, would I need different plugin configs for each? So, should we be thinking about encoding most properties as plugin config properties? Now, the plugin might have a format property, one of which is json. The JSON config properties would be defined in the json format within the overall storage plugin config. {noformat}
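Paul's suggestion boils down to making the message format a property of each storage plugin instance rather than a session option. The sketch below illustrates that shape under stated assumptions. The `format` property, the reader registry, and the `JsonReader`/`AvroReader` classes are all hypothetical, and no Avro reader exists in the plugin under review.

```java
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical sketch: each plugin instance names its own message format. */
public class FormatConfigSketch {

  /** Stand-in for a message reader; not Drill's actual interface. */
  public interface MessageReader {
    String format();
  }

  public static final class JsonReader implements MessageReader {
    @Override
    public String format() {
      return "json";
    }
  }

  /** Hypothetical: no Avro reader exists in the plugin under review. */
  public static final class AvroReader implements MessageReader {
    @Override
    public String format() {
      return "avro";
    }
  }

  // Known formats, keyed by the value of the plugin's "format" property.
  private static final Map<String, Supplier<MessageReader>> READERS =
      Map.of("json", JsonReader::new, "avro", AvroReader::new);

  /** One config per plugin, e.g. one Kafka plugin per message format or per cluster. */
  public static final class PluginConfig {
    private final String name;
    private final String format;

    public PluginConfig(String name, String format) {
      this.name = name;
      this.format = format;
    }

    public MessageReader newReader() {
      Supplier<MessageReader> supplier = READERS.get(format);
      if (supplier == null) {
        throw new IllegalArgumentException("Plugin " + name + ": unknown format " + format);
      }
      return supplier.get();
    }
  }

  public static void main(String[] args) {
    PluginConfig clicks = new PluginConfig("kafka_clicks", "json");
    PluginConfig orders = new PluginConfig("kafka_orders", "avro");
    // Each plugin picks its own reader; no session option is switched between queries.
    System.out.println(clicks.newReader().format() + " " + orders.newReader().format()); // prints "json avro"
  }
}
```

With this arrangement, a query against a topic in a different format only needs a different plugin name in the `FROM` clause instead of an `ALTER SESSION` beforehand.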
[jira] [Commented] (DRILL-5975) Resource utilization
[ https://issues.apache.org/jira/browse/DRILL-5975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258585#comment-16258585 ] Paul Rogers commented on DRILL-5975:

The proposed design may work for simple queries, but it is not sufficiently general for large queries. Let's start with some background.

Here is Drill's current design. As you point out, there are many opportunities for improvement, but this is what we have today. This is an important topic, so I've taken the liberty of explaining our current understanding in some detail.

h4. Current Model

Drill's current resource model is based on symmetry: it assumes queries arrive randomly at each Drillbit. That is, each user connects to a random Foreman, each user has a similar load, and the number of users is larger than the number of Drillbits, so that each Foreman sees roughly equal load.

Next, Drill assumes that users want maximum performance for each query. Therefore, it strives for maximum parallelization across both the CPUs in each node and all nodes in the cluster. To do this, Drill assumes nodes are symmetrical: all have the same number of available CPUs and the same amount of memory. Drill also assumes that the cluster is dedicated to Drill, and so Drill attempts to saturate all CPUs on all hosts.

Next, Drill assumes that all queries are fast, so that query 1 will complete roughly before query 2 starts. This means that resource sharing is sequential: query 1 uses all memory and CPUs, then query 2 does so.

The above model may not be ideal, but it is the "simplest thing that could work." It has gotten Drill this far, but it clearly has limits.

h4. Limitations of the Current Model

Problems occur, obviously, when the real world does not agree with Drill's assumptions. Typical problems:

* Drill does not "own" its cluster, and instead shares resources (such as CPU) with other workloads.
* Queries are not short, and instead of running sequentially, they end up running in parallel.
* Queries need more resources (CPU or memory) than the user has provided.

h4. CPU Overload

While our recent work focused on memory (with spilling and queue-based memory allocation), you correctly point out that we need to turn our attention to CPU. Let's look a bit deeper at the cause of CPU usage. Drill makes the following choices:

* Create as many "major fragments" (work units) as possible for each query. (Shown as different colors in the query plan visualization UI.)
* Create as many "minor fragments" (slices) as possible for each major fragment. (Shown as numbered items in the query plan UI tables.)
* By default, create a number of minor fragments equal to 70% of the number of CPUs. (If your machine has 20 cores, Drill will create 14 slices per major fragment.)
* Implement every minor fragment as a Java thread.

A bit of math shows why this is a problem. Assume a typical query with, say, 5 major fragments. Assume 24 CPUs. We will create a number of threads equal to:

24 CPUs * 70% * 5 major fragments = 84 threads

If all these threads were busy at the same time, we'd overload our CPUs by a factor of about 3.5, causing intense context switching (which pretty much defeats any attempt to optimize for internal CPU caches). Larger queries can easily oversubscribe CPUs by 10 or more times.

Note that each of these threads wants to make use of unlimited memory. It does not take many such queries before Drill thrashes the CPU and runs out of memory.

The lesson is that when the workload exceeds the available resources, the "assume infinite resources" model no longer works. Some form of throttling is needed. Let's discuss that.

Suppose that Drill can make a query faster by using all CPUs. Then, by design, there is no room in the cluster for another query. If we are already using 300% of CPU, then adding another query simply causes more thrashing, puts more pressure on memory, and causes both queries to slow down. In an ideal case, both queries will take twice as long as if they ran separately. In extreme cases, the slow-down is worse than linear once the OS starts wasting time thrashing threads (as shown by the percent system time in, say, the "top" command).

In this (simplified) model, we are better off running one query to completion, then starting the second. Both make full use of the cluster. Total run time is the same. Plus, memory pressure is halved.

In general, some queries need all resources, but many do not. In our own testing with TPC-H queries, we see some advantage to running up to three or four concurrent queries. After that, we just see a linear slow-down. (We've pushed the system to 20, 30 or more queries; it is like your local freeway at rush hour: everything becomes very slow.)

h4. Throttling

The first challenge is to accept that every cluster has limits. Once Drill saturates CPUs,
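The thread arithmetic in the comment can be reproduced in a few lines. This back-of-the-envelope sketch ignores rounding. The 70% width fraction and the 24-core, 5-major-fragment query are the figures from the comment, and the helper names are illustrative. For 24 cores, the result is 24 * 0.7 * 5 = 84 threads, about 3.5 times the core count.

```java
/** Back-of-the-envelope estimate of per-node threads for one query (no rounding). */
public class FragmentThreadEstimate {

  /** Minor fragments (slices) per major fragment: a fraction of the cores, 70% by default. */
  public static double slicesPerMajorFragment(int cores, double widthFraction) {
    return cores * widthFraction;
  }

  /** Every minor fragment runs as its own thread, so threads = slices * major fragments. */
  public static double threadsPerNode(int cores, double widthFraction, int majorFragments) {
    return slicesPerMajorFragment(cores, widthFraction) * majorFragments;
  }

  /** How many threads compete for each core if all of them are busy at once. */
  public static double oversubscription(int cores, double threads) {
    return threads / cores;
  }

  public static void main(String[] args) {
    int cores = 24;
    double threads = threadsPerNode(cores, 0.70, 5);
    System.out.printf("threads = %.1f, oversubscription = %.1fx%n",
        threads, oversubscription(cores, threads));
  }
}
```

With these inputs it should print about 84 threads and a 3.5x oversubscription. Drill's actual per-node width may round the 16.8 slices per major fragment up or down, so treat the figure as an estimate.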
[jira] [Commented] (DRILL-5261) Expose REST endpoint in zookeeper
[ https://issues.apache.org/jira/browse/DRILL-5261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258553#comment-16258553 ] ASF GitHub Bot commented on DRILL-5261: --- Github user xhochy commented on the issue: https://github.com/apache/drill/pull/1042 This still needs the protobufs to be recompiled for C++; I will do this once someone confirms that I have taken the correct approach. > Expose REST endpoint in zookeeper > - > > Key: DRILL-5261 > URL: https://issues.apache.org/jira/browse/DRILL-5261 > Project: Apache Drill > Issue Type: New Feature >Reporter: Uwe L. Korn > > It would be nice to also publish the REST API endpoint of each Drillbit in > ZooKeeper. This would mean that we need an additional entry in > {{DrillbitEndpoint}}. While I know how to add the attribute to the > ProtoBuf definition and fill it with the correct information, > I'm unsure whether some migration code is needed to support older > {{DrillbitEndpoint}} implementations that don't have this attribute.
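On the migration question: a message written by an older Drillbit still parses when it lacks a new proto2 `optional` field, and that field simply reads as unset. Readers therefore likely need only a fallback rather than dedicated migration code. The sketch below shows that fallback in plain Java. `Endpoint` is a hypothetical stand-in for the generated `DrillbitEndpoint` class, with an `OptionalInt` modelling the unset field. The default of 8047 is Drill's default web port, and the `http` scheme is an assumption; a cluster with TLS enabled would use `https`.

```java
import java.util.OptionalInt;

/**
 * Hypothetical sketch: reading an optional REST port from a DrillbitEndpoint-like
 * record. Older Drillbits that predate the new field do not set it, so the
 * reader falls back to a default instead of failing.
 */
public class RestEndpointSketch {

  /** Drill's default web/REST port. */
  public static final int DEFAULT_HTTP_PORT = 8047;

  /** Stand-in for the protobuf message; httpPort is empty when the sender is an older Drillbit. */
  public static final class Endpoint {
    final String address;
    final OptionalInt httpPort;

    public Endpoint(String address, OptionalInt httpPort) {
      this.address = address;
      this.httpPort = httpPort;
    }
  }

  /** Builds the REST base URL, assuming plain HTTP (an assumption; TLS setups differ). */
  public static String restUrl(Endpoint endpoint) {
    int port = endpoint.httpPort.orElse(DEFAULT_HTTP_PORT);
    return "http://" + endpoint.address + ":" + port;
  }

  public static void main(String[] args) {
    System.out.println(restUrl(new Endpoint("node1", OptionalInt.of(9047)))); // http://node1:9047
    System.out.println(restUrl(new Endpoint("node2", OptionalInt.empty())));  // http://node2:8047
  }
}
```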
[jira] [Commented] (DRILL-5261) Expose REST endpoint in zookeeper
[ https://issues.apache.org/jira/browse/DRILL-5261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258552#comment-16258552 ] ASF GitHub Bot commented on DRILL-5261: --- GitHub user xhochy opened a pull request: https://github.com/apache/drill/pull/1042 DRILL-5261: Expose REST endpoint in zookeeper You can merge this pull request into a Git repository by running: $ git pull https://github.com/xhochy/drill DRILL-5261 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/drill/pull/1042.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1042 commit 2df606590158afe6cb29e9f292eb33e945c26ba0 Author: Uwe L. Korn Date: 2017-11-19T09:36:32Z DRILL-5261: Expose REST endpoint in zookeeper
[jira] [Commented] (DRILL-5974) Read JSON non-relational fields using text mode
[ https://issues.apache.org/jira/browse/DRILL-5974?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258550#comment-16258550 ] Aman Sinha commented on DRILL-5974: --- For the specific example of the 2-D array, Drill can currently query it using the array[i][j] syntax, as shown below (I took this example from the Drill docs page [1]): {noformat} SELECT features[0].geometry.coordinates[0][1] from dfs.`/Users/asinha/data/json/polygon`; +---+ |EXPR$0 | +---+ | [-122.42207601332528,37.808835019815085,0.0] | +---+ {noformat} If we treat it as text mode, my understanding is that the above query will no longer work. Independent of this example, I do agree that there are other types of JSON structures that Drill cannot process without setting the all_text_mode, so those could benefit: for instance, lists with heterogeneous types or lists with NULL values. [1] https://drill.apache.org/docs/json-data-model/ > Read JSON non-relational fields using text mode > --- > > Key: DRILL-5974 > URL: https://issues.apache.org/jira/browse/DRILL-5974 > Project: Apache Drill > Issue Type: Improvement >Affects Versions: 1.13.0 >Reporter: Paul Rogers >Assignee: Paul Rogers > Fix For: 1.13.0 > > > Proposed is a minor enhancement to the JSON reader to better handle non-relational JSON structures. > As background, Drill handles simple tuples: > {code} > {a: 10, b: "fred"} > {code} > Drill also handles arrays: > {code} > {name: "fred", hobbies: ["bowling", "golf"]} > {code} > Drill even handles arrays of tuples: > {code} > {name: "fred", orders: [ > {id: 1001, amount: 12.34}, > {id: 1002, amount: 56.78}]} > {code} > The above are termed "relational" because there is a straightforward mapping between tables and the above JSON structures. > Things get interesting with non-relational types, such as 2-D arrays: > {code} > {id: 4, shape: "square", points: [[0, 0], [0, 5], [5, 0], [5, 5]]} > {code} > Drill has two solutions: > * Turn on the experimental list and union support.
> * Enable all-text mode to read all fields as JSON text. > Proposed is a middle ground: > * Read fields with relational types into vectors. > * Read non-relational fields using text mode. > Thus, the first three examples would all result in the JSON data being parsed into Drill vectors. But the fourth, non-relational example would produce a row that looks like this: > {noformat} > id, shape, points > 4, "square", "[[0, 0], [0, 5], [5, 0], [5, 5]]" > {noformat} > Although Drill can't parse the 2-D array, Drill will pass the array along to the client, which can use its favorite JSON parser to parse the array and do something useful (like draw the square in this case). > Specifically, the proposal is to: > * Apply this change only to the revised "batch size aware" JSON reader. > * Use the above parsing model by default. > * Use the experimental list-and-union support if the existing {{exec.enable_union_type}} system/session option is set. > Existing queries should "just work." In fact, JSON with non-relational types will now work "out-of-the-box" without all-text mode or the experimental types.
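To make the proposed split concrete, the Java sketch below classifies an already-parsed JSON value as "relational" (scalars, maps of relational values, arrays of same-typed non-null scalars, arrays of maps) or not, and converts only the non-relational columns to JSON text. It is a hypothetical illustration, not Drill's JSON reader: the class `RelationalJsonCheck`, the plain `Map`/`List` value model, and the exact rules are assumptions. Treating mixed-type lists and lists containing NULL as non-relational follows the comment above. A real reader would pass through the original text from the parser instead of re-serializing, and this toy serializer does no string escaping.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical sketch of the "relational vs. text" decision; not Drill code. */
public class RelationalJsonCheck {

    static boolean isScalar(Object v) {
        return v == null || v instanceof String || v instanceof Number || v instanceof Boolean;
    }

    /** Input is assumed to be a parsed JSON value: Map, List, String, Number, Boolean or null. */
    static boolean isRelational(Object v) {
        if (isScalar(v)) {
            return true;
        }
        if (v instanceof Map) {
            for (Object child : ((Map<?, ?>) v).values()) {
                if (!isRelational(child)) {
                    return false;
                }
            }
            return true;
        }
        List<?> list = (List<?>) v; // the only remaining JSON type is an array
        if (list.isEmpty()) {
            return true;
        }
        Object first = list.get(0);
        if (first instanceof Map) {
            // Array of tuples: every element must be a relational map.
            for (Object e : list) {
                if (!(e instanceof Map) || !isRelational(e)) {
                    return false;
                }
            }
            return true;
        }
        if (first == null || !isScalar(first)) {
            return false; // array of arrays, or a list containing NULL
        }
        // Array of scalars: all elements must be non-null and of the same type.
        for (Object e : list) {
            if (e == null || e.getClass() != first.getClass()) {
                return false;
            }
        }
        return true;
    }

    /** Minimal serializer for the sketch (no escaping of special characters). */
    static String toJson(Object v) {
        if (v == null) {
            return "null";
        }
        if (v instanceof String) {
            return "\"" + v + "\"";
        }
        if (v instanceof Map) {
            return ((Map<?, ?>) v).entrySet().stream()
                    .map(e -> "\"" + e.getKey() + "\": " + toJson(e.getValue()))
                    .collect(Collectors.joining(", ", "{", "}"));
        }
        if (v instanceof List) {
            return ((List<?>) v).stream()
                    .map(RelationalJsonCheck::toJson)
                    .collect(Collectors.joining(", ", "[", "]"));
        }
        return String.valueOf(v); // Number or Boolean
    }

    /** Keeps relational columns as values; non-relational columns become JSON text. */
    static Map<String, Object> project(Map<String, Object> row) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : row.entrySet()) {
            Object v = e.getValue();
            out.put(e.getKey(), isRelational(v) ? v : toJson(v));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 4);
        row.put("shape", "square");
        row.put("points", List.of(List.of(0, 0), List.of(0, 5), List.of(5, 0), List.of(5, 5)));
        // Prints: {"id": 4, "shape": "square", "points": "[[0, 0], [0, 5], [5, 0], [5, 5]]"}
        System.out.println(toJson(project(row)));
    }
}
```

Only `points` is quoted in the output, because it alone was turned into a text column; `id` and `shape` stay typed values.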
[jira] [Comment Edited] (DRILL-5975) Resource utilization
[ https://issues.apache.org/jira/browse/DRILL-5975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258448#comment-16258448 ] weijie.tong edited comment on DRILL-5975 at 11/19/17 12:45 PM: --- Yes, we have been using the queue option for a long time. It is good at preventing the cluster from being overloaded, but it is too coarse to act as a scheduler. I have noticed [DRILL-5716|https://issues.apache.org/jira/browse/DRILL-5716]. It is a good design for memory allocation, but I think it will do little to prevent a Drillbit from being assigned too much work under the current architecture. We need a scheduler that does fragment-level scheduling; call it first-level scheduling. A YARN-like scheduler works at the Drillbit node level; call it second-level scheduling. First-level scheduling can work on top of second-level scheduling, for example when we deploy Drill on YARN. I propose this design after investigating the Flink and Spark projects. I prefer Flink's [design|https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks]. I will look into Presto as you advised. I have also discussed this with [~jni] privately, and we agree on some points. I think it is necessary to introduce a RecordBatchManager role to break the current cascaded RPC dependency between two MajorFragments at the data exchange stage. If there is enough memory, or the upper MajorFragment's computation is fast enough, the written RecordBatches will be pushed to the consumers quickly without ever going to disk, giving the same performance as the current implementation. It will also make MinorFragment-level scheduling possible. Discussion is welcome. was (Author: weijie): Yes, we have been using the queue option for a long time. It is good at preventing the cluster from being overloaded, but it is too coarse to act as a scheduler. I have noticed [DRILL-5716|https://issues.apache.org/jira/browse/DRILL-5716]. 
It is a good design for memory allocation, but I think it will do little to prevent a Drillbit from being assigned too much work under the current architecture. We need a scheduler that does fragment-level scheduling; call it first-level scheduling. A YARN-like scheduler works at the Drillbit node level; call it second-level scheduling. First-level scheduling can work on top of second-level scheduling, for example when we deploy Drill on YARN. I propose this design after having investigated the Flink and Spark projects. I prefer Flink's [design|https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks]. I will look into Presto as you advised. I have also discussed this with [~jni] privately, and we agree on some points. I think it is necessary to introduce a RecordBatchManager role to break the current cascaded RPC dependency between two MajorFragments at the data exchange stage. If there is enough memory, or the upper MajorFragment's computation is fast enough, the written RecordBatches will be pushed to the consumers quickly without ever going to disk, giving the same performance as the current implementation. It will also make MinorFragment-level scheduling possible. Discussion is welcome. > Resource utilization > > > Key: DRILL-5975 > URL: https://issues.apache.org/jira/browse/DRILL-5975 > Project: Apache Drill > Issue Type: New Feature >Affects Versions: 2.0.0 >Reporter: weijie.tong >Assignee: weijie.tong > > h1. Motivation > Currently, the resource utilization ratio of a Drill cluster is poor. Most of the cluster's resources are wasted, and we cannot afford many concurrent queries. Once the system accepts more queries, even when the CPU load is not high, a query that was originally very fast becomes slower and slower. > The reason is that Drill does not provide a scheduler. It simply assumes that all nodes have enough computing resources. 
When a query arrives, Drill assigns the related fragments to random nodes without regard to each node's load, so some nodes suffer more CPU context switches to serve the incoming query. The deeper cause is that the runtime minor fragments form a runtime tree whose nodes are spread across different Drillbits. The runtime tree is an in-memory pipeline: all of its nodes stay alive for the whole lifecycle of a query, successively sending data to upper nodes, even though some nodes could finish quickly and quit immediately. What's more, the runtime tree is constructed before execution actually starts, so the scheduling target in Drill becomes the whole set of runtime tree nodes. > h1. Design > It is hard to schedule the runtime tree nodes as a whole, so I try to solve this by breaking the cascade of runtime nodes. The graph below describes the initial design. >
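The buffering behavior described in the comment (batches flow straight through when the consumer keeps up, and are held back or spilled only when it does not) can be sketched in Java as below. Everything here is a hypothetical illustration, not Drill code: `BatchBufferSketch`, the string "batches", the second queue standing in for disk, and the callback standing in for the FragmentScheduler notification are all assumptions, and throttling is not modeled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch of a per-Drillbit batch buffer; not a Drill class. */
public class BatchBufferSketch {

    private final int memoryBudget;                            // max batches held in memory
    private final Deque<String> inMemory = new ArrayDeque<>();
    private final Deque<String> spilled = new ArrayDeque<>();  // stand-in for disk storage
    private final Consumer<String> onFirstBatch;               // stand-in for notifying a scheduler
    private boolean notified = false;
    private int spillCount = 0;

    BatchBufferSketch(int memoryBudget, Consumer<String> onFirstBatch) {
        this.memoryBudget = memoryBudget;
        this.onFirstBatch = onFirstBatch;
    }

    /** Called by a local sender; never waits for the consumer. */
    void write(String batch) {
        if (!notified) {
            notified = true;
            onFirstBatch.accept(batch); // first batch: ask for a consumer to be scheduled
        }
        // Once anything has spilled, newer batches must spill too to keep FIFO order.
        if (inMemory.size() < memoryBudget && spilled.isEmpty()) {
            inMemory.addLast(batch);
        } else {
            spilled.addLast(batch);
            spillCount++;
        }
    }

    /** Called when the consumer is ready; returns the oldest batch, or null if none. */
    String next() {
        if (!inMemory.isEmpty()) {
            String batch = inMemory.pollFirst();
            if (!spilled.isEmpty()) {
                inMemory.addLast(spilled.pollFirst()); // reload the oldest spilled batch
            }
            return batch;
        }
        return spilled.pollFirst();
    }

    int spillCount() {
        return spillCount;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        BatchBufferSketch fast = new BatchBufferSketch(2, log::add);
        for (int i = 0; i < 3; i++) {
            fast.write("f" + i);
            fast.next(); // consumer keeps up: nothing goes to "disk"
        }
        BatchBufferSketch slow = new BatchBufferSketch(2, log::add);
        for (int i = 0; i < 5; i++) {
            slow.write("s" + i); // consumer not reading yet: overflow spills
        }
        System.out.println(fast.spillCount() + " " + slow.spillCount()); // 0 3
    }
}
```

The fast case corresponds to the comment's claim that, with enough memory or a fast consumer, batches never reach disk; the slow case shows the sender finishing without waiting for its consumer.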
[jira] [Comment Edited] (DRILL-5975) Resource utilization
[ https://issues.apache.org/jira/browse/DRILL-5975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258448#comment-16258448 ] weijie.tong edited comment on DRILL-5975 at 11/19/17 11:16 AM: --- Yes, we have been using the queue option for a long time. It is good at preventing the cluster from being overloaded, but it is too coarse to act as a scheduler. I have noticed [DRILL-5716|https://issues.apache.org/jira/browse/DRILL-5716]. It is a good design for memory allocation, but I think it will do little to prevent a Drillbit from being assigned too much work under the current architecture. We need a scheduler that does fragment-level scheduling; call it first-level scheduling. A YARN-like scheduler works at the Drillbit node level; call it second-level scheduling. First-level scheduling can work on top of second-level scheduling, for example when we deploy Drill on YARN. I propose this design after having investigated the Flink and Spark projects. I prefer Flink's [design|https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks]. I will look into Presto as you advised. I have also discussed this with [~jni] privately, and we agree on some points. I think it is necessary to introduce a RecordBatchManager role to break the current cascaded RPC dependency between two MajorFragments at the data exchange stage. If there is enough memory, or the upper MajorFragment's computation is fast enough, the written RecordBatches will be pushed to the consumers quickly without ever going to disk, giving the same performance as the current implementation. It will also make MinorFragment-level scheduling possible. Discussion is welcome. was (Author: weijie): Yes, we have been using the queue option for a long time. It is good at preventing the cluster from being overloaded, but it is too coarse to act as a scheduler. I have noticed [DRILL-5716|https://issues.apache.org/jira/browse/DRILL-5716]. 
It is a good design for memory allocation, but I think it will do little to prevent a Drillbit from being assigned too much work under the current architecture. We need a scheduler that does fragment-level scheduling; call it first-level scheduling. A YARN-like scheduler works at the Drillbit node level; call it second-level scheduling. First-level scheduling can work on top of second-level scheduling, for example when we deploy Drill on YARN. I propose this design after having investigated the Flink and Spark projects. I prefer Flink's [design|https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks]. I will look into Presto as you advised. I have also discussed this with [~jni] privately, and we agree on some points. I think it is necessary to introduce a RecordBatchManager role to break the current cascaded RPC dependency between two MajorFragments at the data exchange stage. If there is enough memory, or the upper MajorFragment's computation is fast enough, the written RecordBatches will be pushed to the consumers quickly without ever going to disk, giving the same performance as the current implementation. It will also make MinorFragment-level scheduling possible. Discussion is welcome.
[jira] [Comment Edited] (DRILL-5975) Resource utilization
[ https://issues.apache.org/jira/browse/DRILL-5975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258448#comment-16258448 ] weijie.tong edited comment on DRILL-5975 at 11/19/17 11:15 AM: --- Yes, we have been using the queue option for a long time. It is good at preventing the cluster from being overloaded, but it is too coarse to act as a scheduler. I have noticed [DRILL-5716|https://issues.apache.org/jira/browse/DRILL-5716]. It is a good design for memory allocation, but I think it will do little to prevent a Drillbit from being assigned too much work under the current architecture. We need a scheduler that does fragment-level scheduling; call it first-level scheduling. A YARN-like scheduler works at the Drillbit node level; call it second-level scheduling. First-level scheduling can work on top of second-level scheduling, for example when we deploy Drill on YARN. I propose this design after having investigated the Flink and Spark projects. I prefer Flink's [design|https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks]. I will look into Presto as you advised. I have also discussed this with [~jni] privately, and we agree on some points. I think it is necessary to introduce a RecordBatchManager role to break the current cascaded RPC dependency between two MajorFragments at the data exchange stage. If there is enough memory, or the upper MajorFragment's computation is fast enough, the written RecordBatches will be pushed to the consumers quickly without ever going to disk, giving the same performance as the current implementation. It will also make MinorFragment-level scheduling possible. Discussion is welcome. was (Author: weijie): Yes, we have been using the queue option for a long time. It is good at preventing the cluster from being overloaded, but it is too coarse to act as a scheduler. I have noticed [DRILL-5716|https://issues.apache.org/jira/browse/DRILL-5716]. 
It is a good design for memory allocation, but I think it will do little to prevent a Drillbit from being assigned too much work under the current architecture. We need a scheduler that does fragment-level scheduling; call it first-level scheduling. A YARN-like scheduler works at the Drillbit node level; call it second-level scheduling. First-level scheduling can work on top of second-level scheduling, for example when we deploy Drill on YARN. I propose this design after having investigated the Flink and Spark projects. I prefer Flink's [design|https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks]. I will look into Presto as you advised. I have also discussed this with [~jni] privately, and we agree on some points. I think it is necessary to introduce a RecordBatchManager role to break the current cascaded RPC dependency between two MajorFragments at the data exchange stage. If there is enough memory, or the upper MajorFragment's computation is fast enough, the written RecordBatches will be pushed to the consumers quickly without ever going to disk, giving the same performance as the current implementation. It will also make MinorFragment-level scheduling possible. Discussion is welcome.
[jira] [Commented] (DRILL-5975) Resource utilization
[ https://issues.apache.org/jira/browse/DRILL-5975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16258448#comment-16258448 ] weijie.tong commented on DRILL-5975: Yes, we have been using the queue option for a long time. It is good at preventing the cluster from being overloaded, but it is too coarse to act as a scheduler. I have noticed [DRILL-5716|https://issues.apache.org/jira/browse/DRILL-5716]. It is a good design for memory allocation, but I think it will do little to prevent a Drillbit from being assigned too much work under the current architecture. We need a scheduler that does fragment-level scheduling; call it first-level scheduling. A YARN-like scheduler works at the Drillbit node level; call it second-level scheduling. First-level scheduling can work on top of second-level scheduling, for example when we deploy Drill on YARN. I propose this design after having investigated the Flink and Spark projects. I prefer Flink's [design|https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks]. I will look into Presto as you advised. I have also discussed this with [~jni] privately, and we agree on some points. I think it is necessary to introduce a RecordBatchManager role to break the current cascaded RPC dependency between two MajorFragments at the data exchange stage. If there is enough memory, or the upper MajorFragment's computation is fast enough, the written RecordBatches will be pushed to the consumers quickly without ever going to disk, giving the same performance as the current implementation. It will also make MinorFragment-level scheduling possible. Discussion is welcome. > Resource utilization > > > Key: DRILL-5975 > URL: https://issues.apache.org/jira/browse/DRILL-5975 > Project: Apache Drill > Issue Type: New Feature >Affects Versions: 2.0.0 >Reporter: weijie.tong >Assignee: weijie.tong > > h1. Motivation > Currently, the resource utilization ratio of a Drill cluster is poor. 
Most of the cluster's resources are wasted, and we cannot afford many concurrent queries. Once the system accepts more queries, even when the CPU load is not high, a query that was originally very fast becomes slower and slower. > The reason is that Drill does not provide a scheduler. It simply assumes that all nodes have enough computing resources. When a query arrives, Drill assigns the related fragments to random nodes without regard to each node's load, so some nodes suffer more CPU context switches to serve the incoming query. The deeper cause is that the runtime minor fragments form a runtime tree whose nodes are spread across different Drillbits. The runtime tree is an in-memory pipeline: all of its nodes stay alive for the whole lifecycle of a query, successively sending data to upper nodes, even though some nodes could finish quickly and quit immediately. What's more, the runtime tree is constructed before execution actually starts, so the scheduling target in Drill becomes the whole set of runtime tree nodes. > h1. Design > It is hard to schedule the runtime tree nodes as a whole, so I try to solve this by breaking the cascade of runtime nodes. The graph below describes the initial design. > !https://raw.githubusercontent.com/wiki/weijietong/drill/images/design.png! > [graph link|https://raw.githubusercontent.com/wiki/weijietong/drill/images/design.png] > Every Drillbit instance will have a RecordBatchManager that accepts all the RecordBatches written by the senders of the different local MinorFragments. The RecordBatchManager will hold the RecordBatches in memory first, then in disk storage. Once the first RecordBatch from a MinorFragment sender of a query arrives, it will notify the FragmentScheduler. The FragmentScheduler is instantiated by the Foreman and holds the whole PlanFragment execution graph. It will allocate a new corresponding FragmentExecutor to process the generated RecordBatch. 
The allocated FragmentExecutor will then notify the corresponding FragmentManager that it is ready to receive the data. Then the FragmentManager will send out the RecordBatches one by one to the corresponding FragmentExecutor's receiver, throttling the data stream as the current Sender does. > What we can gain from this design: > a. A computation leaf node does not need to wait for its consumer before it can end its life and release its resources. > b. The data-sending logic will be isolated from the computation nodes and shared by different FragmentManagers. > c. We can schedule MajorFragments according to each Drillbit's actual resource capacity at runtime. > d. Drill's pipelined data processing characteristic is retained. > h1. Plan > This will be a large PR, so I plan to divide it into several smaller ones. > a. to implement the