[GitHub] drill issue #914: DRILL-5657: Size-aware vector writer structure

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on the issue:

https://github.com/apache/drill/pull/914
  
Regarding the use of memory addresses. The only reason to do so is 
performance. To show the benefit of using addresses, I reran the 
`PerformanceTool` class to test the original code, the code using addresses, 
and a version that uses DrillBuf as @parthchandra suggested. I expected to see 
that using addresses was a winner. That's not at all what happened.

The code contains a class, `PerformanceTool`, that compares the column 
writers with the original vector mutators. It loads a vector to 16 MB in size, 
repeating the exercise 300 times. The following are the run times, in ms.

Vector Type | Original | New w/Address | New w/DrillBuf
----------- | -------- | ------------- | --------------
Required | 5703 | 4034 | 1461
Nullable | 12743 | 3645 | 3411
Repeated | 20430 | 7226 | 2669

Here:

* "Original" column uses the original int vector mutator class.
* "New w/Address" shows the same exercise, using the version of the vector 
writers based on a direct memory address.
* "New w/Drillbuf" shows the vector writers, but using the technique Parth 
suggested to create "unsafe" methods on the `Drillbuf` class.

The test is run with a pre-allocated vector (no double-and-copy 
operations). See `PerformanceTool` for details.

I have no explanation for why the `DrillBuf` version should be faster at 
all, let alone far faster; but I'll take it. The latest commit contains the 
code after this revision.
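
For reference, the change boils down to adding accessors of roughly this 
shape to `DrillBuf` (a sketch using Netty's `PlatformDependent`; the exact 
method names are in the commit):

```
// Inside DrillBuf (sketch): write through the raw memory address while
// skipping Netty's per-call reference-count and bounds checks. The caller
// must guarantee that index lies within the allocated capacity.
public void unsafePutInt(int index, int value) {
  PlatformDependent.putInt(memoryAddress() + index, value);
}
```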

So, thank you Parth, you were right again with what turned out to be an 
outstanding performance boost.


---


[GitHub] drill pull request #1033: DRILL-5952: Implement "CREATE TABLE IF NOT EXISTS"

2017-11-16 Thread prasadns14
Github user prasadns14 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1033#discussion_r151614301
  
--- Diff: 
exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTAS.java ---
@@ -313,6 +313,59 @@ public void createTableWithCustomUmask() throws 
Exception {
 }
   }
 
+  @Test // DRILL-5951
+  public void 
testCreateTableIfNotExistsWhenTableWithSameNameAlreadyExists() throws Exception{
+final String newTblName = 
"createTableIfNotExistsWhenATableWithSameNameAlreadyExists";
+
+try {
+  test(String.format("CREATE TABLE %s.%s AS SELECT * from 
cp.`region.json`", TEMP_SCHEMA, newTblName));
+
+  final String ctasQuery =
+String.format("CREATE TABLE IF NOT EXISTS %s.%s AS SELECT * FROM 
cp.`employee.json`", TEMP_SCHEMA, newTblName);
+
+  testBuilder()
+.sqlQuery(ctasQuery)
+.unOrdered()
+.baselineColumns("ok", "summary")
+.baselineValues(false, String.format("A table or view with given 
name [%s] already exists in schema [%s]", newTblName, TEMP_SCHEMA))
+.go();
+} finally {
+  test(String.format("DROP TABLE %s.%s", TEMP_SCHEMA, newTblName));
+}
+  }
+
+  @Test // DRILL-5951
+  public void 
testCreateTableIfNotExistsWhenViewWithSameNameAlreadyExists() throws Exception{
+final String newTblName = 
"createTableIfNotExistsWhenAViewWithSameNameAlreadyExists";
+
+try {
+  test(String.format("CREATE VIEW %s.%s AS SELECT * from 
cp.`region.json`", TEMP_SCHEMA, newTblName));
+
+  final String ctasQuery =
+String.format("CREATE TABLE IF NOT EXISTS %s.%s AS SELECT * FROM 
cp.`employee.json`", TEMP_SCHEMA, newTblName);
+
+  testBuilder()
+.sqlQuery(ctasQuery)
+.unOrdered()
+.baselineColumns("ok", "summary")
+.baselineValues(false, String.format("A table or view with given 
name [%s] already exists in schema [%s]", newTblName, TEMP_SCHEMA))
+.go();
+} finally {
+  test(String.format("DROP VIEW %s.%s", TEMP_SCHEMA, newTblName));
+}
+  }
+
+  @Test // DRILL-5951
+  public void 
testCreateTableIfNotExistsWhenTableWithSameNameDoesNotExist() throws Exception{
+final String newTblName = 
"createTableIfNotExistsWhenATableWithSameNameDoesNotExist";
+
+try {
+  test(String.format("CREATE TABLE IF NOT EXISTS %s.%s AS SELECT * 
FROM cp.`employee.json`", TEMP_SCHEMA, newTblName));
--- End diff --

If successful, CTAS returns the fragment and the number of rows written. As 
long as no exception is thrown, we can be sure the table was created. This is 
in sync with the other test cases in TestCTAS.
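
If we ever want a stricter check than "no exception thrown", a follow-up 
assertion could look something like this (a sketch; the expected count assumes 
the standard cp.`employee.json` test file with 1155 rows):

```
// Hypothetical follow-up check, not part of this PR:
testBuilder()
  .sqlQuery(String.format("SELECT count(*) AS cnt FROM %s.%s", TEMP_SCHEMA, newTblName))
  .unOrdered()
  .baselineColumns("cnt")
  .baselineValues(1155L) // row count of cp.`employee.json` in the test jars
  .go();
```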


---


[GitHub] drill pull request #1033: DRILL-5952: Implement "CREATE TABLE IF NOT EXISTS"

2017-11-16 Thread prasadns14
Github user prasadns14 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1033#discussion_r151613939
  
--- Diff: 
exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTAS.java ---
@@ -313,6 +313,59 @@ public void createTableWithCustomUmask() throws 
Exception {
 }
   }
 
+  @Test // DRILL-5951
+  public void 
testCreateTableIfNotExistsWhenTableWithSameNameAlreadyExists() throws Exception{
+final String newTblName = 
"createTableIfNotExistsWhenATableWithSameNameAlreadyExists";
+
+try {
+  test(String.format("CREATE TABLE %s.%s AS SELECT * from 
cp.`region.json`", TEMP_SCHEMA, newTblName));
--- End diff --

Fixed it.


---


[GitHub] drill pull request #1033: DRILL-5952: Implement "CREATE TABLE IF NOT EXISTS"

2017-11-16 Thread prasadns14
Github user prasadns14 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1033#discussion_r151613897
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
 ---
@@ -83,7 +84,20 @@ public PhysicalPlan getPlan(SqlNode sqlNode) throws 
ValidationException, RelConv
 final DrillConfig drillConfig = context.getConfig();
 final AbstractSchema drillSchema = resolveSchema(sqlCreateTable, 
config.getConverter().getDefaultSchema(), drillConfig);
 
-checkDuplicatedObjectExistence(drillSchema, originalTableName, 
drillConfig, context.getSession());
+String schemaPath = drillSchema.getFullSchemaName();
+
+// Check duplicate object existence
+boolean isTemporaryTable = 
context.getSession().isTemporaryTable(drillSchema, drillConfig, 
originalTableName);
--- End diff --

1) Added unit tests for temp tables.
2) Implemented the same check for views as well.


---


[GitHub] drill issue #1001: JIRA DRILL-5879: Like operator performance improvements

2017-11-16 Thread ppadma
Github user ppadma commented on the issue:

https://github.com/apache/drill/pull/1001
  
@priteshm @sachouche This PR needs to be updated on top of changes made for 
DRILL-5899.


---


[GitHub] drill issue #1001: JIRA DRILL-5879: Like operator performance improvements

2017-11-16 Thread priteshm
Github user priteshm commented on the issue:

https://github.com/apache/drill/pull/1001
  
@ppadma @paul-rogers I see that @sachouche addressed the comments in the 
JIRA - is this one ready to merge?


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151585383
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("kafka-partition-scan")
+public class KafkaSubScan extends AbstractBase implements SubScan {
+
+  @JsonProperty
+  private final KafkaStoragePluginConfig kafkStoragePluginConfig;
+
+  @JsonIgnore
+  private final KafkaStoragePlugin kafkaStoragePlugin;
+  private final List coulmns;
+  private final List partitionSubScanSpecList;
+
+  @JsonCreator
+  public KafkaSubScan(@JacksonInject StoragePluginRegistry registry, 
@JsonProperty("userName") String userName,
+  @JsonProperty("kafkStoragePluginConfig") KafkaStoragePluginConfig 
kafkStoragePluginConfig,
+  @JsonProperty("coulmns") List coulmns,
+  @JsonProperty("partitionSubScanSpecList") 
LinkedList partitionSubScanSpecList)
+  throws ExecutionSetupException {
+super(userName);
+this.kafkStoragePluginConfig = kafkStoragePluginConfig;
+this.coulmns = coulmns;
+this.partitionSubScanSpecList = partitionSubScanSpecList;
+this.kafkaStoragePlugin = (KafkaStoragePlugin) 
registry.getPlugin(kafkStoragePluginConfig);
+  }
+
+  public KafkaSubScan(String userName, KafkaStoragePlugin plugin, 
KafkaStoragePluginConfig kafkStoragePluginConfig,
+  List coulmns, List 
partitionSubScanSpecList) {
+super(userName);
+this.coulmns = coulmns;
+this.kafkStoragePluginConfig = kafkStoragePluginConfig;
--- End diff --

Spelling: `kafkStorage...` --> `kafkaStorage`


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151587965
  
--- Diff: 
contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.NoSuchElementException;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MessageIteratorTest extends KafkaTestBase {
+
+  private KafkaConsumer<byte[], byte[]> kafkaConsumer;
+  private KafkaSubScanSpec subScanSpec;
+
+  @Before
+  public void setUp() {
+Properties consumerProps = storagePluginConfig.getKafkaConsumerProps();
+consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class);
+consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class);
+consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "4");
+kafkaConsumer = new KafkaConsumer<>(consumerProps);
+subScanSpec = new KafkaSubScanSpec(QueryConstants.JSON_TOPIC, 0, 0, 
TestKafkaSuit.NUM_JSON_MSG);
+  }
+
+  @After
+  public void cleanUp() {
+if (kafkaConsumer != null) {
+  kafkaConsumer.close();
+}
+  }
+
+  @Test
+  public void testWhenPollTimeOutIsTooLess() {
+MessageIterator iterator = new MessageIterator(kafkaConsumer, 
subScanSpec, 1);
+try {
+  iterator.hasNext();
+} catch (UserException ue) {
+  Assert.assertEquals(ErrorType.DATA_READ, ue.getErrorType());
+  Assert.assertTrue(ue.getMessage().contains(
+  "DATA_READ ERROR: Failed to fetch messages within 1 
milliseconds. Consider increasing the value of the property : 
store.kafka.poll.timeout"));
+}
+  }
+
+  @Test
+  public void testShouldReturnTrueAsKafkaHasMessages() {
+MessageIterator iterator = new MessageIterator(kafkaConsumer, 
subScanSpec, TimeUnit.SECONDS.toMillis(1));
+Assert.assertTrue("Message iterator returned false though there are 
messages in Kafka", iterator.hasNext());
+  }
+
+  @Test
+  public void testShouldReturnMessage1() {
+MessageIterator iterator = new MessageIterator(kafkaConsumer, 
subScanSpec, TimeUnit.SECONDS.toMillis(1));
+// Calling hasNext makes only one poll to Kafka which fetches only 4 
messages.
+// so fifth operation on iterator is expected to fail.
+iterator.hasNext();
+Assert.assertNotNull(iterator.next());
+Assert.assertNotNull(iterator.next());
+Assert.assertNotNull(iterator.next());
+Assert.assertNotNull(iterator.next());
+try {
+  iterator.next();
+  Assert.fail("Kafak fetched more messages than configured.");
+} catch (NoSuchElementException nse) {
+  Assert.assertTrue(true);
--- End diff --

Should `hasNext()` just return `false` at this point rather than `next()` 
throwing an exception? That is, should the iterator map the EOF condition to 
the normal iterator contract of `hasNext()` returning `false` at EOF?

Also, a typical Drill pattern for the catch block is just to say:
```
// Expected
```
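
Applied to this test, that would read something like:

```
try {
  iterator.next();
  Assert.fail("Kafka fetched more messages than configured.");
} catch (NoSuchElementException e) {
  // Expected
}
```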


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151585461
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("kafka-partition-scan")
+public class KafkaSubScan extends AbstractBase implements SubScan {
+
+  @JsonProperty
+  private final KafkaStoragePluginConfig kafkStoragePluginConfig;
+
+  @JsonIgnore
+  private final KafkaStoragePlugin kafkaStoragePlugin;
+  private final List coulmns;
+  private final List partitionSubScanSpecList;
+
+  @JsonCreator
+  public KafkaSubScan(@JacksonInject StoragePluginRegistry registry, 
@JsonProperty("userName") String userName,
+  @JsonProperty("kafkStoragePluginConfig") KafkaStoragePluginConfig 
kafkStoragePluginConfig,
+  @JsonProperty("coulmns") List coulmns,
--- End diff --

Spelling. Since this is a public API, might as well get it right.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151586477
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactory.java
 ---
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.decoders;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class MessageReaderFactory {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(MessageReaderFactory.class);
+
+  /**
+   * Initialize kafka message reader based on store.kafka.record.reader session
+   * property
+   *
+   * @param messageReaderKlass
+   *  value of store.kafka.record.reader session property
+   * @return kafka message reader
+   * @throws UserException
+   *   in case of any message reader initialization failure
+   */
+  public static MessageReader getMessageReader(String messageReaderKlass) {
+Preconditions.checkNotNull(messageReaderKlass, "Please set 
store.kafka.record.reader " + messageReaderKlass);
+MessageReader messageReader = null;
+try {
+  Class<?> klass = Class.forName(messageReaderKlass);
--- End diff --

Actually, seeing the larger picture, I wonder if this is the right 
approach. Suppose this is two or three releases from now and we support other 
forms of Kafka messages. Different topics use different formats.

If the message format is a system/session option, then I need to switch the 
option before each query. That is cumbersome and error-prone.

Instead, perhaps this information should be part of the storage plugin 
config. Then, I can define different plugins: one for each message format.

Further, can I have multiple Kafka servers? If so, would I need different 
plugin configs for each?

So, should we be thinking about encoding most properties as plugin config 
properties?

Now, the plugin might have a `format` property, one of which is `json`. The 
JSON config properties would be defined in the `json` format within the overall 
storage plugin config.

Given the looming deadline of the 1.12 release, it is fine if we file the 
above suggestion as a JIRA and work on it for 1.13.
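
To make the idea concrete, here is a rough sketch of carrying the format in 
the plugin config (the `format` field and its default are my assumptions; 
`getKafkaConsumerProps()` is from the PR):

```
import java.util.Map;
import java.util.Objects;

import org.apache.drill.common.logical.StoragePluginConfig;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;

@JsonTypeName("kafka")
public class KafkaStoragePluginConfig extends StoragePluginConfig {

  private final Map<String, String> kafkaConsumerProps;
  private final String format; // e.g. "json"; selects the MessageReader

  @JsonCreator
  public KafkaStoragePluginConfig(
      @JsonProperty("kafkaConsumerProps") Map<String, String> kafkaConsumerProps,
      @JsonProperty("format") String format) {
    this.kafkaConsumerProps = kafkaConsumerProps;
    this.format = format == null ? "json" : format; // default reader
  }

  public Map<String, String> getKafkaConsumerProps() { return kafkaConsumerProps; }

  public String getFormat() { return format; }

  @Override
  public boolean equals(Object o) {
    if (this == o) { return true; }
    if (o == null || getClass() != o.getClass()) { return false; }
    KafkaStoragePluginConfig that = (KafkaStoragePluginConfig) o;
    return Objects.equals(kafkaConsumerProps, that.kafkaConsumerProps)
        && Objects.equals(format, that.format);
  }

  @Override
  public int hashCode() {
    return Objects.hash(kafkaConsumerProps, format);
  }
}
```

With that, two clusters, or two topics with different encodings, just become 
two named plugin configs.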


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151582174
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("kafka-partition-scan")
+public class KafkaSubScan extends AbstractBase implements SubScan {
+
+  @JsonProperty
+  private final KafkaStoragePluginConfig kafkStoragePluginConfig;
+
+  @JsonIgnore
+  private final KafkaStoragePlugin kafkaStoragePlugin;
+  private final List<SchemaPath> coulmns;
+  private final List<KafkaSubScanSpec> partitionSubScanSpecList;
+
+  @JsonCreator
+  public KafkaSubScan(@JacksonInject StoragePluginRegistry registry, 
@JsonProperty("userName") String userName,
+  @JsonProperty("kafkStoragePluginConfig") KafkaStoragePluginConfig 
kafkStoragePluginConfig,
+  @JsonProperty("coulmns") List coulmns,
+  @JsonProperty("partitionSubScanSpecList") 
LinkedList partitionSubScanSpecList)
+  throws ExecutionSetupException {
+super(userName);
+this.kafkStoragePluginConfig = kafkStoragePluginConfig;
+this.coulmns = coulmns;
+this.partitionSubScanSpecList = partitionSubScanSpecList;
+this.kafkaStoragePlugin = (KafkaStoragePlugin) 
registry.getPlugin(kafkStoragePluginConfig);
+  }
+
+  public KafkaSubScan(String userName, KafkaStoragePlugin plugin, 
KafkaStoragePluginConfig kafkStoragePluginConfig,
+  List<SchemaPath> coulmns, List<KafkaSubScanSpec> partitionSubScanSpecList) {
+super(userName);
+this.coulmns = coulmns;
+this.kafkStoragePluginConfig = kafkStoragePluginConfig;
+this.kafkaStoragePlugin = plugin;
+this.partitionSubScanSpecList = partitionSubScanSpecList;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+return physicalVisitor.visitSubScan(this, value);
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+Preconditions.checkArgument(children.isEmpty());
+return new KafkaSubScan(getUserName(), kafkaStoragePlugin, 
kafkStoragePluginConfig, coulmns,
+partitionSubScanSpecList);
+  }
+
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+return Collections.emptyIterator();
+  }
+
+  @JsonIgnore
+  public KafkaStoragePluginConfig getKafkStoragePluginConfig() {
+return kafkStoragePluginConfig;
+  }
+
+  @JsonIgnore
+  public KafkaStoragePlugin getKafkaStoragePlugin() {
+return kafkaStoragePlugin;
+  }
+
+  public List<SchemaPath> getCoulmns() {
+return coulmns;
+  }
+
+  public List<KafkaSubScanSpec> getPartitionSubScanSpecList() {
+return partitionSubScanSpecList;
+  }
+
+  @Override
+  public int getOperatorType() {
+return 0;
--- End diff --

Should add an operator code in `UserBitShared.proto`, something like:

```

[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151584935
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.decoders;
+
+import static 
org.apache.drill.exec.store.kafka.MetaDataField.KAFKA_MSG_KEY;
+import static org.apache.drill.exec.store.kafka.MetaDataField.KAFKA_OFFSET;
+import static 
org.apache.drill.exec.store.kafka.MetaDataField.KAFKA_PARTITION_ID;
+import static 
org.apache.drill.exec.store.kafka.MetaDataField.KAFKA_TIMESTAMP;
+import static org.apache.drill.exec.store.kafka.MetaDataField.KAFKA_TOPIC;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
+import org.apache.drill.exec.vector.complex.fn.JsonReader;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import io.netty.buffer.DrillBuf;
+
+/**
+ * MessageReader class which will convert ConsumerRecord into JSON and 
writes to
+ * VectorContainerWriter of JsonReader
+ *
+ */
+public class JsonMessageReader implements MessageReader {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(JsonMessageReader.class);
+  private JsonReader jsonReader;
+  private VectorContainerWriter writer;
+
+  @Override
+  public void init(DrillBuf buf, List<SchemaPath> columns, VectorContainerWriter writer, boolean allTextMode,
+  boolean skipOuterList, boolean readNumbersAsDouble) {
+this.jsonReader = new JsonReader(buf, columns, allTextMode, 
skipOuterList, readNumbersAsDouble);
+this.writer = writer;
+  }
+
+  @Override
+  public void readMessage(ConsumerRecord<?, ?> record) {
+try {
+  byte[] recordArray = (byte[]) record.value();
+  JsonObject jsonObj = (new JsonParser()).parse(new 
String(recordArray, Charsets.UTF_8)).getAsJsonObject();
+  jsonObj.addProperty(KAFKA_TOPIC.getFieldName(), record.topic());
+  jsonObj.addProperty(KAFKA_PARTITION_ID.getFieldName(), 
record.partition());
+  jsonObj.addProperty(KAFKA_OFFSET.getFieldName(), record.offset());
+  jsonObj.addProperty(KAFKA_TIMESTAMP.getFieldName(), 
record.timestamp());
+  jsonObj.addProperty(KAFKA_MSG_KEY.getFieldName(), record.key() != 
null ? record.key().toString() : null);
+  jsonReader.setSource(jsonObj.toString().getBytes(Charsets.UTF_8));
--- End diff --

Actually, no need for all this complexity. This code:

* Gets UTF-8 bytes encoding JSON
* Converts that to a string
* Parses that into a JSON object
* Serializes that into a string
* Converts that to a byte array.
* The "JSON reader" parses the bytes back into JSON tokens.

Any reason we can't just pass in the byte array directly? Another 
alternative is to create an `InputStream` that reads the byte array. The key 
thing to realize here is that the `jsonReader` incorporates the Jackson JSON 
parser, and is perfectly capable of parsing any form of serialized JSON.

So, the above can be reduced to:

* Pass in the `recordArray` to the `jsonReader`.
* The "JSON reader" parses the bytes into JSON tokens.



---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151581827
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
 ---
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.drill.exec.store.kafka.decoders.MessageReader;
+import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
+import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class KafkaRecordReader extends AbstractRecordReader {
+  private static final Logger logger = 
LoggerFactory.getLogger(KafkaRecordReader.class);
+  public static final long DEFAULT_MESSAGES_PER_BATCH = 4000;
+
+  private VectorContainerWriter writer;
+  private MessageReader messageReader;
+
+  private final boolean unionEnabled;
+  private final KafkaStoragePlugin plugin;
+  private final KafkaSubScanSpec subScanSpec;
+  private final long kafkaPollTimeOut;
+
+  private long currentOffset;
+  private MessageIterator msgItr;
+
+  private final boolean enableAllTextMode;
+  private final boolean readNumbersAsDouble;
+  private final String kafkaMsgReader;
+
+  public KafkaRecordReader(KafkaSubScan.KafkaSubScanSpec subScanSpec, 
List projectedColumns,
+  FragmentContext context, KafkaStoragePlugin plugin) {
+setColumns(projectedColumns);
+this.enableAllTextMode = 
context.getOptions().getOption(ExecConstants.KAFKA_ALL_TEXT_MODE).bool_val;
+this.readNumbersAsDouble = context.getOptions()
+
.getOption(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
+this.unionEnabled = 
context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
+this.kafkaMsgReader = 
context.getOptions().getOption(ExecConstants.KAFKA_RECORD_READER).string_val;
+this.kafkaPollTimeOut = 
context.getOptions().getOption(ExecConstants.KAFKA_POLL_TIMEOUT).num_val;
+this.plugin = plugin;
+this.subScanSpec = subScanSpec;
+  }
+
+  @Override
+  protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> projectedColumns) {
+Set<SchemaPath> transformed = Sets.newLinkedHashSet();
+if (!isStarQuery()) {
+  for (SchemaPath column : projectedColumns) {
+transformed.add(column);
+  }
+} else {
+  transformed.add(Utilities.STAR_COLUMN);
+}
+return transformed;
+  }
+
+  @Override
+  public void setup(OperatorContext context, OutputMutator output) throws 
ExecutionSetupException {
+this.writer = new VectorContainerWriter(output, unionEnabled);
+messageReader = MessageReaderFactory.getMessageReader(kafkaMsgReader);
+messageReader.init(context.getManagedBuffer(), 
Lists.newArrayList(getColumns()), this.writer,
+this.enableAllTextMode, false, this.readNumbersAsDouble);
+msgItr = new MessageIterator(messageReader.getConsumer(plugin), 

[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151587030
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
 ---
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+@JsonTypeName("kafka-scan")
+public class KafkaGroupScan extends AbstractGroupScan {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(KafkaGroupScan.class);
+
+  // Assuming default average topic message size as 1KB, which will be 
used to
+  // compute the stats and work assignments
+  private static final long MSG_SIZE = 1024;
+
+  private final KafkaStoragePlugin kafkaStoragePlugin;
+  private final KafkaStoragePluginConfig kafkaStoragePluginConfig;
+  private List columns;
+  private final KafkaScanSpec kafkaScanSpec;
+
+  private List partitionWorkList;
+  private ListMultimap assignments;
+  private List affinities;
+
+  @JsonCreator
+  public KafkaGroupScan(@JsonProperty("userName") String userName,
+  @JsonProperty("kafkaStoragePluginConfig") KafkaStoragePluginConfig 
kafkaStoragePluginConfig,
+  @JsonProperty("columns") List columns, 
@JsonProperty("scanSpec") KafkaScanSpec scanSpec,
+  @JacksonInject StoragePluginRegistry pluginRegistry) {
+this(userName, kafkaStoragePluginConfig, columns, scanSpec, 
(KafkaStoragePlugin) pluginRegistry);
+  }
+
+  public KafkaGroupScan(KafkaStoragePlugin kafkaStoragePlugin, 
KafkaScanSpec kafkaScanSpec, List columns) {
+super(StringUtils.EMPTY);
+this.kafkaStoragePlugin = kafkaStoragePlugin;
+this.kafkaStoragePluginConfig = (KafkaStoragePluginConfig) 
kafkaStoragePlugin.getConfig();
+this.columns = columns;
+this.kafkaScanSpec = 

[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151586853
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
 ---
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.drill.exec.store.kafka.decoders.MessageReader;
+import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
+import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class KafkaRecordReader extends AbstractRecordReader {
+  private static final Logger logger = 
LoggerFactory.getLogger(KafkaRecordReader.class);
+  public static final long DEFAULT_MESSAGES_PER_BATCH = 4000;
+
+  private VectorContainerWriter writer;
+  private MessageReader messageReader;
+
+  private final boolean unionEnabled;
+  private final KafkaStoragePlugin plugin;
+  private final KafkaSubScanSpec subScanSpec;
+  private final long kafkaPollTimeOut;
+
+  private long currentOffset;
+  private MessageIterator msgItr;
+
+  private final boolean enableAllTextMode;
+  private final boolean readNumbersAsDouble;
+  private final String kafkaMsgReader;
+
+  public KafkaRecordReader(KafkaSubScan.KafkaSubScanSpec subScanSpec, 
List projectedColumns,
+  FragmentContext context, KafkaStoragePlugin plugin) {
+setColumns(projectedColumns);
+this.enableAllTextMode = 
context.getOptions().getOption(ExecConstants.KAFKA_ALL_TEXT_MODE).bool_val;
+this.readNumbersAsDouble = context.getOptions()
+
.getOption(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
+this.unionEnabled = 
context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
+this.kafkaMsgReader = 
context.getOptions().getOption(ExecConstants.KAFKA_RECORD_READER).string_val;
+this.kafkaPollTimeOut = 
context.getOptions().getOption(ExecConstants.KAFKA_POLL_TIMEOUT).num_val;
--- End diff --

A recent PR has simplified the above. You can now do:

```
kafkaPollTimeout = context.getOptions().getInt(ExecConstants.KAFKA_POLL_TIMEOUT);
```

The `getInt()` method handles type checking and avoids NPEs if somehow the 
value type is stored incorrectly.

Also, given the number of options, consider:

```
  OptionManager options = context.getOptions();
  foo = options.getSomething(...);
```
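
Combining the two, the constructor body could become (a sketch; the typed 
getters other than `getInt()` are my assumption about the same option API):

```
OptionManager options = context.getOptions();
this.enableAllTextMode = options.getBoolean(ExecConstants.KAFKA_ALL_TEXT_MODE);
this.readNumbersAsDouble = options.getBoolean(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE);
this.kafkaMsgReader = options.getString(ExecConstants.KAFKA_RECORD_READER);
this.kafkaPollTimeOut = options.getInt(ExecConstants.KAFKA_POLL_TIMEOUT);
```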


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151585334
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("kafka-partition-scan")
+public class KafkaSubScan extends AbstractBase implements SubScan {
+
+  @JsonProperty
+  private final KafkaStoragePluginConfig kafkStoragePluginConfig;
+
+  @JsonIgnore
+  private final KafkaStoragePlugin kafkaStoragePlugin;
+  private final List coulmns;
+  private final List partitionSubScanSpecList;
+
+  @JsonCreator
+  public KafkaSubScan(@JacksonInject StoragePluginRegistry registry, 
@JsonProperty("userName") String userName,
+  @JsonProperty("kafkStoragePluginConfig") KafkaStoragePluginConfig 
kafkStoragePluginConfig,
+  @JsonProperty("coulmns") List coulmns,
+  @JsonProperty("partitionSubScanSpecList") 
LinkedList partitionSubScanSpecList)
+  throws ExecutionSetupException {
+super(userName);
+this.kafkStoragePluginConfig = kafkStoragePluginConfig;
+this.coulmns = coulmns;
+this.partitionSubScanSpecList = partitionSubScanSpecList;
+this.kafkaStoragePlugin = (KafkaStoragePlugin) 
registry.getPlugin(kafkStoragePluginConfig);
+  }
+
+  public KafkaSubScan(String userName, KafkaStoragePlugin plugin, 
KafkaStoragePluginConfig kafkStoragePluginConfig,
+  List coulmns, List 
partitionSubScanSpecList) {
+super(userName);
+this.coulmns = coulmns;
--- End diff --

Spelling


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151583591
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactory.java
 ---
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.decoders;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class MessageReaderFactory {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(MessageReaderFactory.class);
+
+  /**
+   * Initialize kafka message reader based on store.kafka.record.reader session
+   * property
+   *
+   * @param messageReaderKlass
+   *  value of store.kafka.record.reader session property
+   * @return kafka message reader
+   * @throws UserException
+   *   in case of any message reader initialization failure
+   */
+  public static MessageReader getMessageReader(String messageReaderKlass) {
+Preconditions.checkNotNull(messageReaderKlass, "Please set 
store.kafka.record.reader " + messageReaderKlass);
--- End diff --

Here we are checking a value set as a system option, correct? This means 
that errors are due to the user doing something wrong. So, we probably want to 
use `UserException.validationError()` to explain that. Otherwise, this is 
going to look like an internal error in the logs.
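
Something like (a sketch):

```
if (messageReaderKlass == null) {
  throw UserException.validationError()
      .message("Option store.kafka.record.reader is not set")
      .build(logger);
}
```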


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151584428
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReader.java
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.decoders;
+
+import java.io.Closeable;
+import java.util.List;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+import io.netty.buffer.DrillBuf;
+
+/**
+ * MessageReader interface provides mechanism to handle various Kafka 
Message
+ * Formats like JSON, AVRO or custom message formats.
+ */
+public interface MessageReader  extends Closeable {
+
+  public void init(DrillBuf buf, List<SchemaPath> columns, VectorContainerWriter writer, boolean allTextMode,
+  boolean skipOuterList, boolean readNumbersAsDouble);
--- End diff --

Does `skipOuterList` make sense here? It is for files of the form:

```
[
  {name: "first record}
  {name: "secondRecord}
]
```

But, each Kafka record *is* a document, so the "outer list" pattern can 
never occur.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151572304
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
 ---
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+@JsonTypeName("kafka-scan")
+public class KafkaGroupScan extends AbstractGroupScan {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(KafkaGroupScan.class);
+
+  // Assuming default average topic message size as 1KB, which will be 
used to
+  // compute the stats and work assignments
+  private static final long MSG_SIZE = 1024;
+
+  private final KafkaStoragePlugin kafkaStoragePlugin;
+  private final KafkaStoragePluginConfig kafkaStoragePluginConfig;
+  private List columns;
+  private final KafkaScanSpec kafkaScanSpec;
+
+  private List partitionWorkList;
+  private ListMultimap assignments;
+  private List affinities;
+
+  @JsonCreator
+  public KafkaGroupScan(@JsonProperty("userName") String userName,
+  @JsonProperty("kafkaStoragePluginConfig") KafkaStoragePluginConfig 
kafkaStoragePluginConfig,
+  @JsonProperty("columns") List columns, 
@JsonProperty("scanSpec") KafkaScanSpec scanSpec,
+  @JacksonInject StoragePluginRegistry pluginRegistry) {
+this(userName, kafkaStoragePluginConfig, columns, scanSpec, 
(KafkaStoragePlugin) pluginRegistry);
+  }
+
+  public KafkaGroupScan(KafkaStoragePlugin kafkaStoragePlugin, 
KafkaScanSpec kafkaScanSpec, List columns) {
+super(StringUtils.EMPTY);
+this.kafkaStoragePlugin = kafkaStoragePlugin;
+this.kafkaStoragePluginConfig = (KafkaStoragePluginConfig) 
kafkaStoragePlugin.getConfig();
+this.columns = columns;
+this.kafkaScanSpec = 

[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151584017
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactory.java
 ---
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.decoders;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class MessageReaderFactory {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(MessageReaderFactory.class);
+
+  /**
+   * Initialize kafka message reader based on store.kafka.record.reader session
+   * property
+   *
+   * @param messageReaderKlass
+   *  value of store.kafka.record.reader session property
+   * @return kafka message reader
+   * @throws UserException
+   *   in case of any message reader initialization failure
+   */
+  public static MessageReader getMessageReader(String messageReaderKlass) {
+Preconditions.checkNotNull(messageReaderKlass, "Please set 
store.kafka.record.reader " + messageReaderKlass);
+MessageReader messageReader = null;
+try {
+  Class<?> klass = Class.forName(messageReaderKlass);
--- End diff --

In general, I am in favor of dynamically loading classes. However, I'm a 
bit less in favor of requiring that users know detailed class names at runtime. 
I wonder if you can do this:

* In the config system, provide a mapping of reader name to reader class.
* In the session/system option, provide the reader name.
* Here, use the name to look up the class name, which we then instantiate 
as in this existing code.

That way, I can specify "json" as the reader format, not 
"org.apache.drill.exec.store.kafka.decoders.JsonMessageReader"...


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151581968
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("kafka-partition-scan")
+public class KafkaSubScan extends AbstractBase implements SubScan {
+
+  @JsonProperty
+  private final KafkaStoragePluginConfig kafkStoragePluginConfig;
+
+  @JsonIgnore
+  private final KafkaStoragePlugin kafkaStoragePlugin;
+  private final List<SchemaPath> coulmns;
+  private final List<KafkaSubScanSpec> partitionSubScanSpecList;
+
+  @JsonCreator
+  public KafkaSubScan(@JacksonInject StoragePluginRegistry registry, 
@JsonProperty("userName") String userName,
+  @JsonProperty("kafkStoragePluginConfig") KafkaStoragePluginConfig 
kafkStoragePluginConfig,
+  @JsonProperty("coulmns") List coulmns,
+  @JsonProperty("partitionSubScanSpecList") 
LinkedList partitionSubScanSpecList)
+  throws ExecutionSetupException {
+super(userName);
+this.kafkStoragePluginConfig = kafkStoragePluginConfig;
+this.coulmns = coulmns;
+this.partitionSubScanSpecList = partitionSubScanSpecList;
+this.kafkaStoragePlugin = (KafkaStoragePlugin) 
registry.getPlugin(kafkStoragePluginConfig);
+  }
+
+  public KafkaSubScan(String userName, KafkaStoragePlugin plugin, 
KafkaStoragePluginConfig kafkStoragePluginConfig,
+  List<SchemaPath> coulmns, List<KafkaSubScanSpec> 
partitionSubScanSpecList) {
+super(userName);
+this.coulmns = coulmns;
+this.kafkStoragePluginConfig = kafkStoragePluginConfig;
+this.kafkaStoragePlugin = plugin;
+this.partitionSubScanSpecList = partitionSubScanSpecList;
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> 
physicalVisitor, X value) throws E {
+return physicalVisitor.visitSubScan(this, value);
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> 
children) throws ExecutionSetupException {
+Preconditions.checkArgument(children.isEmpty());
+return new KafkaSubScan(getUserName(), kafkaStoragePlugin, 
kafkStoragePluginConfig, coulmns,
+partitionSubScanSpecList);
+  }
+
+  @Override
+  public Iterator<PhysicalOperator> iterator() {
+return Collections.emptyIterator();
+  }
+
+  @JsonIgnore
+  public KafkaStoragePluginConfig getKafkStoragePluginConfig() {
+return kafkStoragePluginConfig;
+  }
+
+  @JsonIgnore
+  public KafkaStoragePlugin getKafkaStoragePlugin() {
+return kafkaStoragePlugin;
+  }
+
+  public List<SchemaPath> getCoulmns() {
+return coulmns;
--- End diff --

`coulmns` --> `columns` and in method name


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151581581
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
 ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.kafka.schema.KafkaSchemaFactory;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.io.Closer;
+
+public class KafkaStoragePlugin extends AbstractStoragePlugin {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(KafkaStoragePlugin.class);
+  private final KafkaSchemaFactory kafkaSchemaFactory;
+  private final KafkaStoragePluginConfig config;
+  private final DrillbitContext context;
+  private final Closer closer = Closer.create();
+
+  public KafkaStoragePlugin(KafkaStoragePluginConfig config, 
DrillbitContext context, String name)
+  throws ExecutionSetupException {
+logger.debug("Initializing {}", KafkaStoragePlugin.class.getName());
+this.config = config;
+this.context = context;
+this.kafkaSchemaFactory = new KafkaSchemaFactory(this, name);
+  }
+
+  public DrillbitContext getContext() {
+return this.context;
+  }
+
+  @Override
+  public KafkaStoragePluginConfig getConfig() {
+return this.config;
+  }
+
+  @Override
+  public boolean supportsRead() {
+return Boolean.TRUE;
--- End diff --

`true` is plenty good here.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151585930
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePluginConfig.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName(KafkaStoragePluginConfig.NAME)
+public class KafkaStoragePluginConfig extends StoragePluginConfig {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(KafkaStoragePluginConfig.class);
+  public static final String NAME = "kafka";
+  private Properties kafkaConsumerProps;
+
+  @JsonCreator
+  public KafkaStoragePluginConfig(@JsonProperty("kafkaConsumerProps") 
Map kafkaConsumerProps) {
+this.kafkaConsumerProps = new Properties();
+this.kafkaConsumerProps.putAll(kafkaConsumerProps);
+logger.debug("Kafka Consumer Props {}", this.kafkaConsumerProps);
+  }
+
+  public Properties getKafkaConsumerProps() {
+return kafkaConsumerProps;
+  }
+
+  @Override
+  public int hashCode() {
+final int prime = 31;
+int result = 1;
+result = prime * result + ((kafkaConsumerProps == null) ? 0 : 
kafkaConsumerProps.hashCode());
+return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+if (this == obj) {
+  return true;
+}
+if (obj == null) {
+  return false;
+}
+if (getClass() != obj.getClass()) {
+  return false;
+}
+KafkaStoragePluginConfig other = (KafkaStoragePluginConfig) obj;
+if (kafkaConsumerProps == null) {
+  if (other.kafkaConsumerProps != null) {
+return false;
+  }
+} else if (!kafkaConsumerProps.equals(other.kafkaConsumerProps)) {
+  return false;
+}
--- End diff --

Just a minor point, but:

```
  if (kafkaConsumerProps == null && other.kafkaConsumerProps == null) {
return true;
  }
  if (kafkaConsumerProps == null || other.kafkaConsumerProps == null) {
return false;
  }
  return kafkaConsumerProps.equals(other.kafkaConsumerProps);
```
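
An equivalent one-liner, if the codebase can assume Java 7+:

```
  // java.util.Objects.equals handles the both-null and one-null cases internally.
  return java.util.Objects.equals(kafkaConsumerProps, other.kafkaConsumerProps);
```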


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151588038
  
--- Diff: 
contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.NoSuchElementException;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MessageIteratorTest extends KafkaTestBase {
+
+  private KafkaConsumer<byte[], byte[]> kafkaConsumer;
+  private KafkaSubScanSpec subScanSpec;
+
+  @Before
+  public void setUp() {
+Properties consumerProps = storagePluginConfig.getKafkaConsumerProps();
+consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class);
+consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class);
+consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "4");
+kafkaConsumer = new KafkaConsumer<>(consumerProps);
+subScanSpec = new KafkaSubScanSpec(QueryConstants.JSON_TOPIC, 0, 0, 
TestKafkaSuit.NUM_JSON_MSG);
+  }
+
+  @After
+  public void cleanUp() {
+if (kafkaConsumer != null) {
+  kafkaConsumer.close();
+}
+  }
+
+  @Test
+  public void testWhenPollTimeOutIsTooLess() {
+MessageIterator iterator = new MessageIterator(kafkaConsumer, 
subScanSpec, 1);
--- End diff --

This is great how you've created unit tests for the message iterator itself 
independent of full-blown query tests.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151585603
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("kafka-partition-scan")
+public class KafkaSubScan extends AbstractBase implements SubScan {
+
+  @JsonProperty
+  private final KafkaStoragePluginConfig kafkStoragePluginConfig;
+
+  @JsonIgnore
+  private final KafkaStoragePlugin kafkaStoragePlugin;
+  private final List<SchemaPath> coulmns;
+  private final List<KafkaSubScanSpec> partitionSubScanSpecList;
+
+  @JsonCreator
+  public KafkaSubScan(@JacksonInject StoragePluginRegistry registry, 
@JsonProperty("userName") String userName,
+  @JsonProperty("kafkStoragePluginConfig") KafkaStoragePluginConfig 
kafkStoragePluginConfig,
--- End diff --

Spelling


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151583298
  
--- Diff: contrib/storage-kafka/src/main/resources/drill-module.conf ---
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+//  This file tells Drill to consider this module when class path scanning.
+//  This file can also include any supplementary configuration information.
+//  This file is in HOCON format, see 
https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+drill.classpath.scanning: {
+  packages += "org.apache.drill.exec.store.kafka"
+}
+drill.exec: {
+
+  sys.store.provider: {
+kafka : {
+  "kafkaConsumerProps" : "{\"bootstrap.servers\" : \"localhost:9092\"}"
--- End diff --

These appear to be things that users will need to set. If so, our usual 
convention is to create example entries in drill-override-example.conf, along 
with comments that explain the properties.
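
For illustration, such an entry in drill-override-example.conf might look 
like this (a sketch only, mirroring the module config quoted above; the 
comment text is illustrative):

```
# Kafka storage plugin: default consumer properties.
# "bootstrap.servers" lists the Kafka brokers the consumer contacts first.
drill.exec.sys.store.provider.kafka: {
  kafkaConsumerProps: "{\"bootstrap.servers\": \"localhost:9092\"}"
}
```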


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151587745
  
--- Diff: 
contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.drill.exec.store.kafka.cluster.EmbeddedKafkaCluster;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+import org.junit.runners.Suite.SuiteClasses;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+@RunWith(Suite.class)
+@SuiteClasses({ KafkaQueriesTest.class, MessageIteratorTest.class })
+public class TestKafkaSuit {
--- End diff --

Great test. But, I wonder if we can blend this test into the existing Drill 
test frameworks to make it a bit easier to maintain.

We have two test frameworks: `BaseTestQuery` and `ClusterFixture` along 
with its `ClusterTest` test base class.

Can the Kafka cluster support be adapted into the builder/fixture model of 
the `ClusterFixture` tests? See `ExampleTest` for how we set up a Drillbit 
cluster, set properties, and control logging.

Would be great if we could do something like:

```
try (ClusterFixture cluster = fixtureBuilder.build(); ...
   KafkaFixture kafka = kafkaBuilder.build()) {
  // tests here
}
```

This is sparse info. Ping me if you need more details.
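
A rough sketch of what such a fixture might look like (illustrative only; 
`EmbeddedKafkaCluster` is the helper this PR already provides, and its 
constructor and shutdown method here are assumed):

```
public class KafkaFixture implements AutoCloseable {

  private final EmbeddedKafkaCluster cluster;

  private KafkaFixture(EmbeddedKafkaCluster cluster) {
    this.cluster = cluster;
  }

  // Start embedded ZK and Kafka, as TestKafkaSuit's @BeforeClass does today.
  public static KafkaFixture build() throws Exception {
    return new KafkaFixture(new EmbeddedKafkaCluster()); // constructor assumed
  }

  public EmbeddedKafkaCluster cluster() {
    return cluster;
  }

  @Override
  public void close() throws Exception {
    cluster.shutDownCluster(); // shutdown method name assumed
  }
}
```

That shape would let a test open Drill and Kafka together in a single 
try-with-resources block, as sketched above.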


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151587168
  
--- Diff: 
contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedZKQuorum.java
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.cluster;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.drill.exec.store.kafka.QueryConstants;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EmbeddedZKQuorum implements QueryConstants {
--- End diff --

Drill already has this concept. See `TestDrillbitResilience` for a test 
that starts multiple Drillbits and uses ZK to coordinate.


---


[GitHub] drill issue #1041: DRILL-5961: For long running queries (> 10 min) Drill may...

2017-11-16 Thread priteshm
Github user priteshm commented on the issue:

https://github.com/apache/drill/pull/1041
  
@parthchandra or @adityakishore, since you both have made changes to these 
files before, can you review the changes here?


---


[GitHub] drill pull request #1041: DRILL-5961: For long running queries (> 10 min) Dr...

2017-11-16 Thread vrozov
GitHub user vrozov opened a pull request:

https://github.com/apache/drill/pull/1041

DRILL-5961: For long running queries (> 10 min) Drill may raise 
FragmentSetupException for completed/cancelled fragments



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vrozov/drill DRILL-5961

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/drill/pull/1041.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1041


commit f7e0c44c1277cc3fa0cdf466e01521974c98262d
Author: Vlad Rozov 
Date:   2017-11-15T00:24:01Z

DRILL-5961: For long running queries (> 10 min) Drill may raise 
FragmentSetupException for completed/cancelled fragments

commit 575dec0fdc0dc4efb50569afa568c06f21546e6e
Author: Vlad Rozov 
Date:   2017-11-15T00:35:09Z

DRILL-5961: For long running queries (> 10 min) Drill may raise 
FragmentSetupException for completed/cancelled fragments




---


[GitHub] drill pull request #1037: DRILL-5968: Add support for empty service_host use...

2017-11-16 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/drill/pull/1037


---


[GitHub] drill issue #1040: Drill 5425

2017-11-16 Thread priteshm
Github user priteshm commented on the issue:

https://github.com/apache/drill/pull/1040
  
@arina-ielchiieva can you please review this?


---


[GitHub] drill pull request #1024: DRILL-3640: Support JDBC Statement.setQueryTimeout...

2017-11-16 Thread kkhatua
Github user kkhatua commented on a diff in the pull request:

https://github.com/apache/drill/pull/1024#discussion_r151580948
  
--- Diff: 
exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java ---
@@ -96,6 +105,14 @@ private void throwIfClosed() throws 
AlreadyClosedSqlException,
 throw new AlreadyClosedSqlException( "ResultSet is already 
closed." );
   }
 }
+
+//Implicit check for whether timeout is set
+if (elapsedTimer != null) {
--- End diff --

I've logged https://issues.apache.org/jira/browse/DRILL-5973 to support a 
time-bound pause on the server. That should help resolve this test, which 
otherwise passes... but just doesn't clean up gracefully.
I'll make the required changes and update this PR.


---


[jira] [Created] (DRILL-5973) Support injections of a time-bound pause after which the server resumes

2017-11-16 Thread Kunal Khatua (JIRA)
Kunal Khatua created DRILL-5973:
---

 Summary: Support injections of a time-bound pause after which the 
server resumes
 Key: DRILL-5973
 URL: https://issues.apache.org/jira/browse/DRILL-5973
 Project: Apache Drill
  Issue Type: Improvement
  Components: Tools, Build & Test
Affects Versions: 1.11.0
Reporter: Kunal Khatua
 Fix For: 1.12.0, 1.13.0


While working on DRILL-3640, when creating a unit test for a server-induced 
timeout, injecting a pause leaves the JUnit framework's DrillClient without 
a handle to the query on the server. This is because the pause is injected 
before the server can send back a query ID, so the DrillClient has no way 
to unpause the server.

The workaround to support this unit test is to allow injecting pauses with 
a defined time bound, after which the server resumes.
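
Conceptually, a time-bound pause is just a wait that can be released either 
by the client or by a timeout; a generic sketch of the idea (not Drill's 
actual injection API):

```
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class TimeBoundPause {

  private final CountDownLatch resume = new CountDownLatch(1);

  // Server side: block until unpaused or until the bound elapses.
  // Returns true if the client released the pause before the timeout.
  public boolean pause(long boundMillis) throws InterruptedException {
    return resume.await(boundMillis, TimeUnit.MILLISECONDS);
  }

  // Client side: release the pause once a query ID is available.
  public void unpause() {
    resume.countDown();
  }
}
```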



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] drill issue #1037: DRILL-5968: Add support for empty service_host user prope...

2017-11-16 Thread superbstreak
Github user superbstreak commented on the issue:

https://github.com/apache/drill/pull/1037
  
Thanks, @parthchandra !


---


[GitHub] drill issue #944: DRILL-5425: Support HTTP Kerberos auth using SPNEGO

2017-11-16 Thread sohami
Github user sohami commented on the issue:

https://github.com/apache/drill/pull/944
  
Created a separate PR for this JIRA: 
https://github.com/apache/drill/pull/1040
1) I have kept the initial commit with the new PR and added new changes as 
a separate commit.
2) Changes include refactoring of the code, new tests, and fixes for the 
issues found.


---


[GitHub] drill pull request #1040: Drill 5425

2017-11-16 Thread sohami
GitHub user sohami opened a pull request:

https://github.com/apache/drill/pull/1040

Drill 5425



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sohami/drill DRILL-5425

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/drill/pull/1040.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1040


commit abf8eee1e267871af78bb81d7f41708f961c71d7
Author: Sindhuri Rayavaram 
Date:   2017-09-11T23:56:22Z

DRILL-5425: Support HTTP Kerberos auth using SPNEGO

commit 5e54f4ab2768f687f86d78e1992aa9a2c797840a
Author: Sorabh Hamirwasia 
Date:   2017-11-13T22:43:52Z

DRILL-5425: Refactor, Add tests and bunch of fixes




---


[GitHub] drill issue #1037: DRILL-5968: Add support for empty service_host user prope...

2017-11-16 Thread parthchandra
Github user parthchandra commented on the issue:

https://github.com/apache/drill/pull/1037
  
LGTM. (Yes, let's merge this in before 1.12.)


---


[GitHub] drill issue #1037: DRILL-5968: Add support for empty service_host user prope...

2017-11-16 Thread superbstreak
Github user superbstreak commented on the issue:

https://github.com/apache/drill/pull/1037
  
@parthchandra  We should get this in for 1.12?


---


[GitHub] drill pull request #1037: DRILL-5968: Add support for empty service_host use...

2017-11-16 Thread superbstreak
Github user superbstreak commented on a diff in the pull request:

https://github.com/apache/drill/pull/1037#discussion_r151515278
  
--- Diff: contrib/native/client/src/include/drill/userProperties.hpp ---
@@ -28,6 +28,18 @@ class DECLSPEC_DRILL_CLIENT DrillUserProperties{
 static const std::map USER_PROPERTIES;
 
 DrillUserProperties(){};
+
+/// @brief Update the property value associated with the property 
key if the value is 
+/// empty.
+/// 
+/// @param in_propName  The property name.
+/// @param in_propValue The property value.
+void setPropertyIfEmpty(const std::string& in_propName, const 
std::string& in_propValue){
+// If the element value is empty, then update.
+if (isPropSet(in_propName) && 
m_properties[in_propName].empty()){
--- End diff --

certainly


---


[GitHub] drill pull request #1037: DRILL-5968: Add support for empty service_host use...

2017-11-16 Thread parthchandra
Github user parthchandra commented on a diff in the pull request:

https://github.com/apache/drill/pull/1037#discussion_r151512282
  
--- Diff: contrib/native/client/src/include/drill/userProperties.hpp ---
@@ -28,6 +28,18 @@ class DECLSPEC_DRILL_CLIENT DrillUserProperties{
 static const std::map USER_PROPERTIES;
 
 DrillUserProperties(){};
+
+/// @brief Update the property value associated with the property 
key if the value is 
+/// empty.
+/// 
+/// @param in_propName  The property name.
+/// @param in_propValue The property value.
+void setPropertyIfEmpty(const std::string& in_propName, const 
std::string& in_propValue){
+// If the element value is empty, then update.
+if (isPropSet(in_propName) && 
m_properties[in_propName].empty()){
--- End diff --

Better check would be 
`if ( !isPropSet(in_propName) || m_properties[in_propName].empty() )`


---


[GitHub] drill issue #914: DRILL-5657: Size-aware vector writer structure

2017-11-16 Thread parthchandra
Github user parthchandra commented on the issue:

https://github.com/apache/drill/pull/914
  
Just FYI, this (https://github.com/d-t-w/netty-buffers) seems to indicate 
that some of Netty's internal fragmentation issues have been addressed 
since 4.0.37


---


[GitHub] drill issue #1038: DRILL-5972: Slow performance for query on INFORMATION_SCH...

2017-11-16 Thread priteshm
Github user priteshm commented on the issue:

https://github.com/apache/drill/pull/1038
  
@parthchandra can you please review this?


---


[GitHub] drill pull request #1032: DRILL-5089: Dynamically load schema of storage plu...

2017-11-16 Thread chunhui-shi
Github user chunhui-shi commented on a diff in the pull request:

https://github.com/apache/drill/pull/1032#discussion_r151493351
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
 ---
@@ -175,6 +193,21 @@ public WorkspaceSchema createSchema(List 
parentSchemaPath, SchemaConfig
 return new WorkspaceSchema(parentSchemaPath, schemaName, schemaConfig);
   }
 
+  public WorkspaceSchema createSchema(List parentSchemaPath, 
SchemaConfig schemaConfig, DrillFileSystem fs) throws IOException {
+if (!accessible(fs)) {
--- End diff --

Returning null means the user cannot even list this workspace, so they do 
not know of its existence at all. I think that is good access-control 
practice. 

If users expect to see a workspace but cannot see it, they will need to 
figure out why on their own.


---


[GitHub] drill issue #1030: DRILL-5941: Skip header / footer improvements for Hive st...

2017-11-16 Thread arina-ielchiieva
Github user arina-ielchiieva commented on the issue:

https://github.com/apache/drill/pull/1030
  
@ppadma the pull request is updated to handle header / footer logic in a 
distributed environment. The PR description comment at the beginning of the 
pull request has also been updated. Please review when possible.


---