[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-28 Thread akumarb2010
Github user akumarb2010 closed the pull request at:

https://github.com/apache/drill/pull/1027


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-21 Thread kameshb
Github user kameshb commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r152310859
  
--- Diff: 
contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.drill.exec.store.kafka.cluster.EmbeddedKafkaCluster;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+import org.junit.runners.Suite.SuiteClasses;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+@RunWith(Suite.class)
+@SuiteClasses({ KafkaQueriesTest.class, MessageIteratorTest.class })
+public class TestKafkaSuit {
--- End diff --

Thanks for suggesting this framework. I think this may take a considerable 
amount of time, and since the release date is very close, we will address this 
refactoring in the next release.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-21 Thread kameshb
Github user kameshb commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r152303056
  
--- Diff: 
contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactoryTest.java
 ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.decoders;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class MessageReaderFactoryTest {
+
+  @Test
+  public void testShouldThrowExceptionAsMessageReaderIsNull() {
--- End diff --

Taken care of.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-21 Thread arina-ielchiieva
Github user arina-ielchiieva commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r152286833
  
--- Diff: 
contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactoryTest.java
 ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.decoders;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class MessageReaderFactoryTest {
+
+  @Test
+  public void testShouldThrowExceptionAsMessageReaderIsNull() {
--- End diff --

Please check the tests below as well; they can also give false-positive results.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-21 Thread kameshb
Github user kameshb commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r152285907
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
 ---
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.drill.exec.store.kafka.decoders.MessageReader;
+import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
+import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class KafkaRecordReader extends AbstractRecordReader {
+  private static final Logger logger = 
LoggerFactory.getLogger(KafkaRecordReader.class);
+  public static final long DEFAULT_MESSAGES_PER_BATCH = 4000;
+
+  private VectorContainerWriter writer;
+  private MessageReader messageReader;
+
+  private final boolean unionEnabled;
+  private final KafkaStoragePlugin plugin;
+  private final KafkaSubScanSpec subScanSpec;
+  private final long kafkaPollTimeOut;
+
+  private long currentOffset;
+  private MessageIterator msgItr;
+
+  private final boolean enableAllTextMode;
+  private final boolean readNumbersAsDouble;
+  private final String kafkaMsgReader;
+
+  public KafkaRecordReader(KafkaSubScan.KafkaSubScanSpec subScanSpec, 
List projectedColumns,
+  FragmentContext context, KafkaStoragePlugin plugin) {
+setColumns(projectedColumns);
+this.enableAllTextMode = 
context.getOptions().getOption(ExecConstants.KAFKA_ALL_TEXT_MODE).bool_val;
+this.readNumbersAsDouble = context.getOptions()
+
.getOption(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
+OptionManager options = context.getOptions();
+this.unionEnabled = options.getOption(ExecConstants.ENABLE_UNION_TYPE);
+this.kafkaMsgReader = 
options.getOption(ExecConstants.KAFKA_RECORD_READER).string_val;
+this.kafkaPollTimeOut = 
options.getOption(ExecConstants.KAFKA_POLL_TIMEOUT).num_val;
+this.plugin = plugin;
+this.subScanSpec = subScanSpec;
+  }
+
+  @Override
+  protected Collection transformColumns(Collection 
projectedColumns) {
+Set transformed = Sets.newLinkedHashSet();
+if (!isStarQuery()) {
+  for (SchemaPath column : projectedColumns) {
+transformed.add(column);
+  }
+} else {
+  transformed.add(Utilities.STAR_COLUMN);
+}
+return transformed;
+  }
+
+  @Override
+  public void setup(OperatorContext context, OutputMutator output) throws 
ExecutionSetupException {
+this.writer = new VectorContainerWriter(output, unionEnabled);
+messageReader = MessageReaderFactory.getMessageReader(kafkaMsgReader);
+messageReader.init(context.getManagedBuffer(), 
Lists.newArrayList(getColumns()), this.writer,
+this.enableAllTextMode, this.readNumbersAsDouble);
+

[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-21 Thread arina-ielchiieva
Github user arina-ielchiieva commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r152248126
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaMessageSchema.java
 ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.schema;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.kafka.KafkaScanSpec;
+import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
+import org.apache.drill.exec.store.kafka.KafkaStoragePluginConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+
+public class KafkaMessageSchema extends AbstractSchema {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(KafkaMessageSchema.class);
+  private final KafkaStoragePlugin plugin;
+  private final Map drillTables = Maps.newHashMap();
+  private Set tableNames;
+
+  public KafkaMessageSchema(final KafkaStoragePlugin plugin, final String 
name) {
+super(ImmutableList. of(), name);
+this.plugin = plugin;
+  }
+
+  @Override
+  public String getTypeName() {
+return KafkaStoragePluginConfig.NAME;
+  }
+
+  void setHolder(SchemaPlus plusOfThis) {
+for (String s : getSubSchemaNames()) {
+  plusOfThis.add(s, getSubSchema(s));
+}
+  }
+
+  @Override
+  public Table getTable(String tableName) {
+if (!drillTables.containsKey(tableName)) {
+  KafkaScanSpec scanSpec = new KafkaScanSpec(tableName);
+  DrillTable table = new DynamicDrillTable(plugin, getName(), 
scanSpec);
+  drillTables.put(tableName, table);
+}
+
+return drillTables.get(tableName);
+  }
+
+  @Override
+  public Set getTableNames() {
+if (tableNames == null) {
+  try (KafkaConsumer kafkaConsumer = new 
KafkaConsumer<>(plugin.getConfig().getKafkaConsumerProps())) {
+tableNames = kafkaConsumer.listTopics().keySet();
+  } catch(KafkaException e) {
+logger.error(e.getMessage(), e);
+throw UserException.dataReadError(e).message("Failed to get tables 
information").addContext(e.getMessage())
--- End diff --

If I am not mistaken, `UserException` logs the exception internally. Please 
check the logs to verify that using both `logger` and `UserException` does not 
log the exception twice.
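
To make the concern concrete, here is a minimal plain-Java sketch of how an explicit `logger.error(...)` followed by a builder that also logs yields two entries for one failure. `SketchLog` and `SketchUserException` are hypothetical stand-ins written for this illustration, not Drill classes; whether Drill's `UserException` really logs on `build(logger)` still has to be confirmed from the logs.

```java
import java.util.ArrayList;
import java.util.List;

public class DuplicateLoggingSketch {

  /** Hypothetical in-memory logger, used only so the entries can be counted. */
  static class SketchLog {
    final List<String> entries = new ArrayList<>();

    void error(String message) {
      entries.add(message);
    }
  }

  /** Hypothetical stand-in for an exception whose builder logs when it builds. */
  static class SketchUserException extends RuntimeException {
    private SketchUserException(String message, Throwable cause) {
      super(message, cause);
    }

    static SketchUserException build(String message, Throwable cause, SketchLog log) {
      log.error(message); // the builder writes its own log entry
      return new SketchUserException(message, cause);
    }
  }

  /** Logs explicitly and then builds: the same failure is recorded twice. */
  static int entriesWithExplicitLog(SketchLog log) {
    RuntimeException cause = new RuntimeException("broker unreachable");
    log.error(cause.getMessage());
    SketchUserException.build("Failed to get tables information", cause, log);
    return log.entries.size();
  }

  /** Relies on the builder alone: the failure is recorded once. */
  static int entriesWithBuilderOnly(SketchLog log) {
    RuntimeException cause = new RuntimeException("broker unreachable");
    SketchUserException.build("Failed to get tables information", cause, log);
    return log.entries.size();
  }

  public static void main(String[] args) {
    System.out.println(entriesWithExplicitLog(new SketchLog()));
    System.out.println(entriesWithBuilderOnly(new SketchLog()));
  }
}
```

If the real builder behaves like the stand-in, dropping the explicit `logger.error` call would be enough to avoid the duplicate entry.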


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-21 Thread arina-ielchiieva
Github user arina-ielchiieva commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r152246552
  
--- Diff: 
contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.NoSuchElementException;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MessageIteratorTest extends KafkaTestBase {
+
+  private KafkaConsumer kafkaConsumer;
+  private KafkaSubScanSpec subScanSpec;
+
+  @Before
+  public void setUp() {
+Properties consumerProps = storagePluginConfig.getKafkaConsumerProps();
+consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class);
+consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class);
+consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "4");
+kafkaConsumer = new KafkaConsumer<>(consumerProps);
+subScanSpec = new KafkaSubScanSpec(QueryConstants.JSON_TOPIC, 0, 0, 
TestKafkaSuit.NUM_JSON_MSG);
+  }
+
+  @After
+  public void cleanUp() {
+if (kafkaConsumer != null) {
+  kafkaConsumer.close();
+}
+  }
+
+  @Test
+  public void testWhenPollTimeOutIsTooLess() {
+MessageIterator iterator = new MessageIterator(kafkaConsumer, 
subScanSpec, 1);
+try {
+  iterator.hasNext();
+} catch (UserException ue) {
--- End diff --

The test can give a false-positive result if the exception is not thrown at 
all. Please re-throw the exception in the catch block and update the test 
annotation to `@Test(expected = UserException.class)`.
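
A minimal sketch of why the current shape can pass silently, written in plain Java so it runs without JUnit. `PollTimeoutException` is a hypothetical stand-in for `UserException`, and the two helper methods only imitate a test body; they are not part of the pull request.

```java
public class FalsePositiveSketch {

  /** Hypothetical stand-in for Drill's UserException, for illustration only. */
  static class PollTimeoutException extends RuntimeException {
    PollTimeoutException(String message) {
      super(message);
    }
  }

  /** Weak pattern: checks live only in the catch block, so no exception means no check. */
  static boolean weakCheckPasses(Runnable body) {
    try {
      body.run();
    } catch (PollTimeoutException e) {
      // assertions on e would go here
    }
    return true; // reached whether or not the exception was thrown
  }

  /** Strict pattern: passes only when the expected exception is actually thrown. */
  static boolean strictCheckPasses(Runnable body) {
    try {
      body.run();
    } catch (PollTimeoutException e) {
      return true;
    }
    return false; // plays the role of a failure when nothing was thrown
  }

  public static void main(String[] args) {
    Runnable silent = () -> { };
    Runnable throwing = () -> {
      throw new PollTimeoutException("poll timed out");
    };
    System.out.println(weakCheckPasses(silent));     // passes although nothing was thrown
    System.out.println(strictCheckPasses(silent));   // fails, as it should
    System.out.println(strictCheckPasses(throwing)); // passes
  }
}
```

In JUnit 4 the strict behaviour comes from `@Test(expected = ...)`, which fails the test when the body returns normally.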


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-21 Thread arina-ielchiieva
Github user arina-ielchiieva commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r152248647
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
 ---
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.drill.exec.store.kafka.decoders.MessageReader;
+import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
+import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class KafkaRecordReader extends AbstractRecordReader {
+  private static final Logger logger = 
LoggerFactory.getLogger(KafkaRecordReader.class);
+  public static final long DEFAULT_MESSAGES_PER_BATCH = 4000;
+
+  private VectorContainerWriter writer;
+  private MessageReader messageReader;
+
+  private final boolean unionEnabled;
+  private final KafkaStoragePlugin plugin;
+  private final KafkaSubScanSpec subScanSpec;
+  private final long kafkaPollTimeOut;
+
+  private long currentOffset;
+  private MessageIterator msgItr;
+
+  private final boolean enableAllTextMode;
+  private final boolean readNumbersAsDouble;
+  private final String kafkaMsgReader;
+
+  public KafkaRecordReader(KafkaSubScan.KafkaSubScanSpec subScanSpec, 
List projectedColumns,
+  FragmentContext context, KafkaStoragePlugin plugin) {
+setColumns(projectedColumns);
+this.enableAllTextMode = 
context.getOptions().getOption(ExecConstants.KAFKA_ALL_TEXT_MODE).bool_val;
+this.readNumbersAsDouble = context.getOptions()
+
.getOption(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
+OptionManager options = context.getOptions();
+this.unionEnabled = options.getOption(ExecConstants.ENABLE_UNION_TYPE);
+this.kafkaMsgReader = 
options.getOption(ExecConstants.KAFKA_RECORD_READER).string_val;
+this.kafkaPollTimeOut = 
options.getOption(ExecConstants.KAFKA_POLL_TIMEOUT).num_val;
+this.plugin = plugin;
+this.subScanSpec = subScanSpec;
+  }
+
+  @Override
+  protected Collection transformColumns(Collection 
projectedColumns) {
+Set transformed = Sets.newLinkedHashSet();
+if (!isStarQuery()) {
+  for (SchemaPath column : projectedColumns) {
+transformed.add(column);
+  }
+} else {
+  transformed.add(Utilities.STAR_COLUMN);
+}
+return transformed;
+  }
+
+  @Override
+  public void setup(OperatorContext context, OutputMutator output) throws 
ExecutionSetupException {
+this.writer = new VectorContainerWriter(output, unionEnabled);
+messageReader = MessageReaderFactory.getMessageReader(kafkaMsgReader);
+messageReader.init(context.getManagedBuffer(), 
Lists.newArrayList(getColumns()), this.writer,
+this.enableAllTextMode, this.readNumbersAsDouble);
  

[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-21 Thread arina-ielchiieva
Github user arina-ielchiieva commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r152245838
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java
 ---
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+
+import kafka.common.KafkaException;
+
+public class MessageIterator implements Iterator<ConsumerRecord<byte[], byte[]>> {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(MessageIterator.class);
+  private final KafkaConsumer kafkaConsumer;
+  private Iterator<ConsumerRecord<byte[], byte[]>> recordIter;
+  private final TopicPartition topicPartition;
+  private long totalFetchTime = 0;
+  private final long kafkaPollTimeOut;
+  private final long endOffset;
+
+  public MessageIterator(final KafkaConsumer 
kafkaConsumer, final KafkaSubScanSpec subScanSpec,
+  final long kafkaPollTimeOut) {
+this.kafkaConsumer = kafkaConsumer;
+this.kafkaPollTimeOut = kafkaPollTimeOut;
+
+List partitions = Lists.newArrayListWithCapacity(1);
+topicPartition = new TopicPartition(subScanSpec.getTopicName(), 
subScanSpec.getPartitionId());
+partitions.add(topicPartition);
+this.kafkaConsumer.assign(partitions);
+logger.info("Start offset of {}:{} is - {}", 
subScanSpec.getTopicName(), subScanSpec.getPartitionId(),
+subScanSpec.getStartOffset());
+this.kafkaConsumer.seek(topicPartition, subScanSpec.getStartOffset());
+this.endOffset = subScanSpec.getEndOffset();
+  }
+
+  @Override
+  public void remove() {
+throw new UnsupportedOperationException("Does not support remove 
operation");
+  }
+
+  @Override
+  public boolean hasNext() {
+if (recordIter != null && recordIter.hasNext()) {
+  return true;
+}
+
+long nextPosition = kafkaConsumer.position(topicPartition);
+if (nextPosition >= endOffset) {
+  return false;
+}
+
+ConsumerRecords consumerRecords = null;
+Stopwatch stopwatch = Stopwatch.createStarted();
+try {
+  consumerRecords = kafkaConsumer.poll(kafkaPollTimeOut);
+} catch (KafkaException ke) {
+  logger.error(ke.getMessage(), ke);
+  throw 
UserException.dataReadError(ke).message(ke.getMessage()).build(logger);
+}
+stopwatch.stop();
+
+String errorMsg = new StringBuilder().append("Failed to fetch messages 
within ").append(kafkaPollTimeOut)
--- End diff --

The error message can be constructed inside the if clause: `if 
(consumerRecords.isEmpty()) {`
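
A small plain-Java sketch of the suggestion: the message is built only on the path that needs it. The class, the `recordsEmpty` flag, and the wording after "within" are assumptions made for the example, since the quoted diff is cut off at that point.

```java
public class LazyErrorMessageSketch {

  /** Counts how often the message is built, to make the saving visible. */
  static int messagesBuilt = 0;

  static String buildTimeoutMessage(long pollTimeoutMs) {
    messagesBuilt++;
    // The text after "within" is assumed; the original line is truncated in the diff.
    return new StringBuilder()
        .append("Failed to fetch messages within ")
        .append(pollTimeoutMs)
        .append(" ms")
        .toString();
  }

  /** Returns the error message for an empty poll, or null when records arrived. */
  static String errorFor(boolean recordsEmpty, long pollTimeoutMs) {
    if (recordsEmpty) {
      return buildTimeoutMessage(pollTimeoutMs);
    }
    return null;
  }

  public static void main(String[] args) {
    errorFor(false, 2000);
    System.out.println(messagesBuilt);
    System.out.println(errorFor(true, 2000));
    System.out.println(messagesBuilt);
  }
}
```

On the common path, where the poll returns records, no `StringBuilder` is allocated at all.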


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-19 Thread akumarb2010
Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151890267
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
 ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.kafka.schema.KafkaSchemaFactory;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.io.Closer;
+
+public class KafkaStoragePlugin extends AbstractStoragePlugin {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(KafkaStoragePlugin.class);
+  private final KafkaSchemaFactory kafkaSchemaFactory;
+  private final KafkaStoragePluginConfig config;
+  private final DrillbitContext context;
+  private final Closer closer = Closer.create();
+
+  public KafkaStoragePlugin(KafkaStoragePluginConfig config, 
DrillbitContext context, String name)
+  throws ExecutionSetupException {
+logger.debug("Initializing {}", KafkaStoragePlugin.class.getName());
+this.config = config;
+this.context = context;
+this.kafkaSchemaFactory = new KafkaSchemaFactory(this, name);
+  }
+
+  public DrillbitContext getContext() {
+return this.context;
+  }
+
+  @Override
+  public KafkaStoragePluginConfig getConfig() {
+return this.config;
+  }
+
+  @Override
+  public boolean supportsRead() {
+return Boolean.TRUE;
--- End diff --

This is taken care of.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-19 Thread akumarb2010
Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151888697
  
--- Diff: contrib/storage-kafka/README.md ---
@@ -0,0 +1,230 @@
+# Drill Kafka Plugin
+
+Drill kafka storage plugin allows you to perform interactive analysis 
using SQL against Apache Kafka.
+
+Supported Kafka Version
+Kafka-0.10 and above 
+
+Message Formats
+Currently this plugin supports reading only Kafka messages of type 
JSON.
+
+
+Message Readers
+Message Readers are used for reading messages from Kafka. Type of the 
MessageReaders supported as of now are
+
+
+| MessageReader | Description | Key DeSerializer | Value DeSerializer |
+|---|---|---|---|
+| JsonMessageReader | To read Json messages | org.apache.kafka.common.serialization.ByteArrayDeserializer | org.apache.kafka.common.serialization.ByteArrayDeserializer |
+
+
+
+Plugin Configurations
+Drill Kafka plugin supports following properties
+
+   kafkaConsumerProps: These are typical Kafka consumer properties 
(https://kafka.apache.org/documentation/#consumerconfigs).
+drillKafkaProps: These are Drill Kafka plugin 
properties. As of now, it supports the following properties
+   
+drill.kafka.message.reader: Message Reader 
implementation to use while reading messages from Kafka. Message reader 
implementation should be configured based on message format. Type of message 
readers
+ 
+ org.apache.drill.exec.store.kafka.decoders.JsonMessageReader
+ 
+
+drill.kafka.poll.timeout: Polling timeout used by 
Kafka client while fetching messages from Kafka cluster.
+
+
+
+
+Plugin Registration
+To register the kafka plugin, open the drill web interface. To open the 
drill web interface, enter http://drillbit:8047/storage in 
your browser.
+
+The following is an example plugin registration configuration
+
+{
+  "type": "kafka",
+  "kafkaConsumerProps": {
+"key.deserializer": 
"org.apache.kafka.common.serialization.ByteArrayDeserializer",
+"auto.offset.reset": "earliest",
+"bootstrap.servers": "localhost:9092",
+"enable.auto.commit": "true",
+"group.id": "drill-query-consumer-1",
+"value.deserializer": 
"org.apache.kafka.common.serialization.ByteArrayDeserializer",
+"session.timeout.ms": "3"
+  },
+  "drillKafkaProps": {
+"drill.kafka.message.reader": 
"org.apache.drill.exec.store.kafka.decoders.JsonMessageReader",
+"drill.kafka.poll.timeout": "2000"
+  },
+  "enabled": true
+}
+
+
+ Abstraction 
+In Drill, each Kafka topic is mapped to a SQL table and when a query is 
issued on a table, it scans all the messages from the earliest offset to the 
latest offset of that topic at that point of time. This plugin automatically 
discovers all the topics (tables), to allow you to perform analysis without 
executing DDL statements.
--- End diff --

Thanks Paul. Created the JIRA for this task.
https://issues.apache.org/jira/browse/DRILL-5977


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-19 Thread akumarb2010
Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151890205
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
 ---
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import static 
org.apache.drill.exec.store.kafka.DrillKafkaConfig.DRILL_KAFKA_POLL_TIMEOUT;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.drill.exec.store.kafka.decoders.MessageReader;
+import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
+import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+public class KafkaRecordReader extends AbstractRecordReader {
+  private static final Logger logger = 
LoggerFactory.getLogger(KafkaRecordReader.class);
+  public static final long DEFAULT_MESSAGES_PER_BATCH = 4000;
+
+  private VectorContainerWriter writer;
+  private MessageReader messageReader;
+
+  private boolean unionEnabled;
+  private KafkaConsumer kafkaConsumer;
+  private KafkaStoragePlugin plugin;
--- End diff --

This has been taken care of.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-19 Thread akumarb2010
Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151890058
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReader.java
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.decoders;
+
+import java.io.Closeable;
+import java.util.List;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+import io.netty.buffer.DrillBuf;
+
+/**
+ * MessageReader interface provides mechanism to handle various Kafka 
Message
+ * Formats like JSON, AVRO or custom message formats.
+ */
+public interface MessageReader  extends Closeable {
+
+  public void init(DrillBuf buf, List columns, 
VectorContainerWriter writer, boolean allTextMode,
+  boolean skipOuterList, boolean readNumbersAsDouble);
--- End diff --

This has been taken care of.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-19 Thread akumarb2010
Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151890101
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("kafka-partition-scan")
+public class KafkaSubScan extends AbstractBase implements SubScan {
+
+  @JsonProperty
+  private final KafkaStoragePluginConfig kafkStoragePluginConfig;
+
+  @JsonIgnore
+  private final KafkaStoragePlugin kafkaStoragePlugin;
+  private final List coulmns;
+  private final List partitionSubScanSpecList;
+
+  @JsonCreator
+  public KafkaSubScan(@JacksonInject StoragePluginRegistry registry, 
@JsonProperty("userName") String userName,
+  @JsonProperty("kafkStoragePluginConfig") KafkaStoragePluginConfig 
kafkStoragePluginConfig,
+  @JsonProperty("coulmns") List coulmns,
--- End diff --

This has been taken care of.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-19 Thread akumarb2010
Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151890077
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("kafka-partition-scan")
+public class KafkaSubScan extends AbstractBase implements SubScan {
+
+  @JsonProperty
+  private final KafkaStoragePluginConfig kafkStoragePluginConfig;
+
+  @JsonIgnore
+  private final KafkaStoragePlugin kafkaStoragePlugin;
+  private final List coulmns;
+  private final List partitionSubScanSpecList;
+
+  @JsonCreator
+  public KafkaSubScan(@JacksonInject StoragePluginRegistry registry, 
@JsonProperty("userName") String userName,
+  @JsonProperty("kafkStoragePluginConfig") KafkaStoragePluginConfig 
kafkStoragePluginConfig,
+  @JsonProperty("coulmns") List coulmns,
+  @JsonProperty("partitionSubScanSpecList") 
LinkedList partitionSubScanSpecList)
+  throws ExecutionSetupException {
+super(userName);
+this.kafkStoragePluginConfig = kafkStoragePluginConfig;
+this.coulmns = coulmns;
+this.partitionSubScanSpecList = partitionSubScanSpecList;
+this.kafkaStoragePlugin = (KafkaStoragePlugin) 
registry.getPlugin(kafkStoragePluginConfig);
+  }
+
+  public KafkaSubScan(String userName, KafkaStoragePlugin plugin, 
KafkaStoragePluginConfig kafkStoragePluginConfig,
+  List coulmns, List 
partitionSubScanSpecList) {
+super(userName);
+this.coulmns = coulmns;
+this.kafkStoragePluginConfig = kafkStoragePluginConfig;
+this.kafkaStoragePlugin = plugin;
+this.partitionSubScanSpecList = partitionSubScanSpecList;
+  }
+
+  @Override
+  public  T accept(PhysicalVisitor 
physicalVisitor, X value) throws E {
+return physicalVisitor.visitSubScan(this, value);
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List 
children) throws ExecutionSetupException {
+Preconditions.checkArgument(children.isEmpty());
+return new KafkaSubScan(getUserName(), kafkaStoragePlugin, 
kafkStoragePluginConfig, coulmns,
+partitionSubScanSpecList);
+  }
+
+  @Override
+  public Iterator iterator() {
+return Collections.emptyIterator();
+  }
+
+  @JsonIgnore
+  public KafkaStoragePluginConfig getKafkStoragePluginConfig() {
+return kafkStoragePluginConfig;
+  }
+
+  @JsonIgnore
+  public KafkaStoragePlugin getKafkaStoragePlugin() {
+return kafkaStoragePlugin;
+  }
+
+  public List getCoulmns() {
+return coulmns;
--- End diff --

This has been taken care of.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-19 Thread akumarb2010
Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151890085
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("kafka-partition-scan")
+public class KafkaSubScan extends AbstractBase implements SubScan {
+
+  @JsonProperty
+  private final KafkaStoragePluginConfig kafkStoragePluginConfig;
+
+  @JsonIgnore
+  private final KafkaStoragePlugin kafkaStoragePlugin;
+  private final List coulmns;
+  private final List partitionSubScanSpecList;
+
+  @JsonCreator
+  public KafkaSubScan(@JacksonInject StoragePluginRegistry registry, 
@JsonProperty("userName") String userName,
+  @JsonProperty("kafkStoragePluginConfig") KafkaStoragePluginConfig 
kafkStoragePluginConfig,
+  @JsonProperty("coulmns") List coulmns,
+  @JsonProperty("partitionSubScanSpecList") 
LinkedList partitionSubScanSpecList)
+  throws ExecutionSetupException {
+super(userName);
+this.kafkStoragePluginConfig = kafkStoragePluginConfig;
+this.coulmns = coulmns;
+this.partitionSubScanSpecList = partitionSubScanSpecList;
+this.kafkaStoragePlugin = (KafkaStoragePlugin) 
registry.getPlugin(kafkStoragePluginConfig);
+  }
+
+  public KafkaSubScan(String userName, KafkaStoragePlugin plugin, 
KafkaStoragePluginConfig kafkStoragePluginConfig,
+  List coulmns, List 
partitionSubScanSpecList) {
+super(userName);
+this.coulmns = coulmns;
+this.kafkStoragePluginConfig = kafkStoragePluginConfig;
--- End diff --

This has been taken care of.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-19 Thread akumarb2010
Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151890087
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("kafka-partition-scan")
+public class KafkaSubScan extends AbstractBase implements SubScan {
+
+  @JsonProperty
+  private final KafkaStoragePluginConfig kafkStoragePluginConfig;
+
+  @JsonIgnore
+  private final KafkaStoragePlugin kafkaStoragePlugin;
+  private final List coulmns;
+  private final List partitionSubScanSpecList;
+
+  @JsonCreator
+  public KafkaSubScan(@JacksonInject StoragePluginRegistry registry, 
@JsonProperty("userName") String userName,
+  @JsonProperty("kafkStoragePluginConfig") KafkaStoragePluginConfig 
kafkStoragePluginConfig,
+  @JsonProperty("coulmns") List coulmns,
+  @JsonProperty("partitionSubScanSpecList") 
LinkedList partitionSubScanSpecList)
+  throws ExecutionSetupException {
+super(userName);
+this.kafkStoragePluginConfig = kafkStoragePluginConfig;
+this.coulmns = coulmns;
+this.partitionSubScanSpecList = partitionSubScanSpecList;
+this.kafkaStoragePlugin = (KafkaStoragePlugin) 
registry.getPlugin(kafkStoragePluginConfig);
+  }
+
+  public KafkaSubScan(String userName, KafkaStoragePlugin plugin, 
KafkaStoragePluginConfig kafkStoragePluginConfig,
+  List coulmns, List 
partitionSubScanSpecList) {
+super(userName);
+this.coulmns = coulmns;
--- End diff --

This has been taken care of.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-19 Thread akumarb2010
Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151890052
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactory.java
 ---
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.decoders;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class MessageReaderFactory {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(MessageReaderFactory.class);
+
+  /**
+   * Initialize kafka message reader based on store.kafka.record.reader 
session
+   * property
+   *
+   * @param messageReaderKlass
+   *  value of store.kafka.record.reader session property
+   * @return kafka message reader
+   * @throws UserException
+   *   in case of any message reader initialization
+   */
+  public static MessageReader getMessageReader(String messageReaderKlass) {
+Preconditions.checkNotNull(messageReaderKlass, "Please set 
store.kafka.record.reader " + messageReaderKlass);
+MessageReader messageReader = null;
+try {
+  Class klass = Class.forName(messageReaderKlass);
--- End diff --

As per your suggestion, I raised a JIRA for this: 
https://issues.apache.org/jira/browse/DRILL-5976


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-19 Thread akumarb2010
Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151890096
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("kafka-partition-scan")
+public class KafkaSubScan extends AbstractBase implements SubScan {
+
+  @JsonProperty
+  private final KafkaStoragePluginConfig kafkStoragePluginConfig;
+
+  @JsonIgnore
+  private final KafkaStoragePlugin kafkaStoragePlugin;
+  private final List coulmns;
+  private final List partitionSubScanSpecList;
+
+  @JsonCreator
+  public KafkaSubScan(@JacksonInject StoragePluginRegistry registry, 
@JsonProperty("userName") String userName,
+  @JsonProperty("kafkStoragePluginConfig") KafkaStoragePluginConfig 
kafkStoragePluginConfig,
+  @JsonProperty("coulmns") List coulmns,
+  @JsonProperty("partitionSubScanSpecList") 
LinkedList partitionSubScanSpecList)
--- End diff --

This has been taken care of.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-19 Thread akumarb2010
Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151890105
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("kafka-partition-scan")
+public class KafkaSubScan extends AbstractBase implements SubScan {
+
+  @JsonProperty
+  private final KafkaStoragePluginConfig kafkStoragePluginConfig;
+
+  @JsonIgnore
+  private final KafkaStoragePlugin kafkaStoragePlugin;
+  private final List coulmns;
+  private final List partitionSubScanSpecList;
+
+  @JsonCreator
+  public KafkaSubScan(@JacksonInject StoragePluginRegistry registry, 
@JsonProperty("userName") String userName,
+  @JsonProperty("kafkStoragePluginConfig") KafkaStoragePluginConfig 
kafkStoragePluginConfig,
--- End diff --

This has been taken care of.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-19 Thread akumarb2010
Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151889218
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactory.java
 ---
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.decoders;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class MessageReaderFactory {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(MessageReaderFactory.class);
+
+  /**
+   * Initialize kafka message reader based on store.kafka.record.reader 
session
+   * property
+   *
+   * @param messageReaderKlass
+   *  value of store.kafka.record.reader session property
+   * @return kafka message reader
+   * @throws UserException
+   *   in case of any message reader initialization
+   */
+  public static MessageReader getMessageReader(String messageReaderKlass) {
+Preconditions.checkNotNull(messageReaderKlass, "Please set 
store.kafka.record.reader " + messageReaderKlass);
+MessageReader messageReader = null;
+try {
+  Class klass = Class.forName(messageReaderKlass);
--- End diff --

Thanks for this suggestion, Paul. Initially, we considered this as a plugin 
config. 

But with Kafka, users will often need to provide their own custom 
MessageReader implementation. 

For example, Kafka messages can be encrypted. In other frameworks such as 
Spark Streaming, Storm, or Camus, the user provides a Deserializer/Decoder. 

As you suggested, I created a separate JIRA for this: 
https://issues.apache.org/jira/browse/DRILL-5976
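
To illustrate the point about user-supplied decoders, here is a minimal Java sketch. It is not code from this pull request. `Decoder`, `Utf8Decoder`, and `load` are hypothetical names chosen for illustration; the plugin's real interface is `MessageReader`, whose signature differs. The sketch only shows a class name taken from configuration being loaded reflectively and checked against the expected interface.

```java
import java.nio.charset.StandardCharsets;

public class ReaderLoadingSketch {

  /** Hypothetical stand-in for the plugin's MessageReader; not the real interface. */
  public interface Decoder {
    String decode(byte[] payload);
  }

  /** Example user-supplied decoder: treats the payload as UTF-8 text. */
  public static class Utf8Decoder implements Decoder {
    @Override
    public String decode(byte[] payload) {
      return new String(payload, StandardCharsets.UTF_8);
    }
  }

  /** Instantiates the decoder named in configuration through its no-arg constructor. */
  public static Decoder load(String className) {
    if (className == null) {
      throw new IllegalArgumentException("decoder class name must be set");
    }
    try {
      Class<?> klass = Class.forName(className);
      // Reject classes that exist but do not implement the expected interface.
      if (!Decoder.class.isAssignableFrom(klass)) {
        throw new IllegalArgumentException(className + " does not implement Decoder");
      }
      return (Decoder) klass.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot instantiate decoder " + className, e);
    }
  }

  public static void main(String[] args) {
    Decoder decoder = load(Utf8Decoder.class.getName());
    System.out.println(decoder.decode("{\"id\": 1}".getBytes(StandardCharsets.UTF_8)));
  }
}
```

A decoder for encrypted messages would plug in the same way: the user ships a class implementing the interface and names it in the configuration, and the type check turns a misconfigured class name into an early, descriptive error.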


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151585383
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("kafka-partition-scan")
+public class KafkaSubScan extends AbstractBase implements SubScan {
+
+  @JsonProperty
+  private final KafkaStoragePluginConfig kafkStoragePluginConfig;
+
+  @JsonIgnore
+  private final KafkaStoragePlugin kafkaStoragePlugin;
+  private final List coulmns;
+  private final List partitionSubScanSpecList;
+
+  @JsonCreator
+  public KafkaSubScan(@JacksonInject StoragePluginRegistry registry, 
@JsonProperty("userName") String userName,
+  @JsonProperty("kafkStoragePluginConfig") KafkaStoragePluginConfig 
kafkStoragePluginConfig,
+  @JsonProperty("coulmns") List coulmns,
+  @JsonProperty("partitionSubScanSpecList") 
LinkedList partitionSubScanSpecList)
+  throws ExecutionSetupException {
+super(userName);
+this.kafkStoragePluginConfig = kafkStoragePluginConfig;
+this.coulmns = coulmns;
+this.partitionSubScanSpecList = partitionSubScanSpecList;
+this.kafkaStoragePlugin = (KafkaStoragePlugin) 
registry.getPlugin(kafkStoragePluginConfig);
+  }
+
+  public KafkaSubScan(String userName, KafkaStoragePlugin plugin, 
KafkaStoragePluginConfig kafkStoragePluginConfig,
+  List coulmns, List 
partitionSubScanSpecList) {
+super(userName);
+this.coulmns = coulmns;
+this.kafkStoragePluginConfig = kafkStoragePluginConfig;
--- End diff --

Spelling: `kafkStorage...` --> `kafkaStorage...`


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151587965
  
--- Diff: 
contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.NoSuchElementException;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MessageIteratorTest extends KafkaTestBase {
+
+  private KafkaConsumer kafkaConsumer;
+  private KafkaSubScanSpec subScanSpec;
+
+  @Before
+  public void setUp() {
+Properties consumerProps = storagePluginConfig.getKafkaConsumerProps();
+consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class);
+consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class);
+consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "4");
+kafkaConsumer = new KafkaConsumer<>(consumerProps);
+subScanSpec = new KafkaSubScanSpec(QueryConstants.JSON_TOPIC, 0, 0, 
TestKafkaSuit.NUM_JSON_MSG);
+  }
+
+  @After
+  public void cleanUp() {
+if (kafkaConsumer != null) {
+  kafkaConsumer.close();
+}
+  }
+
+  @Test
+  public void testWhenPollTimeOutIsTooLess() {
+MessageIterator iterator = new MessageIterator(kafkaConsumer, 
subScanSpec, 1);
+try {
+  iterator.hasNext();
+} catch (UserException ue) {
+  Assert.assertEquals(ErrorType.DATA_READ, ue.getErrorType());
+  Assert.assertTrue(ue.getMessage().contains(
+  "DATA_READ ERROR: Failed to fetch messages within 1 
milliseconds. Consider increasing the value of the property : 
store.kafka.poll.timeout"));
+}
+  }
+
+  @Test
+  public void testShouldReturnTrueAsKafkaHasMessages() {
+MessageIterator iterator = new MessageIterator(kafkaConsumer, 
subScanSpec, TimeUnit.SECONDS.toMillis(1));
+Assert.assertTrue("Message iterator returned false though there are 
messages in Kafka", iterator.hasNext());
+  }
+
+  @Test
+  public void testShouldReturnMessage1() {
+MessageIterator iterator = new MessageIterator(kafkaConsumer, 
subScanSpec, TimeUnit.SECONDS.toMillis(1));
+// Calling hasNext makes only one poll to Kafka which fetches only 4 
messages.
+// so fifth operation on iterator is expected to fail.
+iterator.hasNext();
+Assert.assertNotNull(iterator.next());
+Assert.assertNotNull(iterator.next());
+Assert.assertNotNull(iterator.next());
+Assert.assertNotNull(iterator.next());
+try {
+  iterator.next();
+  Assert.fail("Kafak fetched more messages than configured.");
+} catch (NoSuchElementException nse) {
+  Assert.assertTrue(true);
--- End diff --

Should `hasNext()` just return false at this point, rather than `next()` 
throwing an exception? That is, should the iterator map the exception to the 
normal iterator interface of `hasNext()` returning `false` at EOF?

Also, a typical Drill pattern for the catch block is just to say:
```
// Expected
```
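
A minimal sketch of that catch-block convention, assuming a plain `java.util.Iterator` over a list in place of the PR's `MessageIterator`. The class and method names here are illustrative only and do not appear in the PR:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

public class ExpectedExceptionSketch {

  // Drains the iterator, then checks that one more next() call throws
  // NoSuchElementException, as the java.util.Iterator contract specifies.
  static boolean throwsAtEnd(Iterator<String> iterator) {
    while (iterator.hasNext()) {
      iterator.next();
    }
    try {
      iterator.next();
      // A JUnit test would call Assert.fail(...) here instead of returning.
      return false;
    } catch (NoSuchElementException e) {
      // Expected
      return true;
    }
  }

  public static void main(String[] args) {
    Iterator<String> iterator = Arrays.asList("m1", "m2", "m3", "m4").iterator();
    System.out.println(throwsAtEnd(iterator));
  }
}
```

The empty catch block with an `// Expected` comment replaces `Assert.assertTrue(true)`, which asserts nothing.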


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151585461
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("kafka-partition-scan")
+public class KafkaSubScan extends AbstractBase implements SubScan {
+
+  @JsonProperty
+  private final KafkaStoragePluginConfig kafkStoragePluginConfig;
+
+  @JsonIgnore
+  private final KafkaStoragePlugin kafkaStoragePlugin;
+  private final List coulmns;
+  private final List partitionSubScanSpecList;
+
+  @JsonCreator
+  public KafkaSubScan(@JacksonInject StoragePluginRegistry registry, 
@JsonProperty("userName") String userName,
+  @JsonProperty("kafkStoragePluginConfig") KafkaStoragePluginConfig 
kafkStoragePluginConfig,
+  @JsonProperty("coulmns") List coulmns,
--- End diff --

Spelling: `coulmns` --> `columns`. Since this is a public API, might as well 
get it right.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151586477
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactory.java
 ---
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.decoders;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class MessageReaderFactory {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(MessageReaderFactory.class);
+
+  /**
+   * Initialize kafka message reader beased on store.kafka.record.reader 
session
+   * property
+   *
+   * @param messageReaderKlass
+   *  value of store.kafka.record.reader session property
+   * @return kafka message reader
+   * @throws UserException
+   *   in case of any message reader initialization
+   */
+  public static MessageReader getMessageReader(String messageReaderKlass) {
+Preconditions.checkNotNull(messageReaderKlass, "Please set 
store.kafka.record.reader " + messageReaderKlass);
+MessageReader messageReader = null;
+try {
+  Class klass = Class.forName(messageReaderKlass);
--- End diff --

Actually, seeing the larger picture, I wonder if this is the right 
approach. Suppose this is two or three releases from now and we support other 
forms of Kafka messages. Different topics use different formats.

If the message format is a system/session option, then I need to switch the 
option before each query, which is cumbersome and error-prone.

Instead, perhaps this information should be part of the storage plugin 
config. Then, I can define different plugins: one for each message format.

Further, can I have multiple Kafka servers? If so, would I need different 
plugin configs for each?

So, should we be thinking about encoding most properties as plugin config 
properties?

The plugin might then have a `format` property, one value of which is `json`. 
The JSON config properties would be defined under the `json` format within the 
overall storage plugin config.

Given the looming deadline of the 1.12 release, it is fine if we file the 
above suggestion as a JIRA and work on it for 1.13.
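
A rough sketch of what "format as a plugin property" could look like, assuming one named plugin config per Kafka cluster and message format. `KafkaPluginSettings`, its fields, and the plugin names are hypothetical and are not Drill's actual config classes:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PluginFormatSketch {

  // Hypothetical per-plugin settings: each named plugin carries its own
  // broker list and message format, so no session option has to be switched.
  static final class KafkaPluginSettings {
    final String bootstrapServers;
    final String format;

    KafkaPluginSettings(String bootstrapServers, String format) {
      this.bootstrapServers = bootstrapServers;
      this.format = format;
    }
  }

  // Looks up the message format from the plugin the query refers to.
  static String formatFor(Map<String, KafkaPluginSettings> plugins, String pluginName) {
    KafkaPluginSettings settings = plugins.get(pluginName);
    if (settings == null) {
      throw new IllegalArgumentException("Unknown plugin: " + pluginName);
    }
    return settings.format;
  }

  public static void main(String[] args) {
    Map<String, KafkaPluginSettings> plugins = new LinkedHashMap<>();
    plugins.put("kafka_json", new KafkaPluginSettings("broker-a:9092", "json"));
    plugins.put("kafka_other", new KafkaPluginSettings("broker-b:9092", "other"));
    System.out.println(formatFor(plugins, "kafka_json"));
  }
}
```

With this shape, a query against `kafka_json` and a query against `kafka_other` can run back to back without changing any option in between.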


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151582174
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("kafka-partition-scan")
+public class KafkaSubScan extends AbstractBase implements SubScan {
+
+  @JsonProperty
+  private final KafkaStoragePluginConfig kafkStoragePluginConfig;
+
+  @JsonIgnore
+  private final KafkaStoragePlugin kafkaStoragePlugin;
+  private final List coulmns;
+  private final List partitionSubScanSpecList;
+
+  @JsonCreator
+  public KafkaSubScan(@JacksonInject StoragePluginRegistry registry, 
@JsonProperty("userName") String userName,
+  @JsonProperty("kafkStoragePluginConfig") KafkaStoragePluginConfig 
kafkStoragePluginConfig,
+  @JsonProperty("coulmns") List coulmns,
+  @JsonProperty("partitionSubScanSpecList") 
LinkedList partitionSubScanSpecList)
+  throws ExecutionSetupException {
+super(userName);
+this.kafkStoragePluginConfig = kafkStoragePluginConfig;
+this.coulmns = coulmns;
+this.partitionSubScanSpecList = partitionSubScanSpecList;
+this.kafkaStoragePlugin = (KafkaStoragePlugin) 
registry.getPlugin(kafkStoragePluginConfig);
+  }
+
+  public KafkaSubScan(String userName, KafkaStoragePlugin plugin, 
KafkaStoragePluginConfig kafkStoragePluginConfig,
+  List coulmns, List 
partitionSubScanSpecList) {
+super(userName);
+this.coulmns = coulmns;
+this.kafkStoragePluginConfig = kafkStoragePluginConfig;
+this.kafkaStoragePlugin = plugin;
+this.partitionSubScanSpecList = partitionSubScanSpecList;
+  }
+
+  @Override
+  public  T accept(PhysicalVisitor 
physicalVisitor, X value) throws E {
+return physicalVisitor.visitSubScan(this, value);
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List 
children) throws ExecutionSetupException {
+Preconditions.checkArgument(children.isEmpty());
+return new KafkaSubScan(getUserName(), kafkaStoragePlugin, 
kafkStoragePluginConfig, coulmns,
+partitionSubScanSpecList);
+  }
+
+  @Override
+  public Iterator iterator() {
+return Collections.emptyIterator();
+  }
+
+  @JsonIgnore
+  public KafkaStoragePluginConfig getKafkStoragePluginConfig() {
+return kafkStoragePluginConfig;
+  }
+
+  @JsonIgnore
+  public KafkaStoragePlugin getKafkaStoragePlugin() {
+return kafkaStoragePlugin;
+  }
+
+  public List getCoulmns() {
+return coulmns;
+  }
+
+  public List getPartitionSubScanSpecList() {
+return partitionSubScanSpecList;
+  }
+
+  @Override
+  public int getOperatorType() {
+return 0;
--- End diff --

Should add an operator code in `UserBitShared.proto`, something like:

```

[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151584935
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.decoders;
+
+import static 
org.apache.drill.exec.store.kafka.MetaDataField.KAFKA_MSG_KEY;
+import static org.apache.drill.exec.store.kafka.MetaDataField.KAFKA_OFFSET;
+import static 
org.apache.drill.exec.store.kafka.MetaDataField.KAFKA_PARTITION_ID;
+import static 
org.apache.drill.exec.store.kafka.MetaDataField.KAFKA_TIMESTAMP;
+import static org.apache.drill.exec.store.kafka.MetaDataField.KAFKA_TOPIC;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
+import org.apache.drill.exec.vector.complex.fn.JsonReader;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Charsets;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import io.netty.buffer.DrillBuf;
+
+/**
+ * MessageReader class which will convert ConsumerRecord into JSON and 
writes to
+ * VectorContainerWriter of JsonReader
+ *
+ */
+public class JsonMessageReader implements MessageReader {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(JsonMessageReader.class);
+  private JsonReader jsonReader;
+  private VectorContainerWriter writer;
+
+  @Override
+  public void init(DrillBuf buf, List columns, 
VectorContainerWriter writer, boolean allTextMode,
+  boolean skipOuterList, boolean readNumbersAsDouble) {
+this.jsonReader = new JsonReader(buf, columns, allTextMode, 
skipOuterList, readNumbersAsDouble);
+this.writer = writer;
+  }
+
+  @Override
+  public void readMessage(ConsumerRecord record) {
+try {
+  byte[] recordArray = (byte[]) record.value();
+  JsonObject jsonObj = (new JsonParser()).parse(new 
String(recordArray, Charsets.UTF_8)).getAsJsonObject();
+  jsonObj.addProperty(KAFKA_TOPIC.getFieldName(), record.topic());
+  jsonObj.addProperty(KAFKA_PARTITION_ID.getFieldName(), 
record.partition());
+  jsonObj.addProperty(KAFKA_OFFSET.getFieldName(), record.offset());
+  jsonObj.addProperty(KAFKA_TIMESTAMP.getFieldName(), 
record.timestamp());
+  jsonObj.addProperty(KAFKA_MSG_KEY.getFieldName(), record.key() != 
null ? record.key().toString() : null);
+  jsonReader.setSource(jsonObj.toString().getBytes(Charsets.UTF_8));
--- End diff --

Actually, no need for all this complexity. This code:

* Gets UTF-8 bytes encoding JSON
* Converts that to a string
* Parses that into a JSON object
* Serializes that into a string
* Converts that to a byte array.
* The "JSON reader" parses the bytes back into JSON tokens.

Any reason we can't just pass in the byte array directly? Another 
alternative is to create an `InputStream` that reads the byte array. The key 
thing to realize here is that the `jsonReader` incorporates the Jackson JSON 
parser, and is perfectly capable of parsing any form of serialized JSON.

So, the above can be reduced to:

* Pass in the `recordArray` to the `jsonReader`.
* The "JSON reader" parses the bytes into JSON tokens.
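
The `InputStream` alternative can be sketched with the JDK alone. This is only an illustration under stated assumptions: `drain` is a hypothetical stand-in for whatever byte-oriented parser consumes the stream, and the sketch does not cover the Kafka metadata fields that the quoted code adds with `addProperty`, which would need to be written some other way:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class DirectBytesSketch {

  // Hypothetical consumer: reads the stream to the end and returns the bytes
  // it saw, standing in for a parser that tokenizes serialized JSON.
  static byte[] drain(InputStream in) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] chunk = new byte[256];
    int n;
    while ((n = in.read(chunk)) != -1) {
      out.write(chunk, 0, n);
    }
    return out.toByteArray();
  }

  public static void main(String[] args) throws IOException {
    // Stand-in for the value of a Kafka record: UTF-8 encoded JSON.
    byte[] recordArray = "{\"id\": 1, \"name\": \"abc\"}".getBytes(StandardCharsets.UTF_8);

    // Wrap the bytes directly; no String, JsonObject, or re-serialization step.
    byte[] seen = drain(new ByteArrayInputStream(recordArray));
    System.out.println(Arrays.equals(recordArray, seen));
  }
}
```

The consumer sees exactly the bytes Kafka delivered, so the intermediate string and object conversions add work without changing the input to the parser.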



---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151581827
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
 ---
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.drill.exec.store.kafka.decoders.MessageReader;
+import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
+import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class KafkaRecordReader extends AbstractRecordReader {
+  private static final Logger logger = 
LoggerFactory.getLogger(KafkaRecordReader.class);
+  public static final long DEFAULT_MESSAGES_PER_BATCH = 4000;
+
+  private VectorContainerWriter writer;
+  private MessageReader messageReader;
+
+  private final boolean unionEnabled;
+  private final KafkaStoragePlugin plugin;
+  private final KafkaSubScanSpec subScanSpec;
+  private final long kafkaPollTimeOut;
+
+  private long currentOffset;
+  private MessageIterator msgItr;
+
+  private final boolean enableAllTextMode;
+  private final boolean readNumbersAsDouble;
+  private final String kafkaMsgReader;
+
+  public KafkaRecordReader(KafkaSubScan.KafkaSubScanSpec subScanSpec, 
List projectedColumns,
+  FragmentContext context, KafkaStoragePlugin plugin) {
+setColumns(projectedColumns);
+this.enableAllTextMode = 
context.getOptions().getOption(ExecConstants.KAFKA_ALL_TEXT_MODE).bool_val;
+this.readNumbersAsDouble = context.getOptions()
+
.getOption(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
+this.unionEnabled = 
context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
+this.kafkaMsgReader = 
context.getOptions().getOption(ExecConstants.KAFKA_RECORD_READER).string_val;
+this.kafkaPollTimeOut = 
context.getOptions().getOption(ExecConstants.KAFKA_POLL_TIMEOUT).num_val;
+this.plugin = plugin;
+this.subScanSpec = subScanSpec;
+  }
+
+  @Override
+  protected Collection transformColumns(Collection 
projectedColumns) {
+Set transformed = Sets.newLinkedHashSet();
+if (!isStarQuery()) {
+  for (SchemaPath column : projectedColumns) {
+transformed.add(column);
+  }
+} else {
+  transformed.add(Utilities.STAR_COLUMN);
+}
+return transformed;
+  }
+
+  @Override
+  public void setup(OperatorContext context, OutputMutator output) throws 
ExecutionSetupException {
+this.writer = new VectorContainerWriter(output, unionEnabled);
+messageReader = MessageReaderFactory.getMessageReader(kafkaMsgReader);
+messageReader.init(context.getManagedBuffer(), 
Lists.newArrayList(getColumns()), this.writer,
+this.enableAllTextMode, false, this.readNumbersAsDouble);
+msgItr = new MessageIterator(messageReader.getConsumer(plugin), 

[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151587030
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
 ---
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+@JsonTypeName("kafka-scan")
+public class KafkaGroupScan extends AbstractGroupScan {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(KafkaGroupScan.class);
+
+  // Assuming default average topic message size as 1KB, which will be 
used to
+  // compute the stats and work assignments
+  private static final long MSG_SIZE = 1024;
+
+  private final KafkaStoragePlugin kafkaStoragePlugin;
+  private final KafkaStoragePluginConfig kafkaStoragePluginConfig;
+  private List columns;
+  private final KafkaScanSpec kafkaScanSpec;
+
+  private List partitionWorkList;
+  private ListMultimap assignments;
+  private List affinities;
+
+  @JsonCreator
+  public KafkaGroupScan(@JsonProperty("userName") String userName,
+  @JsonProperty("kafkaStoragePluginConfig") KafkaStoragePluginConfig 
kafkaStoragePluginConfig,
+  @JsonProperty("columns") List columns, 
@JsonProperty("scanSpec") KafkaScanSpec scanSpec,
+  @JacksonInject StoragePluginRegistry pluginRegistry) {
+this(userName, kafkaStoragePluginConfig, columns, scanSpec, 
(KafkaStoragePlugin) pluginRegistry);
+  }
+
+  public KafkaGroupScan(KafkaStoragePlugin kafkaStoragePlugin, 
KafkaScanSpec kafkaScanSpec, List columns) {
+super(StringUtils.EMPTY);
+this.kafkaStoragePlugin = kafkaStoragePlugin;
+this.kafkaStoragePluginConfig = (KafkaStoragePluginConfig) 
kafkaStoragePlugin.getConfig();
+this.columns = columns;
+this.kafkaScanSpec = 

[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151586853
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
 ---
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.drill.exec.store.kafka.decoders.MessageReader;
+import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
+import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class KafkaRecordReader extends AbstractRecordReader {
+  private static final Logger logger = 
LoggerFactory.getLogger(KafkaRecordReader.class);
+  public static final long DEFAULT_MESSAGES_PER_BATCH = 4000;
+
+  private VectorContainerWriter writer;
+  private MessageReader messageReader;
+
+  private final boolean unionEnabled;
+  private final KafkaStoragePlugin plugin;
+  private final KafkaSubScanSpec subScanSpec;
+  private final long kafkaPollTimeOut;
+
+  private long currentOffset;
+  private MessageIterator msgItr;
+
+  private final boolean enableAllTextMode;
+  private final boolean readNumbersAsDouble;
+  private final String kafkaMsgReader;
+
+  public KafkaRecordReader(KafkaSubScan.KafkaSubScanSpec subScanSpec, 
List projectedColumns,
+  FragmentContext context, KafkaStoragePlugin plugin) {
+setColumns(projectedColumns);
+this.enableAllTextMode = 
context.getOptions().getOption(ExecConstants.KAFKA_ALL_TEXT_MODE).bool_val;
+this.readNumbersAsDouble = context.getOptions()
+
.getOption(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
+this.unionEnabled = 
context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
+this.kafkaMsgReader = 
context.getOptions().getOption(ExecConstants.KAFKA_RECORD_READER).string_val;
+this.kafkaPollTimeOut = 
context.getOptions().getOption(ExecConstants.KAFKA_POLL_TIMEOUT).num_val;
--- End diff --

A recent PR has simplified the above. You can now do:

```
kafkaPollTimeOut = context.getOptions().getInt(ExecConstants.KAFKA_POLL_TIMEOUT);
```

The `getInt()` method handles type checking and avoids NPEs if the value is 
somehow stored with the wrong type.

Also, given the number of options, consider:

```
  OptionManager options = context.getOptions();
  foo = options.getSomething(...);
```
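
To illustrate why a typed getter is safer than reaching into a raw value field, here is a minimal, self-contained sketch. `TypedOptionsSketch` and the option name `example.poll.timeout` are hypothetical stand-ins invented for this example; this is not Drill's `OptionManager` and is not meant to mirror its internals.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an option store; NOT Drill's OptionManager.
class TypedOptionsSketch {
  private final Map<String, Object> values = new HashMap<>();

  void set(String name, Object value) {
    values.put(name, value);
  }

  // Typed getter: presence and type are checked in one place, so callers
  // never dereference a missing or wrongly typed value themselves.
  int getInt(String name) {
    Object value = values.get(name);
    if (value == null) {
      throw new IllegalArgumentException("Option not set: " + name);
    }
    if (!(value instanceof Number)) {
      throw new IllegalArgumentException(
          "Option " + name + " is not numeric: " + value.getClass().getSimpleName());
    }
    return ((Number) value).intValue();
  }

  public static void main(String[] args) {
    TypedOptionsSketch options = new TypedOptionsSketch();
    options.set("example.poll.timeout", 200L); // hypothetical option name
    System.out.println(options.getInt("example.poll.timeout"));
  }
}
```

A caller that fetches the store once and then reads several options through typed getters gets both benefits suggested above: less repetition and a clear error when a value is missing or mistyped.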


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151585334
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("kafka-partition-scan")
+public class KafkaSubScan extends AbstractBase implements SubScan {
+
+  @JsonProperty
+  private final KafkaStoragePluginConfig kafkStoragePluginConfig;
+
+  @JsonIgnore
+  private final KafkaStoragePlugin kafkaStoragePlugin;
+  private final List coulmns;
+  private final List partitionSubScanSpecList;
+
+  @JsonCreator
+  public KafkaSubScan(@JacksonInject StoragePluginRegistry registry, 
@JsonProperty("userName") String userName,
+  @JsonProperty("kafkStoragePluginConfig") KafkaStoragePluginConfig 
kafkStoragePluginConfig,
+  @JsonProperty("coulmns") List coulmns,
+  @JsonProperty("partitionSubScanSpecList") 
LinkedList partitionSubScanSpecList)
+  throws ExecutionSetupException {
+super(userName);
+this.kafkStoragePluginConfig = kafkStoragePluginConfig;
+this.coulmns = coulmns;
+this.partitionSubScanSpecList = partitionSubScanSpecList;
+this.kafkaStoragePlugin = (KafkaStoragePlugin) 
registry.getPlugin(kafkStoragePluginConfig);
+  }
+
+  public KafkaSubScan(String userName, KafkaStoragePlugin plugin, 
KafkaStoragePluginConfig kafkStoragePluginConfig,
+  List coulmns, List 
partitionSubScanSpecList) {
+super(userName);
+this.coulmns = coulmns;
--- End diff --

Spelling: `coulmns` should be `columns`.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151583591
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactory.java
 ---
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.decoders;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class MessageReaderFactory {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(MessageReaderFactory.class);
+
+  /**
+   * Initialize kafka message reader beased on store.kafka.record.reader 
session
+   * property
+   *
+   * @param messageReaderKlass
+   *  value of store.kafka.record.reader session property
+   * @return kafka message reader
+   * @throws UserException
+   *   in case of any message reader initialization
+   */
+  public static MessageReader getMessageReader(String messageReaderKlass) {
+Preconditions.checkNotNull(messageReaderKlass, "Please set 
store.kafka.record.reader " + messageReaderKlass);
--- End diff --

Here we are checking a value set as a system option, correct? This means 
that errors are due to the user doing something wrong. So, we probably want to 
use `UserException.validationError()` to explain that. Otherwise, this is going 
to look like an internal error in the logs.
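
A rough sketch of the idea, kept self-contained so it runs without Drill on the classpath: `UserFacingValidationException` is a hypothetical stand-in for the role that Drill's `UserException` (built via `UserException.validationError()`) would play, and `requireReaderClassName` is an illustrative helper name, not part of the PR.

```java
// Hypothetical stand-in for a user-facing error type. In Drill, this role
// would be played by UserException built via UserException.validationError().
class UserFacingValidationException extends RuntimeException {
  UserFacingValidationException(String message) {
    super(message);
  }
}

class ReaderOptionCheckSketch {
  // Validates the user-supplied option value and reports problems as a
  // user error rather than as an internal precondition failure.
  static String requireReaderClassName(String messageReaderKlass) {
    if (messageReaderKlass == null || messageReaderKlass.trim().isEmpty()) {
      throw new UserFacingValidationException(
          "Option store.kafka.record.reader is not set. "
              + "Set it to the class name of a Kafka message reader.");
    }
    return messageReaderKlass.trim();
  }

  public static void main(String[] args) {
    try {
      requireReaderClassName(null);
    } catch (UserFacingValidationException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Note that the message here tells the user what to do; appending the null value to the message, as the `checkNotNull` call in the diff does, adds no information.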


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151584428
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReader.java
 ---
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.decoders;
+
+import java.io.Closeable;
+import java.util.List;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+import io.netty.buffer.DrillBuf;
+
+/**
+ * MessageReader interface provides mechanism to handle various Kafka 
Message
+ * Formats like JSON, AVRO or custom message formats.
+ */
+public interface MessageReader  extends Closeable {
+
+  public void init(DrillBuf buf, List columns, 
VectorContainerWriter writer, boolean allTextMode,
+  boolean skipOuterList, boolean readNumbersAsDouble);
--- End diff --

Does `skipOuterList` make sense here? It is for files of the form:

```
[
  {"name": "first record"},
  {"name": "secondRecord"}
]
```

But, each Kafka record *is* a document, so the "outer list" pattern can 
never occur.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151572304
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
 ---
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+@JsonTypeName("kafka-scan")
+public class KafkaGroupScan extends AbstractGroupScan {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(KafkaGroupScan.class);
+
+  // Assuming default average topic message size as 1KB, which will be 
used to
+  // compute the stats and work assignments
+  private static final long MSG_SIZE = 1024;
+
+  private final KafkaStoragePlugin kafkaStoragePlugin;
+  private final KafkaStoragePluginConfig kafkaStoragePluginConfig;
+  private List columns;
+  private final KafkaScanSpec kafkaScanSpec;
+
+  private List partitionWorkList;
+  private ListMultimap assignments;
+  private List affinities;
+
+  @JsonCreator
+  public KafkaGroupScan(@JsonProperty("userName") String userName,
+  @JsonProperty("kafkaStoragePluginConfig") KafkaStoragePluginConfig 
kafkaStoragePluginConfig,
+  @JsonProperty("columns") List columns, 
@JsonProperty("scanSpec") KafkaScanSpec scanSpec,
+  @JacksonInject StoragePluginRegistry pluginRegistry) {
+this(userName, kafkaStoragePluginConfig, columns, scanSpec, 
(KafkaStoragePlugin) pluginRegistry);
+  }
+
+  public KafkaGroupScan(KafkaStoragePlugin kafkaStoragePlugin, 
KafkaScanSpec kafkaScanSpec, List columns) {
+super(StringUtils.EMPTY);
+this.kafkaStoragePlugin = kafkaStoragePlugin;
+this.kafkaStoragePluginConfig = (KafkaStoragePluginConfig) 
kafkaStoragePlugin.getConfig();
+this.columns = columns;
+this.kafkaScanSpec = 

[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151584017
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactory.java
 ---
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.decoders;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class MessageReaderFactory {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(MessageReaderFactory.class);
+
+  /**
+   * Initialize kafka message reader beased on store.kafka.record.reader 
session
+   * property
+   *
+   * @param messageReaderKlass
+   *  value of store.kafka.record.reader session property
+   * @return kafka message reader
+   * @throws UserException
+   *   in case of any message reader initialization
+   */
+  public static MessageReader getMessageReader(String messageReaderKlass) {
+Preconditions.checkNotNull(messageReaderKlass, "Please set 
store.kafka.record.reader " + messageReaderKlass);
+MessageReader messageReader = null;
+try {
+  Class klass = Class.forName(messageReaderKlass);
--- End diff --

In general, I am in favor of dynamically loading classes. However, I'm a 
bit less in favor of requiring that users know detailed class names at runtime. 
I wonder if you can do this:

* In the config system, provide a mapping of reader name to reader class.
* In the session/system option, provide the reader name.
* Here, use the name to look up the class name, which we then instantiate 
as in this existing code.

That way, I can specify "json" as the reader format, not 
"org.apache.drill.exec.store.kafka.decoders.JsonMessageReader"...

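The three steps above could be sketched roughly as follows. Everything here is hypothetical and self-contained: `SketchMessageReader`, `SketchJsonReader`, and `ReaderRegistrySketch` are stand-ins invented for this example, and the hard-coded map stands in for whatever mapping the config system would provide.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-in for the plugin's MessageReader interface.
interface SketchMessageReader {
  String format();
}

// Hypothetical stand-in for a JSON message reader implementation.
class SketchJsonReader implements SketchMessageReader {
  @Override
  public String format() {
    return "json";
  }
}

class ReaderRegistrySketch {
  // Step 1: mapping of short reader name to implementation class name.
  // In a real plugin this would come from configuration, not a literal.
  private final Map<String, String> readerClassByName = new HashMap<>();

  ReaderRegistrySketch() {
    readerClassByName.put("json", SketchJsonReader.class.getName());
  }

  // Steps 2 and 3: take the short name from the option, look up the class
  // name, and instantiate the class reflectively.
  SketchMessageReader create(String readerName) {
    if (readerName == null) {
      throw new IllegalArgumentException("Reader name must not be null");
    }
    String className = readerClassByName.get(readerName.toLowerCase(Locale.ROOT));
    if (className == null) {
      throw new IllegalArgumentException(
          "Unknown reader '" + readerName + "'; known readers: " + readerClassByName.keySet());
    }
    try {
      Class<?> klass = Class.forName(className);
      return (SketchMessageReader) klass.getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("Cannot instantiate reader class " + className, e);
    }
  }

  public static void main(String[] args) {
    System.out.println(new ReaderRegistrySketch().create("json").format());
  }
}
```

With a mapping like this, an unknown name can be reported along with the list of valid names, which is friendlier than a `ClassNotFoundException` for a mistyped class name.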

---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151581968
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("kafka-partition-scan")
+public class KafkaSubScan extends AbstractBase implements SubScan {
+
+  @JsonProperty
+  private final KafkaStoragePluginConfig kafkStoragePluginConfig;
+
+  @JsonIgnore
+  private final KafkaStoragePlugin kafkaStoragePlugin;
+  private final List coulmns;
+  private final List partitionSubScanSpecList;
+
+  @JsonCreator
+  public KafkaSubScan(@JacksonInject StoragePluginRegistry registry, 
@JsonProperty("userName") String userName,
+  @JsonProperty("kafkStoragePluginConfig") KafkaStoragePluginConfig 
kafkStoragePluginConfig,
+  @JsonProperty("coulmns") List coulmns,
+  @JsonProperty("partitionSubScanSpecList") 
LinkedList partitionSubScanSpecList)
+  throws ExecutionSetupException {
+super(userName);
+this.kafkStoragePluginConfig = kafkStoragePluginConfig;
+this.coulmns = coulmns;
+this.partitionSubScanSpecList = partitionSubScanSpecList;
+this.kafkaStoragePlugin = (KafkaStoragePlugin) 
registry.getPlugin(kafkStoragePluginConfig);
+  }
+
+  public KafkaSubScan(String userName, KafkaStoragePlugin plugin, 
KafkaStoragePluginConfig kafkStoragePluginConfig,
+  List coulmns, List 
partitionSubScanSpecList) {
+super(userName);
+this.coulmns = coulmns;
+this.kafkStoragePluginConfig = kafkStoragePluginConfig;
+this.kafkaStoragePlugin = plugin;
+this.partitionSubScanSpecList = partitionSubScanSpecList;
+  }
+
+  @Override
+  public  T accept(PhysicalVisitor 
physicalVisitor, X value) throws E {
+return physicalVisitor.visitSubScan(this, value);
+  }
+
+  @Override
+  public PhysicalOperator getNewWithChildren(List 
children) throws ExecutionSetupException {
+Preconditions.checkArgument(children.isEmpty());
+return new KafkaSubScan(getUserName(), kafkaStoragePlugin, 
kafkStoragePluginConfig, coulmns,
+partitionSubScanSpecList);
+  }
+
+  @Override
+  public Iterator iterator() {
+return Collections.emptyIterator();
+  }
+
+  @JsonIgnore
+  public KafkaStoragePluginConfig getKafkStoragePluginConfig() {
+return kafkStoragePluginConfig;
+  }
+
+  @JsonIgnore
+  public KafkaStoragePlugin getKafkaStoragePlugin() {
+return kafkaStoragePlugin;
+  }
+
+  public List getCoulmns() {
+return coulmns;
--- End diff --

Rename `coulmns` to `columns`, both here and in the method name (`getCoulmns`).


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151581581
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePlugin.java
 ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.AbstractStoragePlugin;
+import org.apache.drill.exec.store.SchemaConfig;
+import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.kafka.schema.KafkaSchemaFactory;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.io.Closer;
+
+public class KafkaStoragePlugin extends AbstractStoragePlugin {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(KafkaStoragePlugin.class);
+  private final KafkaSchemaFactory kafkaSchemaFactory;
+  private final KafkaStoragePluginConfig config;
+  private final DrillbitContext context;
+  private final Closer closer = Closer.create();
+
+  public KafkaStoragePlugin(KafkaStoragePluginConfig config, 
DrillbitContext context, String name)
+  throws ExecutionSetupException {
+logger.debug("Initializing {}", KafkaStoragePlugin.class.getName());
+this.config = config;
+this.context = context;
+this.kafkaSchemaFactory = new KafkaSchemaFactory(this, name);
+  }
+
+  public DrillbitContext getContext() {
+return this.context;
+  }
+
+  @Override
+  public KafkaStoragePluginConfig getConfig() {
+return this.config;
+  }
+
+  @Override
+  public boolean supportsRead() {
+return Boolean.TRUE;
--- End diff --

`true` is plenty good here.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151585930
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaStoragePluginConfig.java
 ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.drill.common.logical.StoragePluginConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName(KafkaStoragePluginConfig.NAME)
+public class KafkaStoragePluginConfig extends StoragePluginConfig {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(KafkaStoragePluginConfig.class);
+  public static final String NAME = "kafka";
+  private Properties kafkaConsumerProps;
+
+  @JsonCreator
+  public KafkaStoragePluginConfig(@JsonProperty("kafkaConsumerProps") 
Map kafkaConsumerProps) {
+this.kafkaConsumerProps = new Properties();
+this.kafkaConsumerProps.putAll(kafkaConsumerProps);
+logger.debug("Kafka Consumer Props {}", this.kafkaConsumerProps);
+  }
+
+  public Properties getKafkaConsumerProps() {
+return kafkaConsumerProps;
+  }
+
+  @Override
+  public int hashCode() {
+final int prime = 31;
+int result = 1;
+result = prime * result + ((kafkaConsumerProps == null) ? 0 : 
kafkaConsumerProps.hashCode());
+return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+if (this == obj) {
+  return true;
+}
+if (obj == null) {
+  return false;
+}
+if (getClass() != obj.getClass()) {
+  return false;
+}
+KafkaStoragePluginConfig other = (KafkaStoragePluginConfig) obj;
+if (kafkaConsumerProps == null) {
+  if (other.kafkaConsumerProps != null) {
+return false;
+  }
+} else if (!kafkaConsumerProps.equals(other.kafkaConsumerProps)) {
+  return false;
+}
--- End diff --

Just a minor point, but:

```
  if (kafkaConsumerProps == null && other.kafkaConsumerProps == null) {
    return true;
  }
  if (kafkaConsumerProps == null || other.kafkaConsumerProps == null) {
    return false;
  }
  return kafkaConsumerProps.equals(other.kafkaConsumerProps);
```
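
As a possible further simplification (not something proposed in the thread), the null handling can be delegated to `java.util.Objects`. The sketch below uses a hypothetical stand-in class, `ConfigEqualitySketch`, rather than the real `KafkaStoragePluginConfig`. Its `hashCode` stays consistent with `equals` but does not produce the same numeric values as the version in the diff.

```java
import java.util.Objects;
import java.util.Properties;

// Hypothetical stand-in for a config class holding a single Properties field.
class ConfigEqualitySketch {
  private final Properties kafkaConsumerProps;

  ConfigEqualitySketch(Properties kafkaConsumerProps) {
    this.kafkaConsumerProps = kafkaConsumerProps;
  }

  @Override
  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }
    if (obj == null || getClass() != obj.getClass()) {
      return false;
    }
    ConfigEqualitySketch other = (ConfigEqualitySketch) obj;
    // Objects.equals is null-safe: true if both are null, false if only one is.
    return Objects.equals(kafkaConsumerProps, other.kafkaConsumerProps);
  }

  @Override
  public int hashCode() {
    // Objects.hashCode returns 0 for null, so no explicit null check is needed.
    return Objects.hashCode(kafkaConsumerProps);
  }

  public static void main(String[] args) {
    Properties a = new Properties();
    a.put("bootstrap.servers", "localhost:9092");
    Properties b = new Properties();
    b.put("bootstrap.servers", "localhost:9092");
    System.out.println(new ConfigEqualitySketch(a).equals(new ConfigEqualitySketch(b)));
  }
}
```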


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151588038
  
--- Diff: 
contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.NoSuchElementException;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MessageIteratorTest extends KafkaTestBase {
+
+  private KafkaConsumer kafkaConsumer;
+  private KafkaSubScanSpec subScanSpec;
+
+  @Before
+  public void setUp() {
+Properties consumerProps = storagePluginConfig.getKafkaConsumerProps();
+consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class);
+consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
ByteArrayDeserializer.class);
+consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "4");
+kafkaConsumer = new KafkaConsumer<>(consumerProps);
+subScanSpec = new KafkaSubScanSpec(QueryConstants.JSON_TOPIC, 0, 0, 
TestKafkaSuit.NUM_JSON_MSG);
+  }
+
+  @After
+  public void cleanUp() {
+if (kafkaConsumer != null) {
+  kafkaConsumer.close();
+}
+  }
+
+  @Test
+  public void testWhenPollTimeOutIsTooLess() {
+MessageIterator iterator = new MessageIterator(kafkaConsumer, 
subScanSpec, 1);
--- End diff --

It's great that you've created unit tests for the message iterator itself, 
independent of full-blown query tests.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151585603
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaSubScan.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.physical.base.SubScan;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+
+@JsonTypeName("kafka-partition-scan")
+public class KafkaSubScan extends AbstractBase implements SubScan {
+
+  @JsonProperty
+  private final KafkaStoragePluginConfig kafkStoragePluginConfig;
+
+  @JsonIgnore
+  private final KafkaStoragePlugin kafkaStoragePlugin;
+  private final List coulmns;
+  private final List partitionSubScanSpecList;
+
+  @JsonCreator
+  public KafkaSubScan(@JacksonInject StoragePluginRegistry registry, 
@JsonProperty("userName") String userName,
+  @JsonProperty("kafkStoragePluginConfig") KafkaStoragePluginConfig 
kafkStoragePluginConfig,
--- End diff --

Spelling: `kafkStoragePluginConfig` should be `kafkaStoragePluginConfig`.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151583298
  
--- Diff: contrib/storage-kafka/src/main/resources/drill-module.conf ---
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+//  This file tells Drill to consider this module when class path scanning.
+//  This file can also include any supplementary configuration information.
+//  This file is in HOCON format, see 
https://github.com/typesafehub/config/blob/master/HOCON.md for more information.
+drill.classpath.scanning: {
+  packages += "org.apache.drill.exec.store.kafka"
+}
+drill.exec: {
+
+  sys.store.provider: {
+kafka : {
+  "kafkaConsumerProps" : "{\"bootstrap.servers\" : \"localhost:9092\"}"
--- End diff --

These appear to be things that users will need to set. If so, our usual 
convention is to create example entries in drill-override-example.conf, along 
with comments that explain the properties.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151587745
  
--- Diff: 
contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java
 ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.drill.exec.store.kafka.cluster.EmbeddedKafkaCluster;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+import org.junit.runners.Suite.SuiteClasses;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+@RunWith(Suite.class)
+@SuiteClasses({ KafkaQueriesTest.class, MessageIteratorTest.class })
+public class TestKafkaSuit {
--- End diff --

Great test. But, I wonder if we can blend this test into the existing Drill 
test frameworks to make it a bit easier to maintain.

We have two test frameworks: `BaseTestQuery` and `ClusterFixture` along 
with its `ClusterTest` test base class.

Can the Kafka cluster support be adapted into the builder/fixture model of 
the `ClusterFixture` tests? See `ExampleTest` for how we set up a Drillbit 
cluster, set properties, and control logging.

Would be great if we could do something like:

```
try (ClusterFixture cluster = fixtureBuilder.build(); ...
   KafkaFixture kafka = kafkaBuilder.build()) {
  // tests here
}
```

This is sparse info. Ping me if you need more details.
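
To make the builder/fixture idea a bit more concrete, here is a minimal, self-contained sketch of the shape such a fixture could take. `KafkaFixture`, its `Builder`, and the topic name are hypothetical stand-ins, not existing Drill or Kafka classes; a real version would start and stop the embedded ZooKeeper and brokers where the comments indicate.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for an embedded Kafka test cluster; not a Drill or Kafka class. */
final class KafkaFixture implements AutoCloseable {
  private final int brokerCount;
  private final List<String> topics;
  private boolean running;

  private KafkaFixture(Builder builder) {
    this.brokerCount = builder.brokerCount;
    this.topics = new ArrayList<>(builder.topics);
    // Assumption: a real fixture would start ZooKeeper and the brokers here.
    this.running = true;
  }

  static Builder builder() {
    return new Builder();
  }

  int brokerCount() {
    return brokerCount;
  }

  List<String> topics() {
    return topics;
  }

  boolean isRunning() {
    return running;
  }

  @Override
  public void close() {
    // Assumption: a real fixture would stop the brokers and clean up temp dirs here.
    running = false;
  }

  /** Collects settings first, so the cluster only starts once build() is called. */
  static final class Builder {
    private int brokerCount = 1;
    private final List<String> topics = new ArrayList<>();

    Builder brokers(int count) {
      if (count < 1) {
        throw new IllegalArgumentException("need at least one broker");
      }
      this.brokerCount = count;
      return this;
    }

    Builder topic(String name) {
      topics.add(name);
      return this;
    }

    KafkaFixture build() {
      return new KafkaFixture(this);
    }
  }
}

final class KafkaFixtureDemo {
  public static void main(String[] args) {
    KafkaFixture escaped;
    // try-with-resources closes the fixture even if a test body throws.
    try (KafkaFixture kafka = KafkaFixture.builder().brokers(3).topic("clickstream").build()) {
      escaped = kafka;
      System.out.println(kafka.brokerCount() + " brokers, topics " + kafka.topics());
    }
    System.out.println("running after close: " + escaped.isRunning());
  }
}
```

Because the fixture is `AutoCloseable`, it can sit next to a `ClusterFixture` in the same try-with-resources header, and resources are closed in reverse order of declaration.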


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-16 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r151587168
  
--- Diff: 
contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedZKQuorum.java
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.cluster;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.drill.exec.store.kafka.QueryConstants;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class EmbeddedZKQuorum implements QueryConstants {
--- End diff --

Drill already has this concept. See `TestDrillbitResilience` for a test 
that starts multiple Drillbits and uses ZK to coordinate.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-12 Thread kameshb
Github user kameshb commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r150435308
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
 ---
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import static 
org.apache.drill.exec.store.kafka.DrillKafkaConfig.DRILL_KAFKA_POLL_TIMEOUT;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.drill.exec.store.kafka.decoders.MessageReader;
+import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
+import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+public class KafkaRecordReader extends AbstractRecordReader {
+  private static final Logger logger = 
LoggerFactory.getLogger(KafkaRecordReader.class);
+  public static final long DEFAULT_MESSAGES_PER_BATCH = 4000;
+
+  private VectorContainerWriter writer;
+  private MessageReader messageReader;
+
+  private boolean unionEnabled;
+  private KafkaConsumer kafkaConsumer;
+  private KafkaStoragePlugin plugin;
+  private KafkaSubScanSpec subScanSpec;
+  private long kafkaPollTimeOut;
+  private long endOffset;
+
+  private long currentOffset;
+  private long totalFetchTime = 0;
+
+  private List partitions;
+  private final boolean enableAllTextMode;
+  private final boolean readNumbersAsDouble;
+
+  private Iterator> messageIter;
+
+  public KafkaRecordReader(KafkaSubScan.KafkaSubScanSpec subScanSpec, 
List projectedColumns,
+  FragmentContext context, KafkaStoragePlugin plugin) {
+setColumns(projectedColumns);
+this.enableAllTextMode = 
context.getOptions().getOption(ExecConstants.KAFKA_ALL_TEXT_MODE).bool_val;
+this.readNumbersAsDouble = context.getOptions()
+
.getOption(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
+this.unionEnabled = 
context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
+this.plugin = plugin;
+this.subScanSpec = subScanSpec;
+this.endOffset = subScanSpec.getEndOffset();
+this.kafkaPollTimeOut = 
Long.valueOf(plugin.getConfig().getDrillKafkaProps().getProperty(DRILL_KAFKA_POLL_TIMEOUT));
+  }
+
+  @Override
+  protected Collection transformColumns(Collection 
projectedColumns) {
+Set transformed = Sets.newLinkedHashSet();
+if (!isStarQuery()) {
+  for (SchemaPath column : projectedColumns) {
+transformed.add(column);
+  }
+} else {
+  transformed.add(Utilities.STAR_COLUMN);
+}
+return transformed;
+  }
+
+  @Override
+  public void setup(OperatorContext context, OutputMutator output) throws 
ExecutionSetupException {
+this.writer = new VectorContainerWriter(output, unionEnabled);

[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-11 Thread akumarb2010
Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r150379534
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
 ---
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.drill.exec.store.kafka.decoders.MessageReader;
+import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+@JsonTypeName("kafka-scan")
+public class KafkaGroupScan extends AbstractGroupScan {
+
+  static final Logger logger = 
LoggerFactory.getLogger(KafkaGroupScan.class);
+  private static final long MSG_SIZE = 1024;
+
+  private KafkaStoragePlugin kafkaStoragePlugin;
+  private KafkaStoragePluginConfig kafkaStoragePluginConfig;
+  private List columns;
+  private KafkaScanSpec kafkaScanSpec;
+
+  private List partitionWorkList;
+  private ListMultimap assignments;
+  private List affinities;
+
+  @JsonCreator
+  public KafkaGroupScan(@JsonProperty("userName") String userName,
+  @JsonProperty("kafkaStoragePluginConfig") KafkaStoragePluginConfig 
kafkaStoragePluginConfig,
+  @JsonProperty("columns") List columns, 
@JsonProperty("scanSpec") KafkaScanSpec scanSpec,
+  @JacksonInject StoragePluginRegistry pluginRegistry) {
+this(userName, kafkaStoragePluginConfig, columns, scanSpec, 
(KafkaStoragePlugin) pluginRegistry);
+  }
+
+  public KafkaGroupScan(KafkaStoragePlugin kafkaStoragePlugin, 
KafkaScanSpec kafkaScanSpec, List columns) {
+super(StringUtils.EMPTY);
+this.kafkaStoragePlugin = kafkaStoragePlugin;
+this.kafkaStoragePluginConfig = (KafkaStoragePluginConfig) 
kafkaStoragePlugin.getConfig();
+this.columns = columns;
+this.kafkaScanSpec = kafkaScanSpec;
+init();
+  }
+
+  public KafkaGroupScan(String userName, KafkaStoragePluginConfig 
kafkaStoragePluginConfig, List columns,
 

[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-11 Thread akumarb2010
Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r150379448
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/DrillKafkaConfig.java
 ---
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+public class DrillKafkaConfig {
+
+  /**
+   * Timeout for fetching messages from Kafka
--- End diff --

This is done


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-11 Thread akumarb2010
Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r150379457
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
 ---
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.drill.exec.store.kafka.decoders.MessageReader;
+import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+@JsonTypeName("kafka-scan")
+public class KafkaGroupScan extends AbstractGroupScan {
+
+  static final Logger logger = 
LoggerFactory.getLogger(KafkaGroupScan.class);
+  private static final long MSG_SIZE = 1024;
+
+  private KafkaStoragePlugin kafkaStoragePlugin;
+  private KafkaStoragePluginConfig kafkaStoragePluginConfig;
+  private List columns;
+  private KafkaScanSpec kafkaScanSpec;
+
+  private List partitionWorkList;
+  private ListMultimap assignments;
+  private List affinities;
+
+  @JsonCreator
+  public KafkaGroupScan(@JsonProperty("userName") String userName,
+  @JsonProperty("kafkaStoragePluginConfig") KafkaStoragePluginConfig 
kafkaStoragePluginConfig,
+  @JsonProperty("columns") List columns, 
@JsonProperty("scanSpec") KafkaScanSpec scanSpec,
+  @JacksonInject StoragePluginRegistry pluginRegistry) {
+this(userName, kafkaStoragePluginConfig, columns, scanSpec, 
(KafkaStoragePlugin) pluginRegistry);
+  }
+
+  public KafkaGroupScan(KafkaStoragePlugin kafkaStoragePlugin, 
KafkaScanSpec kafkaScanSpec, List columns) {
+super(StringUtils.EMPTY);
+this.kafkaStoragePlugin = kafkaStoragePlugin;
+this.kafkaStoragePluginConfig = (KafkaStoragePluginConfig) 
kafkaStoragePlugin.getConfig();
+this.columns = columns;
+this.kafkaScanSpec = kafkaScanSpec;
+init();
+  }
+
+  public KafkaGroupScan(String userName, KafkaStoragePluginConfig 
kafkaStoragePluginConfig, List columns,
 

[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-11 Thread akumarb2010
Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r150379539
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
 ---
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.drill.exec.store.kafka.decoders.MessageReader;
+import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+@JsonTypeName("kafka-scan")
+public class KafkaGroupScan extends AbstractGroupScan {
+
+  static final Logger logger = 
LoggerFactory.getLogger(KafkaGroupScan.class);
+  private static final long MSG_SIZE = 1024;
+
+  private KafkaStoragePlugin kafkaStoragePlugin;
+  private KafkaStoragePluginConfig kafkaStoragePluginConfig;
+  private List columns;
+  private KafkaScanSpec kafkaScanSpec;
+
+  private List partitionWorkList;
+  private ListMultimap assignments;
+  private List affinities;
+
+  @JsonCreator
+  public KafkaGroupScan(@JsonProperty("userName") String userName,
+  @JsonProperty("kafkaStoragePluginConfig") KafkaStoragePluginConfig 
kafkaStoragePluginConfig,
+  @JsonProperty("columns") List columns, 
@JsonProperty("scanSpec") KafkaScanSpec scanSpec,
+  @JacksonInject StoragePluginRegistry pluginRegistry) {
+this(userName, kafkaStoragePluginConfig, columns, scanSpec, 
(KafkaStoragePlugin) pluginRegistry);
+  }
+
+  public KafkaGroupScan(KafkaStoragePlugin kafkaStoragePlugin, 
KafkaScanSpec kafkaScanSpec, List columns) {
+super(StringUtils.EMPTY);
+this.kafkaStoragePlugin = kafkaStoragePlugin;
+this.kafkaStoragePluginConfig = (KafkaStoragePluginConfig) 
kafkaStoragePlugin.getConfig();
+this.columns = columns;
+this.kafkaScanSpec = kafkaScanSpec;
+init();
+  }
+
+  public KafkaGroupScan(String userName, KafkaStoragePluginConfig 
kafkaStoragePluginConfig, List columns,
 

[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-11 Thread akumarb2010
Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r150379549
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
 ---
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import static 
org.apache.drill.exec.store.kafka.DrillKafkaConfig.DRILL_KAFKA_POLL_TIMEOUT;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.drill.exec.store.kafka.decoders.MessageReader;
+import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
+import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+public class KafkaRecordReader extends AbstractRecordReader {
+  private static final Logger logger = 
LoggerFactory.getLogger(KafkaRecordReader.class);
+  public static final long DEFAULT_MESSAGES_PER_BATCH = 4000;
+
+  private VectorContainerWriter writer;
+  private MessageReader messageReader;
+
+  private boolean unionEnabled;
+  private KafkaConsumer kafkaConsumer;
+  private KafkaStoragePlugin plugin;
+  private KafkaSubScanSpec subScanSpec;
+  private long kafkaPollTimeOut;
+  private long endOffset;
+
+  private long currentOffset;
+  private long totalFetchTime = 0;
+
+  private List partitions;
+  private final boolean enableAllTextMode;
+  private final boolean readNumbersAsDouble;
+
+  private Iterator> messageIter;
+
+  public KafkaRecordReader(KafkaSubScan.KafkaSubScanSpec subScanSpec, 
List projectedColumns,
+  FragmentContext context, KafkaStoragePlugin plugin) {
+setColumns(projectedColumns);
+this.enableAllTextMode = 
context.getOptions().getOption(ExecConstants.KAFKA_ALL_TEXT_MODE).bool_val;
+this.readNumbersAsDouble = context.getOptions()
+
.getOption(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
+this.unionEnabled = 
context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
+this.plugin = plugin;
+this.subScanSpec = subScanSpec;
+this.endOffset = subScanSpec.getEndOffset();
+this.kafkaPollTimeOut = 
Long.valueOf(plugin.getConfig().getDrillKafkaProps().getProperty(DRILL_KAFKA_POLL_TIMEOUT));
+  }
+
+  @Override
+  protected Collection transformColumns(Collection 
projectedColumns) {
--- End diff --

We verified select count(*) queries and they are working fine.

While debugging, we observed the following:
1) In the case of the select count(*) from kafka.`clickstream-json-demo` query, 
results are handled by DefaultSQLQueryHandler without initiating 
KafkaScanBatchCreator and KafkaRecordReader.

2) In the case of the select count(*) from kafka.`clickstream-json-demo` where  
kafkaMsgOffset < 2765200; query, it's considering projectedColumns as 
[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-11 Thread akumarb2010
Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r150379428
  
--- Diff: contrib/storage-kafka/pom.xml ---
@@ -0,0 +1,130 @@
+
+
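+<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd"
+   xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+   <modelVersion>4.0.0</modelVersion>
+
+   <parent>
+   <artifactId>drill-contrib-parent</artifactId>
+   <groupId>org.apache.drill.contrib</groupId>
+   <version>1.12.0-SNAPSHOT</version>
+   </parent>
+
+   <artifactId>drill-storage-kafka</artifactId>
+   <name>contrib/kafka-storage-plugin</name>
+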
+   
+   
UTF-8
--- End diff --

This is taken care of.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-11 Thread akumarb2010
Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r150379256
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
 ---
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import static 
org.apache.drill.exec.store.kafka.DrillKafkaConfig.DRILL_KAFKA_POLL_TIMEOUT;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.drill.exec.store.kafka.decoders.MessageReader;
+import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
+import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+public class KafkaRecordReader extends AbstractRecordReader {
--- End diff --

Taken care of.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-11 Thread akumarb2010
Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r150379243
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/DrillKafkaConfig.java
 ---
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+public class DrillKafkaConfig {
--- End diff --

This class has been removed as part of converting these configs into system options.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-11 Thread akumarb2010
Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r150379142
  
--- Diff: contrib/storage-kafka/pom.xml ---
@@ -0,0 +1,130 @@
+
+
+http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;
+  xmlns="http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;>
+  4.0.0
+
+  
+drill-contrib-parent
+org.apache.drill.contrib
+1.12.0-SNAPSHOT
+  
+
+  drill-storage-kafka
+  contrib/kafka-storage-plugin
+
+  
+UTF-8
--- End diff --

Taken care of.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-11 Thread akumarb2010
Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r150379209
  
--- Diff: contrib/storage-kafka/pom.xml ---
@@ -0,0 +1,130 @@
+
+
+http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;
+  xmlns="http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;>
+  4.0.0
+
+  
+drill-contrib-parent
+org.apache.drill.contrib
+1.12.0-SNAPSHOT
+  
+
+  drill-storage-kafka
+  contrib/kafka-storage-plugin
+
+  
+UTF-8
+0.11.0.1
+**/KafkaTestSuit.class
--- End diff --

Taken care of.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-11 Thread akumarb2010
Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r150379252
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
 ---
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.drill.exec.store.kafka.decoders.MessageReader;
+import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+@JsonTypeName("kafka-scan")
+public class KafkaGroupScan extends AbstractGroupScan {
+
+  static final Logger logger = 
LoggerFactory.getLogger(KafkaGroupScan.class);
+  private static final long MSG_SIZE = 1024;
--- End diff --

Added comment


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-11 Thread akumarb2010
Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r150376627
  
--- Diff: contrib/storage-kafka/pom.xml ---
@@ -0,0 +1,130 @@
+
+
+http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;
+  xmlns="http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;>
+  4.0.0
+
+  
+drill-contrib-parent
+org.apache.drill.contrib
+1.12.0-SNAPSHOT
+  
+
+  drill-storage-kafka
+  contrib/kafka-storage-plugin
+
+  
+UTF-8
+0.11.0.1
+**/KafkaTestSuit.class
+  
+
+  
+
+  
+org.apache.maven.plugins
+maven-surefire-plugin
+
+  
+${kafka.TestSuite}
+  
+  
+**/TestKafkaQueries.java
+  
+  
+
+  logback.log.dir
+  ${project.build.directory}/surefire-reports
+
+  
+
+  
+
+  
+
+  
+
+  org.apache.drill.exec
+  drill-java-exec
+  ${project.version}
+  
+
--- End diff --

This is taken care of.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-10 Thread akumarb2010
Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r150376586
  
--- Diff: 
contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedZKQuorum.java
 ---
@@ -0,0 +1,83 @@
+/**
--- End diff --

This is taken care of.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-10 Thread akumarb2010
Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r150376174
  
--- Diff: contrib/storage-kafka/README.md ---
@@ -0,0 +1,230 @@
+# Drill Kafka Plugin
+
+Drill kafka storage plugin allows you to perform interactive analysis 
using SQL against Apache Kafka.
+
+Supported Kafka Version
+Kafka-0.10 and above 
+
+Message Formats
+Currently this plugin supports reading only Kafka messages of type 
JSON.
+
+
+Message Readers
+Message Readers are used for reading messages from Kafka. Type of the 
MessageReaders supported as of now are
+
+
+  
+MessageReader
+Description
+Key DeSerializer 
+Value DeSerializer
+  
+  
+JsonMessageReader
+To read Json messages
+org.apache.kafka.common.serialization.ByteArrayDeserializer 
+org.apache.kafka.common.serialization.ByteArrayDeserializer
+  
+
+
+
+Plugin Configurations
+Drill Kafka plugin supports following properties
+
+   kafkaConsumerProps: These are typical Kafka consumer properties 
(https://kafka.apache.org/documentation/#consumerconfigs).
+drillKafkaProps: These are Drill Kafka plugin 
properties. As of now, it supports the following properties
+   
+drill.kafka.message.reader: Message Reader 
implementation to use while reading messages from Kafka. Message reader 
implementaion should be configured based on message format. Type of message 
readers
+ 
+ org.apache.drill.exec.store.kafka.decoders.JsonMessageReader
+ 
+
+drill.kafka.poll.timeout: Polling timeout used by 
Kafka client while fetching messages from Kafka cluster.
+
+
+
+
+Plugin Registration
+To register the kafka plugin, open the drill web interface. To open the 
drill web interface, enter http://drillbit:8047/storage in 
your browser.
+
+The following is an example plugin registration configuration
+
+{
+  "type": "kafka",
+  "kafkaConsumerProps": {
+"key.deserializer": 
"org.apache.kafka.common.serialization.ByteArrayDeserializer",
+"auto.offset.reset": "earliest",
+"bootstrap.servers": "localhost:9092",
+"enable.auto.commit": "true",
+"group.id": "drill-query-consumer-1",
+"value.deserializer": 
"org.apache.kafka.common.serialization.ByteArrayDeserializer",
+"session.timeout.ms": "3"
+  },
+  "drillKafkaProps": {
+"drill.kafka.message.reader": 
"org.apache.drill.exec.store.kafka.decoders.JsonMessageReader",
+"drill.kafka.poll.timeout": "2000"
+  },
+  "enabled": true
+}
+
+
+ Abstraction 
+In Drill, each Kafka topic is mapped to a SQL table and when a query is 
issued on a table, it scans all the messages from the earliest offset to the 
latest offset of that topic at that point of time. This plugin automatically 
discovers all the topics (tables), to allow you perform analysis without 
executing DDL statements.
--- End diff --

This is a very valid point, Paul. The only issue is that in other storage 
plugins, like Mongo, we are able to push these predicates down to the storage 
layer as filters, since they support predicate push down.

But in the case of Kafka, we cannot push down these filters. To achieve this, 
we can create a specific KafkaSubScanSpec for the query range by parsing the 
predicates on kafkaMsgOffset. But this needs some time for development and 
testing. Is it OK if we create a separate JIRA for this and release it in the 
next version?
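
To make the idea of deriving a scan range from predicates on kafkaMsgOffset concrete, here is a minimal sketch. It is not the plugin's code: `OffsetPredicate`, `OffsetRange` and `narrow` are hypothetical names, and the sketch assumes the end offset is exclusive (the next offset to be written), which the thread does not state.

```java
import java.util.Arrays;
import java.util.List;

public class OffsetRangeSketch {

  /** Hypothetical stand-in for one comparison on kafkaMsgOffset, e.g. "kafkaMsgOffset < 2765200". */
  static final class OffsetPredicate {
    final String op;   // one of "<", "<=", ">", ">=", "="
    final long value;

    OffsetPredicate(String op, long value) {
      this.op = op;
      this.value = value;
    }
  }

  /** Half-open range [start, end); assumed to mirror the start/end offsets of a sub-scan spec. */
  static final class OffsetRange {
    final long start;
    final long end;

    OffsetRange(long start, long end) {
      this.start = start;
      this.end = end;
    }

    boolean isEmpty() {
      return start >= end;
    }
  }

  /** Intersects the full partition range with every predicate (predicates are ANDed). */
  static OffsetRange narrow(OffsetRange full, List<OffsetPredicate> predicates) {
    long start = full.start;
    long end = full.end;
    for (OffsetPredicate p : predicates) {
      switch (p.op) {
        case "<":
          end = Math.min(end, p.value);
          break;
        case "<=":
          end = Math.min(end, p.value + 1);
          break;
        case ">":
          start = Math.max(start, p.value + 1);
          break;
        case ">=":
          start = Math.max(start, p.value);
          break;
        case "=":
          start = Math.max(start, p.value);
          end = Math.min(end, p.value + 1);
          break;
        default:
          throw new IllegalArgumentException("Unsupported operator: " + p.op);
      }
    }
    // Contradictory predicates collapse to an empty range rather than a negative one.
    return new OffsetRange(start, Math.max(start, end));
  }

  public static void main(String[] args) {
    OffsetRange full = new OffsetRange(0L, 5_000_000L);
    OffsetRange narrowed = narrow(full, Arrays.asList(new OffsetPredicate("<", 2_765_200L)));
    System.out.println("scan offsets [" + narrowed.start + ", " + narrowed.end + ")");
  }
}
```

The reader would then seek to `start` and stop at `end`, so the filter never has to be pushed to the Kafka brokers. Overflow at `Long.MAX_VALUE` is ignored here for brevity.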


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-09 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r150087815
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
 ---
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public class KafkaScanBatchCreator implements BatchCreator {
+  static final Logger logger = 
LoggerFactory.getLogger(KafkaScanBatchCreator.class);
+
+  @Override
+  public CloseableRecordBatch getBatch(FragmentContext context, 
KafkaSubScan subScan, List children)
+  throws ExecutionSetupException {
+Preconditions.checkArgument(children.isEmpty());
+List readers = Lists.newArrayList();
+List columns = null;
+for (KafkaSubScan.KafkaSubScanSpec scanSpec : 
subScan.getPartitionSubScanSpecList()) {
+  try {
+if ((columns = subScan.getCoulmns()) == null) {
+  columns = GroupScan.ALL_COLUMNS;
+}
--- End diff --

When will the columns be null? Not sure this is a valid state. However, as 
noted above, an empty list is a valid state (used for `COUNT(*)` queries.)


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-09 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r150087650
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
 ---
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public class KafkaScanBatchCreator implements BatchCreator {
+  static final Logger logger = 
LoggerFactory.getLogger(KafkaScanBatchCreator.class);
+
+  @Override
+  public CloseableRecordBatch getBatch(FragmentContext context, 
KafkaSubScan subScan, List children)
+  throws ExecutionSetupException {
+Preconditions.checkArgument(children.isEmpty());
+List readers = Lists.newArrayList();
+List columns = null;
+for (KafkaSubScan.KafkaSubScanSpec scanSpec : 
subScan.getPartitionSubScanSpecList()) {
+  try {
+if ((columns = subScan.getCoulmns()) == null) {
+  columns = GroupScan.ALL_COLUMNS;
+}
--- End diff --

The column list can be shared by all readers, and so can be created outside 
of the loop over scan specs.
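
A minimal sketch of that restructuring, using plain strings as hypothetical stand-ins for `SchemaPath`, `KafkaSubScanSpec` and `KafkaRecordReader` so that it runs on its own. `ALL_COLUMNS` below only imitates `GroupScan.ALL_COLUMNS`; the real constant may be defined differently.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SharedColumnsSketch {

  // Hypothetical stand-in for GroupScan.ALL_COLUMNS.
  static final List<String> ALL_COLUMNS = Collections.singletonList("*");

  /** Hypothetical stand-in for a record reader; it only remembers what it was given. */
  static final class Reader {
    final String scanSpec;
    final List<String> columns;

    Reader(String scanSpec, List<String> columns) {
      this.scanSpec = scanSpec;
      this.columns = columns;
    }
  }

  /** Only a missing (null) projection falls back to all columns; an empty list is kept as is. */
  static List<String> resolveColumns(List<String> projected) {
    return projected == null ? ALL_COLUMNS : projected;
  }

  static List<Reader> createReaders(List<String> scanSpecs, List<String> projected) {
    // Resolved once, before the loop, and shared by every reader.
    List<String> columns = resolveColumns(projected);
    List<Reader> readers = new ArrayList<>();
    for (String scanSpec : scanSpecs) {
      readers.add(new Reader(scanSpec, columns));
    }
    return readers;
  }

  public static void main(String[] args) {
    List<String> specs = new ArrayList<>();
    specs.add("topic-0");
    specs.add("topic-1");
    List<Reader> readers = createReaders(specs, null);
    System.out.println(readers.size() + " readers share columns " + readers.get(0).columns);
  }
}
```

Keeping the null check separate from the empty-list case also preserves the empty projection that the neighbouring review comment describes for `COUNT(*)` queries.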


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-09 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r150087581
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
 ---
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public class KafkaScanBatchCreator implements BatchCreator {
+  static final Logger logger = 
LoggerFactory.getLogger(KafkaScanBatchCreator.class);
+
+  @Override
+  public CloseableRecordBatch getBatch(FragmentContext context, 
KafkaSubScan subScan, List children)
+  throws ExecutionSetupException {
+Preconditions.checkArgument(children.isEmpty());
+List readers = Lists.newArrayList();
+List columns = null;
+for (KafkaSubScan.KafkaSubScanSpec scanSpec : 
subScan.getPartitionSubScanSpecList()) {
+  try {
+if ((columns = subScan.getCoulmns()) == null) {
+  columns = GroupScan.ALL_COLUMNS;
+}
--- End diff --

`getCoulmns()` --> `getColumns()`


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-09 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r150088237
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaScanBatchCreator.java
 ---
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.List;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.record.CloseableRecordBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.RecordReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public class KafkaScanBatchCreator implements BatchCreator {
+  static final Logger logger = 
LoggerFactory.getLogger(KafkaScanBatchCreator.class);
+
+  @Override
+  public CloseableRecordBatch getBatch(FragmentContext context, 
KafkaSubScan subScan, List children)
+  throws ExecutionSetupException {
+Preconditions.checkArgument(children.isEmpty());
+List readers = Lists.newArrayList();
+List columns = null;
+for (KafkaSubScan.KafkaSubScanSpec scanSpec : 
subScan.getPartitionSubScanSpecList()) {
+  try {
+if ((columns = subScan.getCoulmns()) == null) {
+  columns = GroupScan.ALL_COLUMNS;
+}
+readers.add(new KafkaRecordReader(scanSpec, columns, context, 
subScan.getKafkaStoragePlugin()));
+  } catch (Exception e) {
+logger.error("KafkaRecordReader creation failed for subScan:  " + 
subScan + ".",e);
--- End diff --

Here we are catching all errors, putting a generic message into the 
log, and sending a generic exception up the stack. It is better to use a 
`UserException` at the actual point of failure so we can tell the user exactly 
what is wrong. Then, here, have a `catch` block for `UserException` that simply 
rethrows, while handling all other exceptions as is done here.
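
The catch-and-rethrow shape suggested here could look roughly like the sketch below. To keep it self-contained, `UserFacingException` and `SetupException` are hypothetical stand-ins for Drill's `UserException` and `ExecutionSetupException`, and `createReader` simulates the reader constructor; none of these names come from the pull request.

```java
public class RethrowSketch {

  /** Hypothetical stand-in for UserException: unchecked, carries a message meant for the user. */
  static class UserFacingException extends RuntimeException {
    UserFacingException(String message) {
      super(message);
    }
  }

  /** Hypothetical stand-in for ExecutionSetupException: the generic wrapper. */
  static class SetupException extends Exception {
    SetupException(String message, Throwable cause) {
      super(message, cause);
    }
  }

  /** Simulated reader creation that fails in two different ways. */
  static String createReader(String scanSpec) {
    if (scanSpec.isEmpty()) {
      // Raised at the actual point of failure, with a precise message.
      throw new UserFacingException("No topic partition was given for the Kafka scan");
    }
    if (scanSpec.startsWith("bad")) {
      throw new IllegalStateException("simulated internal failure");
    }
    return "reader:" + scanSpec;
  }

  static String createReaderSafely(String scanSpec) throws SetupException {
    try {
      return createReader(scanSpec);
    } catch (UserFacingException e) {
      // Already explains the problem to the user; pass it through untouched.
      throw e;
    } catch (Exception e) {
      // Everything else is still wrapped in the generic exception, as before.
      throw new SetupException("Reader creation failed for scan spec: " + scanSpec, e);
    }
  }

  public static void main(String[] args) throws SetupException {
    System.out.println(createReaderSafely("clickstream-0"));
  }
}
```

The order of the `catch` blocks matters: the specific block must come first, otherwise the generic one would wrap the user-facing exception and hide its message.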


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-09 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r150086292
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
 ---
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import static 
org.apache.drill.exec.store.kafka.DrillKafkaConfig.DRILL_KAFKA_POLL_TIMEOUT;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.drill.exec.store.kafka.decoders.MessageReader;
+import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
+import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+public class KafkaRecordReader extends AbstractRecordReader {
+  private static final Logger logger = 
LoggerFactory.getLogger(KafkaRecordReader.class);
+  public static final long DEFAULT_MESSAGES_PER_BATCH = 4000;
+
+  private VectorContainerWriter writer;
+  private MessageReader messageReader;
+
+  private boolean unionEnabled;
+  private KafkaConsumer kafkaConsumer;
+  private KafkaStoragePlugin plugin;
+  private KafkaSubScanSpec subScanSpec;
+  private long kafkaPollTimeOut;
+  private long endOffset;
+
+  private long currentOffset;
+  private long totalFetchTime = 0;
+
+  private List partitions;
+  private final boolean enableAllTextMode;
+  private final boolean readNumbersAsDouble;
+
+  private Iterator> messageIter;
+
+  public KafkaRecordReader(KafkaSubScan.KafkaSubScanSpec subScanSpec, 
List projectedColumns,
+  FragmentContext context, KafkaStoragePlugin plugin) {
+setColumns(projectedColumns);
+this.enableAllTextMode = 
context.getOptions().getOption(ExecConstants.KAFKA_ALL_TEXT_MODE).bool_val;
+this.readNumbersAsDouble = context.getOptions()
+
.getOption(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
+this.unionEnabled = 
context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
+this.plugin = plugin;
+this.subScanSpec = subScanSpec;
+this.endOffset = subScanSpec.getEndOffset();
+this.kafkaPollTimeOut = 
Long.valueOf(plugin.getConfig().getDrillKafkaProps().getProperty(DRILL_KAFKA_POLL_TIMEOUT));
+  }
+
+  @Override
+  protected Collection transformColumns(Collection 
projectedColumns) {
+Set transformed = Sets.newLinkedHashSet();
+if (!isStarQuery()) {
+  for (SchemaPath column : projectedColumns) {
+transformed.add(column);
+  }
+} else {
+  transformed.add(Utilities.STAR_COLUMN);
+}
+return transformed;
+  }
+
+  @Override
+  public void setup(OperatorContext context, OutputMutator output) throws 
ExecutionSetupException {
+this.writer = new VectorContainerWriter(output, 

[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-09 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r150084784
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
 ---
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import static 
org.apache.drill.exec.store.kafka.DrillKafkaConfig.DRILL_KAFKA_POLL_TIMEOUT;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.drill.exec.store.kafka.decoders.MessageReader;
+import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
+import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+public class KafkaRecordReader extends AbstractRecordReader {
+  private static final Logger logger = 
LoggerFactory.getLogger(KafkaRecordReader.class);
+  public static final long DEFAULT_MESSAGES_PER_BATCH = 4000;
+
+  private VectorContainerWriter writer;
+  private MessageReader messageReader;
+
+  private boolean unionEnabled;
+  private KafkaConsumer kafkaConsumer;
+  private KafkaStoragePlugin plugin;
+  private KafkaSubScanSpec subScanSpec;
+  private long kafkaPollTimeOut;
+  private long endOffset;
+
+  private long currentOffset;
+  private long totalFetchTime = 0;
+
+  private List partitions;
+  private final boolean enableAllTextMode;
+  private final boolean readNumbersAsDouble;
+
+  private Iterator> messageIter;
+
+  public KafkaRecordReader(KafkaSubScan.KafkaSubScanSpec subScanSpec, 
List projectedColumns,
+  FragmentContext context, KafkaStoragePlugin plugin) {
+setColumns(projectedColumns);
+this.enableAllTextMode = 
context.getOptions().getOption(ExecConstants.KAFKA_ALL_TEXT_MODE).bool_val;
+this.readNumbersAsDouble = context.getOptions()
+
.getOption(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
+this.unionEnabled = 
context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
+this.plugin = plugin;
+this.subScanSpec = subScanSpec;
+this.endOffset = subScanSpec.getEndOffset();
+this.kafkaPollTimeOut = 
Long.valueOf(plugin.getConfig().getDrillKafkaProps().getProperty(DRILL_KAFKA_POLL_TIMEOUT));
+  }
+
+  @Override
+  protected Collection transformColumns(Collection 
projectedColumns) {
+Set transformed = Sets.newLinkedHashSet();
+if (!isStarQuery()) {
+  for (SchemaPath column : projectedColumns) {
+transformed.add(column);
+  }
+} else {
+  transformed.add(Utilities.STAR_COLUMN);
+}
+return transformed;
+  }
+
+  @Override
+  public void setup(OperatorContext context, OutputMutator output) throws 
ExecutionSetupException {
+this.writer = new VectorContainerWriter(output, 

[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-09 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r150081972
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
 ---
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import static 
org.apache.drill.exec.store.kafka.DrillKafkaConfig.DRILL_KAFKA_POLL_TIMEOUT;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.drill.exec.store.kafka.decoders.MessageReader;
+import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
+import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+public class KafkaRecordReader extends AbstractRecordReader {
+  private static final Logger logger = 
LoggerFactory.getLogger(KafkaRecordReader.class);
+  public static final long DEFAULT_MESSAGES_PER_BATCH = 4000;
+
+  private VectorContainerWriter writer;
+  private MessageReader messageReader;
+
+  private boolean unionEnabled;
+  private KafkaConsumer kafkaConsumer;
+  private KafkaStoragePlugin plugin;
+  private KafkaSubScanSpec subScanSpec;
+  private long kafkaPollTimeOut;
+  private long endOffset;
+
+  private long currentOffset;
+  private long totalFetchTime = 0;
+
+  private List partitions;
+  private final boolean enableAllTextMode;
+  private final boolean readNumbersAsDouble;
+
+  private Iterator> messageIter;
+
+  public KafkaRecordReader(KafkaSubScan.KafkaSubScanSpec subScanSpec, 
List projectedColumns,
+  FragmentContext context, KafkaStoragePlugin plugin) {
+setColumns(projectedColumns);
+this.enableAllTextMode = 
context.getOptions().getOption(ExecConstants.KAFKA_ALL_TEXT_MODE).bool_val;
+this.readNumbersAsDouble = context.getOptions()
+
.getOption(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
+this.unionEnabled = 
context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
+this.plugin = plugin;
+this.subScanSpec = subScanSpec;
+this.endOffset = subScanSpec.getEndOffset();
+this.kafkaPollTimeOut = 
Long.valueOf(plugin.getConfig().getDrillKafkaProps().getProperty(DRILL_KAFKA_POLL_TIMEOUT));
+  }
+
+  @Override
+  protected Collection transformColumns(Collection 
projectedColumns) {
--- End diff --

Preparing columns is quite difficult, with many cases to handle. 
Drill appears to allow projection of the form `a.b`, `a.c`, which means that 
`a` is a map and we wish to project just `b` and `c` from `a`.
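
To make the `a.b`, `a.c` case concrete, here is a minimal sketch in plain Java. It is not the reader's code and does not use Drill's `SchemaPath` API; the class `ProjectionSketch` and the method `groupByRoot` are hypothetical names, and columns are represented as plain dotted strings purely for illustration. It groups projected columns by their root so that a reader could tell which members of a map are wanted.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; Drill's real projection handling works on SchemaPath.
public class ProjectionSketch {

  // Groups dotted column references by their root name, preserving order.
  // A root with no members was requested as a whole column.
  // Not handled here: projecting both `a` and `a.b` in the same query.
  public static Map<String, Set<String>> groupByRoot(List<String> columns) {
    Map<String, Set<String>> byRoot = new LinkedHashMap<>();
    for (String column : columns) {
      int dot = column.indexOf('.');
      String root = dot < 0 ? column : column.substring(0, dot);
      Set<String> members = byRoot.computeIfAbsent(root, k -> new LinkedHashSet<>());
      if (dot >= 0) {
        members.add(column.substring(dot + 1));
      }
    }
    return byRoot;
  }

  public static void main(String[] args) {
    // `a` is treated as a map with members `b` and `c`; `d` is a plain column.
    System.out.println(groupByRoot(Arrays.asList("a.b", "a.c", "d")));
  }
}
```

Even this small version leaves out cases such as array indexes and a column that is projected both whole and by member, which is part of why per-reader projection logic is hard to get right.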

As it turns out, there is an active project to replace the ad-hoc 
projection logic in each reader with a common, complete implementation. That 
work may be ready by Drill 1.13, so whatever is done here is temporary.

Finally, the code checks for 

[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-09 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r150084335
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
 ---
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import static 
org.apache.drill.exec.store.kafka.DrillKafkaConfig.DRILL_KAFKA_POLL_TIMEOUT;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.drill.exec.store.kafka.decoders.MessageReader;
+import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
+import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+public class KafkaRecordReader extends AbstractRecordReader {
+  private static final Logger logger = 
LoggerFactory.getLogger(KafkaRecordReader.class);
+  public static final long DEFAULT_MESSAGES_PER_BATCH = 4000;
+
+  private VectorContainerWriter writer;
+  private MessageReader messageReader;
+
+  private boolean unionEnabled;
+  private KafkaConsumer kafkaConsumer;
+  private KafkaStoragePlugin plugin;
+  private KafkaSubScanSpec subScanSpec;
+  private long kafkaPollTimeOut;
+  private long endOffset;
+
+  private long currentOffset;
+  private long totalFetchTime = 0;
+
+  private List partitions;
+  private final boolean enableAllTextMode;
+  private final boolean readNumbersAsDouble;
+
+  private Iterator> messageIter;
+
+  public KafkaRecordReader(KafkaSubScan.KafkaSubScanSpec subScanSpec, 
List projectedColumns,
+  FragmentContext context, KafkaStoragePlugin plugin) {
+setColumns(projectedColumns);
+this.enableAllTextMode = 
context.getOptions().getOption(ExecConstants.KAFKA_ALL_TEXT_MODE).bool_val;
+this.readNumbersAsDouble = context.getOptions()
+
.getOption(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
+this.unionEnabled = 
context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
+this.plugin = plugin;
+this.subScanSpec = subScanSpec;
+this.endOffset = subScanSpec.getEndOffset();
+this.kafkaPollTimeOut = 
Long.valueOf(plugin.getConfig().getDrillKafkaProps().getProperty(DRILL_KAFKA_POLL_TIMEOUT));
+  }
+
+  @Override
+  protected Collection transformColumns(Collection 
projectedColumns) {
+Set transformed = Sets.newLinkedHashSet();
+if (!isStarQuery()) {
+  for (SchemaPath column : projectedColumns) {
+transformed.add(column);
+  }
+} else {
+  transformed.add(Utilities.STAR_COLUMN);
+}
+return transformed;
+  }
+
+  @Override
+  public void setup(OperatorContext context, OutputMutator output) throws 
ExecutionSetupException {
+this.writer = new VectorContainerWriter(output, 

[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-09 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r150086335
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
 ---
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import static 
org.apache.drill.exec.store.kafka.DrillKafkaConfig.DRILL_KAFKA_POLL_TIMEOUT;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.drill.exec.store.kafka.decoders.MessageReader;
+import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
+import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+public class KafkaRecordReader extends AbstractRecordReader {
+  private static final Logger logger = 
LoggerFactory.getLogger(KafkaRecordReader.class);
+  public static final long DEFAULT_MESSAGES_PER_BATCH = 4000;
+
+  private VectorContainerWriter writer;
+  private MessageReader messageReader;
+
+  private boolean unionEnabled;
+  private KafkaConsumer kafkaConsumer;
+  private KafkaStoragePlugin plugin;
+  private KafkaSubScanSpec subScanSpec;
+  private long kafkaPollTimeOut;
+  private long endOffset;
+
+  private long currentOffset;
+  private long totalFetchTime = 0;
+
+  private List partitions;
+  private final boolean enableAllTextMode;
+  private final boolean readNumbersAsDouble;
+
+  private Iterator> messageIter;
+
+  public KafkaRecordReader(KafkaSubScan.KafkaSubScanSpec subScanSpec, 
List projectedColumns,
+  FragmentContext context, KafkaStoragePlugin plugin) {
+setColumns(projectedColumns);
+this.enableAllTextMode = 
context.getOptions().getOption(ExecConstants.KAFKA_ALL_TEXT_MODE).bool_val;
+this.readNumbersAsDouble = context.getOptions()
+
.getOption(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
+this.unionEnabled = 
context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
+this.plugin = plugin;
+this.subScanSpec = subScanSpec;
+this.endOffset = subScanSpec.getEndOffset();
+this.kafkaPollTimeOut = 
Long.valueOf(plugin.getConfig().getDrillKafkaProps().getProperty(DRILL_KAFKA_POLL_TIMEOUT));
+  }
+
+  @Override
+  protected Collection transformColumns(Collection 
projectedColumns) {
+Set transformed = Sets.newLinkedHashSet();
+if (!isStarQuery()) {
+  for (SchemaPath column : projectedColumns) {
+transformed.add(column);
+  }
+} else {
+  transformed.add(Utilities.STAR_COLUMN);
+}
+return transformed;
+  }
+
+  @Override
+  public void setup(OperatorContext context, OutputMutator output) throws 
ExecutionSetupException {
+this.writer = new VectorContainerWriter(output, 

[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-09 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r150087367
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
 ---
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import static 
org.apache.drill.exec.store.kafka.DrillKafkaConfig.DRILL_KAFKA_POLL_TIMEOUT;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.drill.exec.store.kafka.decoders.MessageReader;
+import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
+import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+public class KafkaRecordReader extends AbstractRecordReader {
+  private static final Logger logger = 
LoggerFactory.getLogger(KafkaRecordReader.class);
+  public static final long DEFAULT_MESSAGES_PER_BATCH = 4000;
+
+  private VectorContainerWriter writer;
+  private MessageReader messageReader;
+
+  private boolean unionEnabled;
+  private KafkaConsumer kafkaConsumer;
+  private KafkaStoragePlugin plugin;
+  private KafkaSubScanSpec subScanSpec;
+  private long kafkaPollTimeOut;
+  private long endOffset;
+
+  private long currentOffset;
+  private long totalFetchTime = 0;
+
+  private List partitions;
+  private final boolean enableAllTextMode;
+  private final boolean readNumbersAsDouble;
+
+  private Iterator> messageIter;
+
+  public KafkaRecordReader(KafkaSubScan.KafkaSubScanSpec subScanSpec, 
List projectedColumns,
+  FragmentContext context, KafkaStoragePlugin plugin) {
+setColumns(projectedColumns);
+this.enableAllTextMode = 
context.getOptions().getOption(ExecConstants.KAFKA_ALL_TEXT_MODE).bool_val;
+this.readNumbersAsDouble = context.getOptions()
+
.getOption(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
+this.unionEnabled = 
context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
+this.plugin = plugin;
+this.subScanSpec = subScanSpec;
+this.endOffset = subScanSpec.getEndOffset();
+this.kafkaPollTimeOut = 
Long.valueOf(plugin.getConfig().getDrillKafkaProps().getProperty(DRILL_KAFKA_POLL_TIMEOUT));
+  }
+
+  @Override
+  protected Collection transformColumns(Collection 
projectedColumns) {
+Set transformed = Sets.newLinkedHashSet();
+if (!isStarQuery()) {
+  for (SchemaPath column : projectedColumns) {
+transformed.add(column);
+  }
+} else {
+  transformed.add(Utilities.STAR_COLUMN);
+}
+return transformed;
+  }
+
+  @Override
+  public void setup(OperatorContext context, OutputMutator output) throws 
ExecutionSetupException {
+this.writer = new VectorContainerWriter(output, 

[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-09 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r150082711
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
 ---
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import static 
org.apache.drill.exec.store.kafka.DrillKafkaConfig.DRILL_KAFKA_POLL_TIMEOUT;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.drill.exec.store.kafka.decoders.MessageReader;
+import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
+import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+public class KafkaRecordReader extends AbstractRecordReader {
+  private static final Logger logger = 
LoggerFactory.getLogger(KafkaRecordReader.class);
+  public static final long DEFAULT_MESSAGES_PER_BATCH = 4000;
+
+  private VectorContainerWriter writer;
+  private MessageReader messageReader;
+
+  private boolean unionEnabled;
+  private KafkaConsumer kafkaConsumer;
+  private KafkaStoragePlugin plugin;
+  private KafkaSubScanSpec subScanSpec;
+  private long kafkaPollTimeOut;
+  private long endOffset;
+
+  private long currentOffset;
+  private long totalFetchTime = 0;
+
+  private List partitions;
+  private final boolean enableAllTextMode;
+  private final boolean readNumbersAsDouble;
+
+  private Iterator> messageIter;
+
+  public KafkaRecordReader(KafkaSubScan.KafkaSubScanSpec subScanSpec, 
List projectedColumns,
+  FragmentContext context, KafkaStoragePlugin plugin) {
+setColumns(projectedColumns);
+this.enableAllTextMode = 
context.getOptions().getOption(ExecConstants.KAFKA_ALL_TEXT_MODE).bool_val;
+this.readNumbersAsDouble = context.getOptions()
+
.getOption(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
+this.unionEnabled = 
context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
+this.plugin = plugin;
+this.subScanSpec = subScanSpec;
+this.endOffset = subScanSpec.getEndOffset();
+this.kafkaPollTimeOut = 
Long.valueOf(plugin.getConfig().getDrillKafkaProps().getProperty(DRILL_KAFKA_POLL_TIMEOUT));
+  }
+
+  @Override
+  protected Collection transformColumns(Collection 
projectedColumns) {
+Set transformed = Sets.newLinkedHashSet();
+if (!isStarQuery()) {
+  for (SchemaPath column : projectedColumns) {
+transformed.add(column);
+  }
+} else {
+  transformed.add(Utilities.STAR_COLUMN);
+}
+return transformed;
+  }
+
+  @Override
+  public void setup(OperatorContext context, OutputMutator output) throws 
ExecutionSetupException {
+this.writer = new VectorContainerWriter(output, 

[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-09 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r150083767
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
 ---
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import static 
org.apache.drill.exec.store.kafka.DrillKafkaConfig.DRILL_KAFKA_POLL_TIMEOUT;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.drill.exec.store.kafka.decoders.MessageReader;
+import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
+import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+public class KafkaRecordReader extends AbstractRecordReader {
+  private static final Logger logger = 
LoggerFactory.getLogger(KafkaRecordReader.class);
+  public static final long DEFAULT_MESSAGES_PER_BATCH = 4000;
+
+  private VectorContainerWriter writer;
+  private MessageReader messageReader;
+
+  private boolean unionEnabled;
+  private KafkaConsumer kafkaConsumer;
+  private KafkaStoragePlugin plugin;
+  private KafkaSubScanSpec subScanSpec;
+  private long kafkaPollTimeOut;
+  private long endOffset;
+
+  private long currentOffset;
+  private long totalFetchTime = 0;
+
+  private List partitions;
+  private final boolean enableAllTextMode;
+  private final boolean readNumbersAsDouble;
+
+  private Iterator> messageIter;
+
+  public KafkaRecordReader(KafkaSubScan.KafkaSubScanSpec subScanSpec, 
List projectedColumns,
+  FragmentContext context, KafkaStoragePlugin plugin) {
+setColumns(projectedColumns);
+this.enableAllTextMode = 
context.getOptions().getOption(ExecConstants.KAFKA_ALL_TEXT_MODE).bool_val;
+this.readNumbersAsDouble = context.getOptions()
+
.getOption(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE).bool_val;
+this.unionEnabled = 
context.getOptions().getOption(ExecConstants.ENABLE_UNION_TYPE);
+this.plugin = plugin;
+this.subScanSpec = subScanSpec;
+this.endOffset = subScanSpec.getEndOffset();
+this.kafkaPollTimeOut = 
Long.valueOf(plugin.getConfig().getDrillKafkaProps().getProperty(DRILL_KAFKA_POLL_TIMEOUT));
+  }
+
+  @Override
+  protected Collection transformColumns(Collection 
projectedColumns) {
+Set transformed = Sets.newLinkedHashSet();
+if (!isStarQuery()) {
+  for (SchemaPath column : projectedColumns) {
+transformed.add(column);
+  }
+} else {
+  transformed.add(Utilities.STAR_COLUMN);
+}
+return transformed;
+  }
+
+  @Override
+  public void setup(OperatorContext context, OutputMutator output) throws 
ExecutionSetupException {
+this.writer = new VectorContainerWriter(output, 

[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-09 Thread vrozov
Github user vrozov commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r150028303
  
--- Diff: contrib/storage-kafka/pom.xml ---
@@ -0,0 +1,130 @@
+
+
+http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;
+  xmlns="http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;>
+  4.0.0
+
+  
+drill-contrib-parent
+org.apache.drill.contrib
+1.12.0-SNAPSHOT
+  
+
+  drill-storage-kafka
+  contrib/kafka-storage-plugin
+
+  
+UTF-8
+0.11.0.1
+**/KafkaTestSuit.class
--- End diff --

What is the reason for defining the `kafka.TestSuite` property?


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-09 Thread vrozov
Github user vrozov commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r150029170
  
--- Diff: contrib/storage-kafka/pom.xml ---
@@ -0,0 +1,130 @@
+
+
+http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;
+  xmlns="http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;>
+  4.0.0
+
+  
+drill-contrib-parent
+org.apache.drill.contrib
+1.12.0-SNAPSHOT
+  
+
+  drill-storage-kafka
+  contrib/kafka-storage-plugin
+
+  
+UTF-8
+0.11.0.1
+**/KafkaTestSuit.class
+  
+
+  
+
+  
+org.apache.maven.plugins
+maven-surefire-plugin
+
--- End diff --

It would be better to use the default `maven-surefire-plugin` 
configuration unless there is a good justification for a custom one. Most 
of the time this can be achieved by following the default test naming convention.
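
As a rough illustration of the naming convention mentioned above, the sketch below checks a simple class name against the default include patterns documented for Surefire (`Test*`, `*Test`, `*Tests`, `*TestCase`). The exact defaults depend on the Surefire version, so treat the pattern list as an assumption; `SurefireNamingSketch` and `matchesDefaultConvention` are hypothetical names, not part of Maven or Drill.

```java
// Hypothetical illustration of Surefire's default test-name patterns.
public class SurefireNamingSketch {

  // Returns true if the simple class name would be picked up by the
  // assumed default includes: Test*, *Test, *Tests, *TestCase.
  public static boolean matchesDefaultConvention(String simpleClassName) {
    return simpleClassName.startsWith("Test")
        || simpleClassName.endsWith("Test")
        || simpleClassName.endsWith("Tests")
        || simpleClassName.endsWith("TestCase");
  }

  public static void main(String[] args) {
    // Class names that appear in this pull request.
    for (String name : new String[] {"TestKafkaSuit", "KafkaTestSuit", "TestKafkaQueries"}) {
      System.out.println(name + " -> " + matchesDefaultConvention(name));
    }
  }
}
```

Under these assumed defaults, a class whose name starts with `Test` needs no custom include, while a name such as `KafkaTestSuit` would not be matched without one.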


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-09 Thread vrozov
Github user vrozov commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r150030044
  
--- Diff: contrib/storage-kafka/pom.xml ---
@@ -0,0 +1,130 @@
+
+
+http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;
+  xmlns="http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;>
+  4.0.0
+
+  
+drill-contrib-parent
+org.apache.drill.contrib
+1.12.0-SNAPSHOT
+  
+
+  drill-storage-kafka
+  contrib/kafka-storage-plugin
+
+  
+UTF-8
+0.11.0.1
+**/KafkaTestSuit.class
+  
+
+  
+
+  
+org.apache.maven.plugins
+maven-surefire-plugin
+
+  
+${kafka.TestSuite}
+  
+  
+**/TestKafkaQueries.java
+  
+  
+
+  logback.log.dir
+  ${project.build.directory}/surefire-reports
+
+  
+
+  
+
+  
+
+  
+
+  org.apache.drill.exec
+  drill-java-exec
+  ${project.version}
+  
+
--- End diff --

Why is it necessary to exclude zookeeper? If a specific version of 
zookeeper is required, would it be better to add zookeeper explicitly to the 
dependency management?


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-09 Thread vrozov
Github user vrozov commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r150019576
  
--- Diff: contrib/storage-kafka/pom.xml ---
@@ -0,0 +1,130 @@
+
+
+http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;
+  xmlns="http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;>
+  4.0.0
+
+  
+drill-contrib-parent
+org.apache.drill.contrib
+1.12.0-SNAPSHOT
+  
+
+  drill-storage-kafka
+  contrib/kafka-storage-plugin
+
+  
+UTF-8
--- End diff --

If the setting is necessary, it would be better to set it in the root pom.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-09 Thread akumarb2010
Github user akumarb2010 commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r150002516
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/DrillKafkaConfig.java
 ---
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+public class DrillKafkaConfig {
+
+  /**
+   * Timeout for fetching messages from Kafka
--- End diff --

Thanks Paul, this is a very good point, and it makes perfect sense to add 
them as Drill session options instead of Drill config properties. We are 
working on these changes.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-09 Thread arina-ielchiieva
Github user arina-ielchiieva commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r149893057
  
--- Diff: contrib/storage-kafka/src/test/resources/logback-test.xml ---
@@ -0,0 +1,51 @@
+
--- End diff --

Please remove. We now have a common logging configuration for all modules in 
the drill-common module.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-09 Thread arina-ielchiieva
Github user arina-ielchiieva commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r149893459
  
--- Diff: 
contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/cluster/EmbeddedZKQuorum.java
 ---
@@ -0,0 +1,83 @@
+/**
--- End diff --

The Apache header should be a regular comment, not Javadoc. Please update it 
here and in the other newly added files.
Hopefully somebody will add this to checkstyle so we won't have to keep 
reminding people about it.
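
Until such a rule exists, the check itself is simple. The sketch below is only 
an illustration of the idea, not a checkstyle configuration: 
`LicenseHeaderCheck` and `usesJavadocHeader` are hypothetical names, and the 
check assumes the license header is the first thing in the file.

```java
// Hypothetical helper: flags sources whose leading comment is Javadoc
// ("/**") rather than a plain block comment ("/*").
final class LicenseHeaderCheck {

  static boolean usesJavadocHeader(String source) {
    String trimmed = source.trim();
    // "/**/" is an empty block comment, not Javadoc, so exclude it.
    return trimmed.startsWith("/**") && !trimmed.startsWith("/**/");
  }
}
```

A file starting with `/**` followed by the license text would be flagged; the 
same text starting with `/*` would pass.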


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-09 Thread arina-ielchiieva
Github user arina-ielchiieva commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r149893582
  
--- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
 ---
@@ -343,4 +343,4 @@ public void close() {
   }
 }
   }
-}
+}
--- End diff --

Please revert changes in this file.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-08 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r149763028
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
 ---
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.drill.exec.store.kafka.decoders.MessageReader;
+import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+@JsonTypeName("kafka-scan")
+public class KafkaGroupScan extends AbstractGroupScan {
+
+  static final Logger logger = 
LoggerFactory.getLogger(KafkaGroupScan.class);
+  private static final long MSG_SIZE = 1024;
+
+  private KafkaStoragePlugin kafkaStoragePlugin;
+  private KafkaStoragePluginConfig kafkaStoragePluginConfig;
+  private List columns;
+  private KafkaScanSpec kafkaScanSpec;
+
+  private List partitionWorkList;
+  private ListMultimap assignments;
+  private List affinities;
+
+  @JsonCreator
+  public KafkaGroupScan(@JsonProperty("userName") String userName,
+  @JsonProperty("kafkaStoragePluginConfig") KafkaStoragePluginConfig 
kafkaStoragePluginConfig,
+  @JsonProperty("columns") List columns, 
@JsonProperty("scanSpec") KafkaScanSpec scanSpec,
+  @JacksonInject StoragePluginRegistry pluginRegistry) {
+this(userName, kafkaStoragePluginConfig, columns, scanSpec, 
(KafkaStoragePlugin) pluginRegistry);
+  }
+
+  public KafkaGroupScan(KafkaStoragePlugin kafkaStoragePlugin, 
KafkaScanSpec kafkaScanSpec, List columns) {
+super(StringUtils.EMPTY);
+this.kafkaStoragePlugin = kafkaStoragePlugin;
+this.kafkaStoragePluginConfig = (KafkaStoragePluginConfig) 
kafkaStoragePlugin.getConfig();
+this.columns = columns;
+this.kafkaScanSpec = kafkaScanSpec;
+init();
+  }
+
+  public KafkaGroupScan(String userName, KafkaStoragePluginConfig 
kafkaStoragePluginConfig, List columns,
 

[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-08 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r149763319
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
 ---
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.drill.exec.store.kafka.decoders.MessageReader;
+import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+@JsonTypeName("kafka-scan")
+public class KafkaGroupScan extends AbstractGroupScan {
+
+  static final Logger logger = 
LoggerFactory.getLogger(KafkaGroupScan.class);
+  private static final long MSG_SIZE = 1024;
+
+  private KafkaStoragePlugin kafkaStoragePlugin;
+  private KafkaStoragePluginConfig kafkaStoragePluginConfig;
+  private List columns;
+  private KafkaScanSpec kafkaScanSpec;
+
+  private List partitionWorkList;
+  private ListMultimap assignments;
+  private List affinities;
+
+  @JsonCreator
+  public KafkaGroupScan(@JsonProperty("userName") String userName,
+  @JsonProperty("kafkaStoragePluginConfig") KafkaStoragePluginConfig 
kafkaStoragePluginConfig,
+  @JsonProperty("columns") List columns, 
@JsonProperty("scanSpec") KafkaScanSpec scanSpec,
+  @JacksonInject StoragePluginRegistry pluginRegistry) {
+this(userName, kafkaStoragePluginConfig, columns, scanSpec, 
(KafkaStoragePlugin) pluginRegistry);
+  }
+
+  public KafkaGroupScan(KafkaStoragePlugin kafkaStoragePlugin, 
KafkaScanSpec kafkaScanSpec, List columns) {
+super(StringUtils.EMPTY);
+this.kafkaStoragePlugin = kafkaStoragePlugin;
+this.kafkaStoragePluginConfig = (KafkaStoragePluginConfig) 
kafkaStoragePlugin.getConfig();
+this.columns = columns;
+this.kafkaScanSpec = kafkaScanSpec;
+init();
+  }
+
+  public KafkaGroupScan(String userName, KafkaStoragePluginConfig 
kafkaStoragePluginConfig, List columns,
 

[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-08 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r149757320
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/DrillKafkaConfig.java
 ---
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+public class DrillKafkaConfig {
--- End diff --

Members are constants. Make this an interface?
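
A rough sketch of the two usual shapes for a constants holder, for comparison. 
The type names are hypothetical, and pairing `DRILL_KAFKA_POLL_TIMEOUT` with 
the `drill.kafka.poll.timeout` key from the README is an assumption.

```java
// Interface form: fields are implicitly public static final.
interface DrillKafkaConfigAsInterface {
  String DRILL_KAFKA_POLL_TIMEOUT = "drill.kafka.poll.timeout";
}

// Alternative: a final class that cannot be instantiated or extended.
final class DrillKafkaConfigAsClass {
  static final String DRILL_KAFKA_POLL_TIMEOUT = "drill.kafka.poll.timeout";

  private DrillKafkaConfigAsClass() {
    // no instances; this class only holds constants
  }
}
```

Either form works with the `import static` style already used by the record 
reader; the interface form is simply shorter.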


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-08 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r149756110
  
--- Diff: contrib/storage-kafka/README.md ---
@@ -0,0 +1,230 @@
+# Drill Kafka Plugin
+
+Drill kafka storage plugin allows you to perform interactive analysis 
using SQL against Apache Kafka.
+
+Supported Kafka Version
+Kafka-0.10 and above 
+
+Message Formats
+Currently this plugin supports reading only Kafka messages of type 
JSON.
+
+
+Message Readers
+Message Readers are used for reading messages from Kafka. Type of the 
MessageReaders supported as of now are
+
+
+  
+MessageReader
+Description
+Key DeSerializer 
+Value DeSerializer
+  
+  
+JsonMessageReader
+To read Json messages
+org.apache.kafka.common.serialization.ByteArrayDeserializer 
+org.apache.kafka.common.serialization.ByteArrayDeserializer
+  
+
+
+
+Plugin Configurations
+Drill Kafka plugin supports following properties
+
+   kafkaConsumerProps: These are typical Kafka consumer 
properties (https://kafka.apache.org/documentation/#consumerconfigs).
+drillKafkaProps: These are Drill Kafka plugin 
properties. As of now, it supports the following properties
+   
+drill.kafka.message.reader: Message Reader 
implementation to use while reading messages from Kafka. Message reader 
implementaion should be configured based on message format. Type of message 
readers
+ 
+ org.apache.drill.exec.store.kafka.decoders.JsonMessageReader
+ 
+
+drill.kafka.poll.timeout: Polling timeout used by 
Kafka client while fetching messages from Kafka cluster.
+
+
+
+
+Plugin Registration
+To register the kafka plugin, open the drill web interface. To open the 
drill web interface, enter http://drillbit:8047/storage in 
your browser.
+
+The following is an example plugin registration configuration
+
+{
+  "type": "kafka",
+  "kafkaConsumerProps": {
+"key.deserializer": 
"org.apache.kafka.common.serialization.ByteArrayDeserializer",
+"auto.offset.reset": "earliest",
+"bootstrap.servers": "localhost:9092",
+"enable.auto.commit": "true",
+"group.id": "drill-query-consumer-1",
+"value.deserializer": 
"org.apache.kafka.common.serialization.ByteArrayDeserializer",
+"session.timeout.ms": "3"
+  },
+  "drillKafkaProps": {
+"drill.kafka.message.reader": 
"org.apache.drill.exec.store.kafka.decoders.JsonMessageReader",
+"drill.kafka.poll.timeout": "2000"
+  },
+  "enabled": true
+}
+
+
+ Abstraction 
+In Drill, each Kafka topic is mapped to a SQL table and when a query is 
issued on a table, it scans all the messages from the earliest offset to the 
latest offset of that topic at that point of time. This plugin automatically 
discovers all the topics (tables), to allow you perform analysis without 
executing DDL statements.
--- End diff --

Does it make sense to provide a way to select a range of messages: a 
starting point or a count? Perhaps I want to run my query every five minutes, 
scanning only those messages since the previous scan. Or, I want to limit my 
take to, say, the next 1000 messages. Could we use a pseudo-column such as 
"kafkaMsgOffset" for that purpose? Maybe

```
SELECT * FROM  WHERE kafkaMsgOffset > 12345
```
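
On the scan side, such a filter would just narrow the offset range that gets 
read. A minimal sketch, assuming the planner can hand the scan a lower bound 
and an optional message limit; `OffsetRange` and `forQuery` are hypothetical 
names, not existing Drill or Kafka APIs, and offsets are assumed to satisfy 
`earliest <= latest`.

```java
// Hypothetical helper: the half-open offset range [start, end) a scan would read.
final class OffsetRange {
  final long start; // first offset to read (inclusive)
  final long end;   // offset to stop before (exclusive)

  private OffsetRange(long start, long end) {
    this.start = start;
    this.end = end;
  }

  // Narrows [earliest, latest) for "kafkaMsgOffset > greaterThan",
  // reading at most maxMessages messages.
  static OffsetRange forQuery(long earliest, long latest, long greaterThan, long maxMessages) {
    long start = Math.max(earliest, Math.addExact(greaterThan, 1L));
    start = Math.min(start, latest);          // nothing to read past the latest offset
    long limit = Math.max(0L, maxMessages);   // treat a negative limit as zero
    long available = latest - start;
    long end = limit >= available ? latest : start + limit;
    return new OffsetRange(start, end);
  }
}
```

For example, `OffsetRange.forQuery(0, 20000, 12345, 1000)` covers offsets 12346 
up to (but not including) 13346, which matches the "next 1000 messages" case.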


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-08 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r149757092
  
--- Diff: contrib/storage-kafka/pom.xml ---
@@ -0,0 +1,130 @@
+
--- End diff --

@vrozov, you are looking into Drill's Maven files. Can you review this new 
one as well?


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-08 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r149757481
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/DrillKafkaConfig.java
 ---
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+public class DrillKafkaConfig {
+
+  /**
+   * Timeout for fetching messages from Kafka
--- End diff --

Maybe clarify if this is a Drill config property, or a Drill system/session 
option.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-08 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r149756771
  
--- Diff: contrib/storage-kafka/pom.xml ---
@@ -0,0 +1,130 @@
+
+
+http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;
+   xmlns="http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance;>
+   4.0.0
+
+   
+   drill-contrib-parent
+   org.apache.drill.contrib
+   1.12.0-SNAPSHOT
+   
+
+   drill-storage-kafka
+   contrib/kafka-storage-plugin
+
+   
+   
UTF-8
--- End diff --

Small nit: Drill prefers an indent of 2, even in the `pom.xml` file.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-08 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r149760078
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
 ---
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.drill.exec.store.kafka.decoders.MessageReader;
+import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+@JsonTypeName("kafka-scan")
+public class KafkaGroupScan extends AbstractGroupScan {
+
+  static final Logger logger = 
LoggerFactory.getLogger(KafkaGroupScan.class);
+  private static final long MSG_SIZE = 1024;
--- End diff --

Please add a comment to explain that this is the assumed message size for 
purposes of computing data size. (At least, that's how it appears to be used 
below...)
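
Roughly, the intent seems to be the following. This is a sketch based only on 
my reading; `ScanSizeEstimate` and `estimatedBytes` are hypothetical names, 
and the offsets are assumed to come from the partition work units.

```java
// Hypothetical sketch of how an assumed per-message size feeds a data-size estimate.
final class ScanSizeEstimate {

  // Assumed average size of one Kafka message in bytes; used only for
  // planning estimates, not as a limit on real messages.
  static final long MSG_SIZE = 1024;

  // Estimated bytes for the offset range [startOffset, endOffset).
  static long estimatedBytes(long startOffset, long endOffset) {
    long messageCount = Math.max(0L, endOffset - startOffset);
    return messageCount * MSG_SIZE;
  }
}
```

A one-line comment on the constant saying as much would be enough.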


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-08 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r149770638
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
 ---
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import static 
org.apache.drill.exec.store.kafka.DrillKafkaConfig.DRILL_KAFKA_POLL_TIMEOUT;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.drill.exec.store.kafka.decoders.MessageReader;
+import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
+import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+public class KafkaRecordReader extends AbstractRecordReader {
+  private static final Logger logger = 
LoggerFactory.getLogger(KafkaRecordReader.class);
+  public static final long DEFAULT_MESSAGES_PER_BATCH = 4000;
+
+  private VectorContainerWriter writer;
+  private MessageReader messageReader;
+
+  private boolean unionEnabled;
+  private KafkaConsumer kafkaConsumer;
+  private KafkaStoragePlugin plugin;
--- End diff --

`final`? And for others assigned in the constructor?
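
A small sketch of the pattern, using a hypothetical stand-in class rather than 
the real reader: fields set once in the constructor are `final`, and only the 
state that really changes while reading stays mutable.

```java
// Hypothetical stand-in for a record reader; names are illustrative only.
final class ReaderSketch {
  private final String topic;        // assigned once in the constructor
  private final long pollTimeoutMs;  // assigned once in the constructor
  private long currentOffset;        // changes as messages are read

  ReaderSketch(String topic, long pollTimeoutMs, long startOffset) {
    this.topic = topic;
    this.pollTimeoutMs = pollTimeoutMs;
    this.currentOffset = startOffset;
  }

  String topic() { return topic; }

  long pollTimeoutMs() { return pollTimeoutMs; }

  long currentOffset() { return currentOffset; }

  // Moves past one consumed message.
  void advance() { currentOffset++; }
}
```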


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-08 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r149770390
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
 ---
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import static 
org.apache.drill.exec.store.kafka.DrillKafkaConfig.DRILL_KAFKA_POLL_TIMEOUT;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.drill.exec.store.kafka.decoders.MessageReader;
+import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
+import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+public class KafkaRecordReader extends AbstractRecordReader {
--- End diff --

Minor point: maybe add a blank line or two between imports and class to 
improve readability.


---


[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin

2017-11-08 Thread paul-rogers
Github user paul-rogers commented on a diff in the pull request:

https://github.com/apache/drill/pull/1027#discussion_r149758022
  
--- Diff: 
contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaGroupScan.java
 ---
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.EndpointAffinity;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.ScanStats;
+import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.drill.exec.store.kafka.decoders.MessageReader;
+import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
+import org.apache.drill.exec.store.schedule.AffinityCreator;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+@JsonTypeName("kafka-scan")
+public class KafkaGroupScan extends AbstractGroupScan {
+
+  static final Logger logger = LoggerFactory.getLogger(KafkaGroupScan.class);
+  private static final long MSG_SIZE = 1024;
+
+  private KafkaStoragePlugin kafkaStoragePlugin;
+  private KafkaStoragePluginConfig kafkaStoragePluginConfig;
+  private List<SchemaPath> columns;
+  private KafkaScanSpec kafkaScanSpec;
+
+  private List partitionWorkList;
+  private ListMultimap assignments;
+  private List<EndpointAffinity> affinities;
+
+  @JsonCreator
+  public KafkaGroupScan(@JsonProperty("userName") String userName,
+  @JsonProperty("kafkaStoragePluginConfig") KafkaStoragePluginConfig kafkaStoragePluginConfig,
+  @JsonProperty("columns") List<SchemaPath> columns, @JsonProperty("scanSpec") KafkaScanSpec scanSpec,
+  @JacksonInject StoragePluginRegistry pluginRegistry) {
+this(userName, kafkaStoragePluginConfig, columns, scanSpec, (KafkaStoragePlugin) pluginRegistry);
+  }
+
+  public KafkaGroupScan(KafkaStoragePlugin kafkaStoragePlugin, KafkaScanSpec kafkaScanSpec, List<SchemaPath> columns) {
+super(StringUtils.EMPTY);
+this.kafkaStoragePlugin = kafkaStoragePlugin;
+this.kafkaStoragePluginConfig = (KafkaStoragePluginConfig) kafkaStoragePlugin.getConfig();
+this.columns = columns;
+this.kafkaScanSpec = kafkaScanSpec;
+init();
+  }
+
+  public KafkaGroupScan(String userName, KafkaStoragePluginConfig kafkaStoragePluginConfig, List<SchemaPath> columns,