[jira] [Created] (DRILL-5984) Support for Symlinked Table Paths to be used in Drill Queries.
Saravanabavagugan Vengadasundaram created DRILL-5984:

Summary: Support for Symlinked Table Paths to be used in Drill Queries.
Key: DRILL-5984
URL: https://issues.apache.org/jira/browse/DRILL-5984
Project: Apache Drill
Issue Type: New Feature
Affects Versions: 1.11.0
Environment: OS: CentOS 7.1, MapR-DB Version: 5.2.2
Reporter: Saravanabavagugan Vengadasundaram

MapR-FS supports symlinks, and hence MapR-DB table paths support symlinks as well. In the project I work on, we use symlinks as a means of referring to the physical table. An employee table in MapR-DB will be represented as "/tables/Employee/Entity_1233232", and there will be a symlink called "/tables/Employee/Entity" pointing to the actual physical table. Currently, Drill does not understand queries that use the symlink path; it only executes queries against the actual physical table path. So every time, I need to find the actual physical path of the table and frame my query against it. It would be nice to have this feature in the next version of Drill.

-- This message was sent by Atlassian JIRA (v6.4.14#64029)
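The workaround the reporter describes (resolving the symlink before framing the query) can be sketched as follows. This is a hypothetical helper, not Drill code, and it uses `java.nio` on a local filesystem as a stand-in for MapR-FS:

```java
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;

public class SymlinkResolver {
    // Hypothetical helper: resolve a possibly-symlinked table path to its
    // physical target so a query can be framed against the real path.
    public static String resolveTablePath(String tablePath) throws IOException {
        Path p = Paths.get(tablePath);
        // toRealPath() follows symlinks (including chains) to the real file
        return p.toRealPath().toString();
    }
}
```

With the issue's example layout, `resolveTablePath("/tables/Employee/Entity")` would return `/tables/Employee/Entity_1233232`, which is the path Drill currently requires.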
[GitHub] drill issue #1043: DRILL-5981: Add Syntax Highlighting and Error Checking to...
Github user kkhatua commented on the issue: https://github.com/apache/drill/pull/1043 Oh.. just for a heads up (I guess you already knew this), I zeroed in on the 2 themes based on copy-pasting a sample storage plugin into this: https://ace.c9.io/build/kitchen-sink.html ---
[GitHub] drill issue #1043: DRILL-5981: Add Syntax Highlighting and Error Checking to...
Github user kkhatua commented on the issue: https://github.com/apache/drill/pull/1043 Very nice, @cgivre! I've been looking to do something similar for the SQL side as well, but with autocomplete support; let's talk about that separately. We just need to make sure that there are no license/usage issues. It would be nice if we could leverage this in the SQL Editor too. I know I recently had a discussion with someone regarding validation of the JSON for the storage plugin, but that would be a stretch. Also, it seems the library can recognize single-line comments ( // ), which (I believe) are supported by Drill's JSON config parser. Can you pick a theme that makes the colors pop more visibly than the current one? The Crimson or Eclipse themes look better and are easier to read. Also, if the 'src-min-noconflict' directory is primarily there to hold the Ace libraries and you don't want to risk renaming the library files themselves, it would be good to give the directory a more meaningful name (indicating that it contains AceJS libraries). Otherwise, LGTM +1 ---
[GitHub] drill issue #1045: DRILL-5730 Test Mocking Improvements
Github user ilooner commented on the issue: https://github.com/apache/drill/pull/1045 @paul-rogers ---
[GitHub] drill pull request #1045: DRILL-5730 Test Mocking Improvements
GitHub user ilooner opened a pull request: https://github.com/apache/drill/pull/1045

DRILL-5730 Test Mocking Improvements

## DRILL-5730
- Switched to using the interface for FragmentContext everywhere instead of passing around the concrete class.
- Minor refactoring of FragmentContext public methods.
- Switched to using the OptionSet interface throughout the codebase instead of OptionManager.
- Renamed **FragmentContext** to **FragmentContextImpl** and renamed **FragmentContextInterface** to **FragmentContext**.
- Removed JMockit from most unit tests in favor of Mockito. Unfortunately it cannot be removed from some of the unit tests, which depend on it for mocking private methods and static methods (functionality only JMockit provides). In the future we need to refactor the code so that these remaining tests can have JMockit removed completely.
- Refactored some tests to use a mock class of FragmentContext.
- Some tests were using Mockito and JMockit when there was no need for a mocking framework.

## Misc
- Removed commented-out code and unused imports.
- Removed unnecessary modifiers from methods in interfaces.
- Fixed a bug in BootStrapContext which leaked threads.
- Fixed javadoc links that were broken.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ilooner/drill DRILL-5730

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/drill/pull/1045.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1045

commit b4b4de83db5df20f2fa56387f5756df0ead3ec17
Author: Paul Rogers  Date: 2017-10-05T05:43:44Z
    DRILL-5842: Refactor fragment, operator contexts

commit 0a2d938cee7d5d47d3ac0d666ace8163efb3af83
Author: Paul Rogers  Date: 2017-10-06T06:24:56Z
    Fixes for tests which mock contexts

commit 34cd7494c68f0934fdf5f455748863be873b3995
Author: Timothy Farkas  Date: 2017-10-16T18:28:54Z
    - Removed commented out code
    - Removed redundant modifiers on interface methods

commit a4944b20abe226a990adc775a3641b44c0b173bb
Author: Timothy Farkas  Date: 2017-10-16T19:36:23Z
    - Some more minor code cleanup

commit 13f35109a30f03414223c84f4f4fb664ab344e6e
Author: Timothy Farkas  Date: 2017-10-17T19:30:59Z
    - Deleted commented out code
    - Removed unused variables
    - Replaced usage of FragmentContext with FragmentContextInterface
    - Refactored OptionSet and FragmentContextInterface interfaces

commit 629da8ff3bd40b3269747cf54a88754da3266346
Author: Timothy Farkas  Date: 2017-10-18T19:37:37Z
    - More changes to the FragmentContextInterface
    - Replaced more usages of FragmentContext with FragmentContextInterface
    - Replaced usages of OptionManager with OptionSet

commit 71f9a1c7d2c8b2f60398348d57344c56a68f556c
Author: Timothy Farkas  Date: 2017-10-18T19:52:01Z
    - Removed unused import

commit b189350a20e3527d8b6c7df82fdb8641a359dad4
Author: Timothy Farkas  Date: 2017-10-18T22:21:52Z
    - Fixed broken unit tests

commit 27f88376c7ad5da384570de0a3eafeb16393829d
Author: Timothy Farkas  Date: 2017-11-07T19:02:43Z
    - Deleted unused fields

commit 5f3e3ce93aba98e2e20abd0a187392d38a78c374
Author: Timothy Farkas  Date: 2017-11-09T02:48:44Z
    - Removed unused variables
    - Removed use of JMockit from unit tests
    - Minor code cleanup

commit df4b0c1fed0f2d34292e6e635635cee4c6f2f2af
Author: Timothy Farkas  Date: 2017-11-09T21:42:00Z
    - Fixed java-exec build and test errors

commit 8113edb320f2ff12e26bd87f82b23fd47a9513cd
Author: Timothy Farkas  Date: 2017-11-16T00:46:34Z
    - Fixed broken test
    - Removed broken TestOptiqPlans
    - Completed replacing references to FragmentContext with FragmentContextInterface

commit b1fee4ff6e5c6dde14239732baae193ea752f21e
Author: Timothy Farkas  Date: 2017-11-16T00:58:45Z
    - Moved TestHashJoin off of JMockit

commit f94a115eddfbbae8db861e6e97481197c9100f6c
Author: Timothy Farkas  Date: 2017-11-16T20:52:03Z
    - Removed more dependencies on JMockit

commit d2178e262885f073cbb0d3daab1640a36f505805
Author: Timothy Farkas  Date: 2017-11-20T20:28:29Z
    - Finished migrating most of the tests off of JMockit
    - Tests pass

commit 58ca0d1d42b931509dcff013346f2374556d5080
Author: Timothy Farkas  Date: 2017-11-20T23:41:41Z
    - Fixed a test bug

---
[GitHub] drill issue #1030: DRILL-5941: Skip header / footer improvements for Hive st...
Github user ppadma commented on the issue: https://github.com/apache/drill/pull/1030 +1. LGTM. ---
[GitHub] drill pull request #1044: DRILL-5980: Making queryType param for REST client...
GitHub user ChrisSandison opened a pull request: https://github.com/apache/drill/pull/1044

DRILL-5980: Making queryType param for REST client case-insensitive

Fixes #5980

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ChrisSandison/drill DRILL-5980

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/drill/pull/1044.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1044

commit 4ee5f80839ef9afdb1ba4132e23b14013a3330d4
Author: chris  Date: 2017-11-21T20:53:45Z
    DRILL-5980: Making queryType param for REST client case-insensitive

    Fixes #5980

---
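The behavior the PR title describes can be sketched as follows. The enum values and class name here are illustrative, not Drill's actual REST model classes:

```java
import java.util.Locale;

public class QueryTypeParser {
    // Illustrative stand-in for the REST API's query types
    enum QueryType { SQL, LOGICAL, PHYSICAL }

    // Normalize case (and whitespace) before Enum.valueOf, so that
    // "sql", "Sql" and "SQL" all resolve to the same query type.
    static QueryType parse(String raw) {
        return QueryType.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }
}
```

The key point is that `Enum.valueOf` is case-sensitive, so normalizing the incoming parameter once at the boundary is enough to make the whole endpoint case-insensitive.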
[GitHub] drill pull request #795: DRILL-5089: Get only partial schemas of relevant st...
Github user chunhui-shi closed the pull request at: https://github.com/apache/drill/pull/795 ---
[GitHub] drill issue #914: DRILL-5657: Size-aware vector writer structure
Github user paul-rogers commented on the issue: https://github.com/apache/drill/pull/914 Thanks, Parth! Will make another commit to address small issues that Karthik pointed out. Let's hold off the actual commit until Drill 1.12 ships. I'll then commit the changes when we open things up again for Drill 1.13 changes. ---
[GitHub] drill pull request #921: DRILL-4286 Graceful shutdown of drillbit
Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/921#discussion_r152361270

--- Diff: protocol/src/main/java/org/apache/drill/exec/proto/beans/RpcType.java ---
@@ -25,28 +25,8 @@
 HANDSHAKE(0),
 ACK(1),
 GOODBYE(2),
-RUN_QUERY(3),
-CANCEL_QUERY(4),
-REQUEST_RESULTS(5),
-RESUME_PAUSED_QUERY(11),
-GET_QUERY_PLAN_FRAGMENTS(12),
-GET_CATALOGS(14),
-GET_SCHEMAS(15),
-GET_TABLES(16),
-GET_COLUMNS(17),
-CREATE_PREPARED_STATEMENT(22),
-GET_SERVER_META(8),
-QUERY_DATA(6),
-QUERY_HANDLE(7),
-QUERY_PLAN_FRAGMENTS(13),
-CATALOGS(18),
-SCHEMAS(19),
-TABLES(20),
-COLUMNS(21),
-PREPARED_STATEMENT(23),
-SERVER_META(9),
-QUERY_RESULT(10),
-SASL_MESSAGE(24);
+REQ_RECORD_BATCH(3),
+SASL_MESSAGE(4);

--- End diff --

The change seems to be that messages are dropped. That can't be good. The only diff that should show up here is the addition of your new state codes. The other explanation is that master is wrong, which would be a bad state of affairs.

---
[GitHub] drill issue #1033: DRILL-5952: Implement "CREATE TABLE IF NOT EXISTS"
Github user prasadns14 commented on the issue: https://github.com/apache/drill/pull/1033 @arina-ielchiieva, please review ---
[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin
Github user kameshb commented on a diff in the pull request: https://github.com/apache/drill/pull/1027#discussion_r152310859

--- Diff: contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/TestKafkaSuit.java ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.drill.exec.store.kafka.cluster.EmbeddedKafkaCluster;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+import org.junit.runners.Suite.SuiteClasses;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+@RunWith(Suite.class)
+@SuiteClasses({ KafkaQueriesTest.class, MessageIteratorTest.class })
+public class TestKafkaSuit {

--- End diff --

Thanks for suggesting this framework. I think this may take a considerable amount of time, and since the release date is very close, we will address this refactoring in the next release.

---
[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin
Github user kameshb commented on a diff in the pull request: https://github.com/apache/drill/pull/1027#discussion_r152303056 --- Diff: contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactoryTest.java --- @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.kafka.decoders; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType; +import org.junit.Assert; +import org.junit.Test; + +public class MessageReaderFactoryTest { + + @Test + public void testShouldThrowExceptionAsMessageReaderIsNull() { --- End diff -- taken care ---
[GitHub] drill issue #1036: DRILL-5962: Adding ST_AsJSON functionality
Github user arina-ielchiieva commented on the issue: https://github.com/apache/drill/pull/1036 Please update the commit message. +1, LGTM. ---
[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin
Github user arina-ielchiieva commented on a diff in the pull request: https://github.com/apache/drill/pull/1027#discussion_r152286833

--- Diff: contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/decoders/MessageReaderFactoryTest.java ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.decoders;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class MessageReaderFactoryTest {
+
+  @Test
+  public void testShouldThrowExceptionAsMessageReaderIsNull() {

--- End diff --

Please check the tests below as well; they can also give false positive results.

---
[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin
Github user kameshb commented on a diff in the pull request: https://github.com/apache/drill/pull/1027#discussion_r152285907 --- Diff: contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java --- @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.kafka; + +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.impl.OutputMutator; +import org.apache.drill.exec.server.options.OptionManager; +import org.apache.drill.exec.store.AbstractRecordReader; +import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec; +import org.apache.drill.exec.store.kafka.decoders.MessageReader; +import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory; +import org.apache.drill.exec.util.Utilities; +import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +public class KafkaRecordReader extends AbstractRecordReader { + private static final Logger logger = LoggerFactory.getLogger(KafkaRecordReader.class); + public static final long DEFAULT_MESSAGES_PER_BATCH = 4000; + + private VectorContainerWriter writer; + private MessageReader messageReader; + + private final boolean unionEnabled; + private final KafkaStoragePlugin plugin; + private final KafkaSubScanSpec subScanSpec; + private final long kafkaPollTimeOut; + + private long currentOffset; + private MessageIterator msgItr; + + private final boolean enableAllTextMode; + private final boolean readNumbersAsDouble; + private final String kafkaMsgReader; + + public KafkaRecordReader(KafkaSubScan.KafkaSubScanSpec subScanSpec, List 
projectedColumns, + FragmentContext context, KafkaStoragePlugin plugin) { +setColumns(projectedColumns); +this.enableAllTextMode = context.getOptions().getOption(ExecConstants.KAFKA_ALL_TEXT_MODE).bool_val; +this.readNumbersAsDouble = context.getOptions() + .getOption(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE).bool_val; +OptionManager options = context.getOptions(); +this.unionEnabled = options.getOption(ExecConstants.ENABLE_UNION_TYPE); +this.kafkaMsgReader = options.getOption(ExecConstants.KAFKA_RECORD_READER).string_val; +this.kafkaPollTimeOut = options.getOption(ExecConstants.KAFKA_POLL_TIMEOUT).num_val; +this.plugin = plugin; +this.subScanSpec = subScanSpec; + } + + @Override + protected Collection transformColumns(Collection projectedColumns) { +Set transformed = Sets.newLinkedHashSet(); +if (!isStarQuery()) { + for (SchemaPath column : projectedColumns) { +transformed.add(column); + } +} else { + transformed.add(Utilities.STAR_COLUMN); +} +return transformed; + } + + @Override + public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException { +this.writer = new VectorContainerWriter(output, unionEnabled); +messageReader = MessageReaderFactory.getMessageReader(kafkaMsgReader); +messageReader.init(context.getManagedBuffer(), Lists.newArrayList(getColumns()), this.writer, +this.enableAllTextMode, this.readNumbersAsDouble); +
[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin
Github user arina-ielchiieva commented on a diff in the pull request: https://github.com/apache/drill/pull/1027#discussion_r152248126

--- Diff: contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/schema/KafkaMessageSchema.java ---
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka.schema;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DynamicDrillTable;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.kafka.KafkaScanSpec;
+import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
+import org.apache.drill.exec.store.kafka.KafkaStoragePluginConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+
+public class KafkaMessageSchema extends AbstractSchema {
+
+  private static final Logger logger = LoggerFactory.getLogger(KafkaMessageSchema.class);
+  private final KafkaStoragePlugin plugin;
+  private final Map<String, DrillTable> drillTables = Maps.newHashMap();
+  private Set<String> tableNames;
+
+  public KafkaMessageSchema(final KafkaStoragePlugin plugin, final String name) {
+    super(ImmutableList.<String>of(), name);
+    this.plugin = plugin;
+  }
+
+  @Override
+  public String getTypeName() {
+    return KafkaStoragePluginConfig.NAME;
+  }
+
+  void setHolder(SchemaPlus plusOfThis) {
+    for (String s : getSubSchemaNames()) {
+      plusOfThis.add(s, getSubSchema(s));
+    }
+  }
+
+  @Override
+  public Table getTable(String tableName) {
+    if (!drillTables.containsKey(tableName)) {
+      KafkaScanSpec scanSpec = new KafkaScanSpec(tableName);
+      DrillTable table = new DynamicDrillTable(plugin, getName(), scanSpec);
+      drillTables.put(tableName, table);
+    }
+    return drillTables.get(tableName);
+  }
+
+  @Override
+  public Set<String> getTableNames() {
+    if (tableNames == null) {
+      try (KafkaConsumer kafkaConsumer = new KafkaConsumer<>(plugin.getConfig().getKafkaConsumerProps())) {
+        tableNames = kafkaConsumer.listTopics().keySet();
+      } catch (KafkaException e) {
+        logger.error(e.getMessage(), e);
+        throw UserException.dataReadError(e).message("Failed to get tables information").addContext(e.getMessage())

--- End diff --

If I am not mistaken, `UserException` logs the exception internally. Please check the logs to verify that using both `logger` and `UserException` does not log the exception twice.

---
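The double-logging concern above can be illustrated with a self-contained stand-in. This is NOT Drill's `UserException`; the class, method names, and use of `java.util.logging` are invented for the sketch, mirroring only the pattern the review describes (a builder that logs once in its build step, so callers should not also call `logger.error` first):

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class DataReadError extends RuntimeException {
    private DataReadError(String msg, Throwable cause) {
        super(msg, cause);
    }

    // Constructs AND logs the exception in one place. If the caller also
    // logged before throwing, the same failure would appear twice in the log.
    public static DataReadError build(String msg, Throwable cause, Logger logger) {
        DataReadError e = new DataReadError(msg, cause);
        logger.log(Level.SEVERE, msg, cause); // the single logging point
        return e;
    }
}
```

With this shape, the `catch` block simply does `throw DataReadError.build(...)` with no preceding `logger.error(...)` call.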
[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin
Github user arina-ielchiieva commented on a diff in the pull request: https://github.com/apache/drill/pull/1027#discussion_r152246552

--- Diff: contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/MessageIteratorTest.java ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.NoSuchElementException;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class MessageIteratorTest extends KafkaTestBase {
+
+  private KafkaConsumer<byte[], byte[]> kafkaConsumer;
+  private KafkaSubScanSpec subScanSpec;
+
+  @Before
+  public void setUp() {
+    Properties consumerProps = storagePluginConfig.getKafkaConsumerProps();
+    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+    consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "4");
+    kafkaConsumer = new KafkaConsumer<>(consumerProps);
+    subScanSpec = new KafkaSubScanSpec(QueryConstants.JSON_TOPIC, 0, 0, TestKafkaSuit.NUM_JSON_MSG);
+  }
+
+  @After
+  public void cleanUp() {
+    if (kafkaConsumer != null) {
+      kafkaConsumer.close();
+    }
+  }
+
+  @Test
+  public void testWhenPollTimeOutIsTooLess() {
+    MessageIterator iterator = new MessageIterator(kafkaConsumer, subScanSpec, 1);
+    try {
+      iterator.hasNext();
+    } catch (UserException ue) {

--- End diff --

The test can give a false positive result if the exception is not thrown at all. Please re-throw the exception in the catch block and update the test annotation to `@Test(expected = UserException.class)`.

---
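The reviewer's point can be shown in plain Java (the real fix in the PR would use JUnit's `@Test(expected = UserException.class)`; the classes below are invented purely for illustration): a catch block that swallows the expected exception lets the test pass even when nothing was thrown, so the "no exception" path must be made to fail explicitly.

```java
public class ExpectedExceptionDemo {
    static void poll(boolean timeout) {
        if (timeout) {
            throw new IllegalStateException("Failed to fetch messages within the poll timeout");
        }
    }

    // Returns true only if the expected exception was actually thrown,
    // making a silent "no exception" outcome visible to the test.
    static boolean throwsAsExpected(boolean timeout) {
        try {
            poll(timeout);
            return false; // expected exception missing: a test must fail here
        } catch (IllegalStateException e) {
            return true;  // expected path: assertions on e would go here
        }
    }
}
```

Re-throwing inside the catch (or returning a flag as above) is what turns the missing exception into a test failure instead of a silent pass.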
[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin
Github user arina-ielchiieva commented on a diff in the pull request: https://github.com/apache/drill/pull/1027#discussion_r152248647 --- Diff: contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java --- @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.store.kafka; + +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.impl.OutputMutator; +import org.apache.drill.exec.server.options.OptionManager; +import org.apache.drill.exec.store.AbstractRecordReader; +import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec; +import org.apache.drill.exec.store.kafka.decoders.MessageReader; +import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory; +import org.apache.drill.exec.util.Utilities; +import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Stopwatch; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +public class KafkaRecordReader extends AbstractRecordReader { + private static final Logger logger = LoggerFactory.getLogger(KafkaRecordReader.class); + public static final long DEFAULT_MESSAGES_PER_BATCH = 4000; + + private VectorContainerWriter writer; + private MessageReader messageReader; + + private final boolean unionEnabled; + private final KafkaStoragePlugin plugin; + private final KafkaSubScanSpec subScanSpec; + private final long kafkaPollTimeOut; + + private long currentOffset; + private MessageIterator msgItr; + + private final boolean enableAllTextMode; + private final boolean readNumbersAsDouble; + private final String kafkaMsgReader; + + public KafkaRecordReader(KafkaSubScan.KafkaSubScanSpec subScanSpec, List 
projectedColumns, + FragmentContext context, KafkaStoragePlugin plugin) { +setColumns(projectedColumns); +this.enableAllTextMode = context.getOptions().getOption(ExecConstants.KAFKA_ALL_TEXT_MODE).bool_val; +this.readNumbersAsDouble = context.getOptions() + .getOption(ExecConstants.KAFKA_READER_READ_NUMBERS_AS_DOUBLE).bool_val; +OptionManager options = context.getOptions(); +this.unionEnabled = options.getOption(ExecConstants.ENABLE_UNION_TYPE); +this.kafkaMsgReader = options.getOption(ExecConstants.KAFKA_RECORD_READER).string_val; +this.kafkaPollTimeOut = options.getOption(ExecConstants.KAFKA_POLL_TIMEOUT).num_val; +this.plugin = plugin; +this.subScanSpec = subScanSpec; + } + + @Override + protected Collection transformColumns(Collection projectedColumns) { +Set transformed = Sets.newLinkedHashSet(); +if (!isStarQuery()) { + for (SchemaPath column : projectedColumns) { +transformed.add(column); + } +} else { + transformed.add(Utilities.STAR_COLUMN); +} +return transformed; + } + + @Override + public void setup(OperatorContext context, OutputMutator output) throws ExecutionSetupException { +this.writer = new VectorContainerWriter(output, unionEnabled); +messageReader = MessageReaderFactory.getMessageReader(kafkaMsgReader); +messageReader.init(context.getManagedBuffer(), Lists.newArrayList(getColumns()), this.writer, +this.enableAllTextMode, this.readNumbersAsDouble);
[GitHub] drill pull request #1027: DRILL-4779 : Kafka storage plugin
Github user arina-ielchiieva commented on a diff in the pull request: https://github.com/apache/drill/pull/1027#discussion_r152245838 --- Diff: contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/MessageIterator.java --- @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.drill.exec.store.kafka;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.store.kafka.KafkaSubScan.KafkaSubScanSpec;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+
+import kafka.common.KafkaException;
+
+public class MessageIterator implements Iterator<ConsumerRecord<byte[], byte[]>> {
+
+  private static final Logger logger = LoggerFactory.getLogger(MessageIterator.class);
+  private final KafkaConsumer<byte[], byte[]> kafkaConsumer;
+  private Iterator<ConsumerRecord<byte[], byte[]>> recordIter;
+  private final TopicPartition topicPartition;
+  private long totalFetchTime = 0;
+  private final long kafkaPollTimeOut;
+  private final long endOffset;
+
+  public MessageIterator(final KafkaConsumer<byte[], byte[]> kafkaConsumer, final KafkaSubScanSpec subScanSpec,
+      final long kafkaPollTimeOut) {
+    this.kafkaConsumer = kafkaConsumer;
+    this.kafkaPollTimeOut = kafkaPollTimeOut;
+
+    List<TopicPartition> partitions = Lists.newArrayListWithCapacity(1);
+    topicPartition = new TopicPartition(subScanSpec.getTopicName(), subScanSpec.getPartitionId());
+    partitions.add(topicPartition);
+    this.kafkaConsumer.assign(partitions);
+    logger.info("Start offset of {}:{} is - {}", subScanSpec.getTopicName(), subScanSpec.getPartitionId(),
+        subScanSpec.getStartOffset());
+    this.kafkaConsumer.seek(topicPartition, subScanSpec.getStartOffset());
+    this.endOffset = subScanSpec.getEndOffset();
+  }
+
+  @Override
+  public void remove() {
+    throw new UnsupportedOperationException("Does not support remove operation");
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (recordIter != null &&
+        recordIter.hasNext()) {
+      return true;
+    }
+
+    long nextPosition = kafkaConsumer.position(topicPartition);
+    if (nextPosition >= endOffset) {
+      return false;
+    }
+
+    ConsumerRecords<byte[], byte[]> consumerRecords = null;
+    Stopwatch stopwatch = Stopwatch.createStarted();
+    try {
+      consumerRecords = kafkaConsumer.poll(kafkaPollTimeOut);
+    } catch (KafkaException ke) {
+      logger.error(ke.getMessage(), ke);
+      throw UserException.dataReadError(ke).message(ke.getMessage()).build(logger);
+    }
+    stopwatch.stop();
+
+    String errorMsg = new StringBuilder().append("Failed to fetch messages within ").append(kafkaPollTimeOut)
--- End diff --
The error message can be constructed inside the if clause: `if (consumerRecords.isEmpty()) {` ---
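The polling pattern this diff implements — drain the buffered records first, stop once the partition position reaches the sub-scan's end offset, otherwise fetch another batch — can be sketched without the Kafka client. Everything here is an illustrative assumption of the sketch (`BoundedBatchIterator`, `Integer` records, and a `List<List<Integer>>` standing in for successive `poll()` results), not the plugin's actual code:

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;

// Sketch of offset-bounded batch iteration: batches are fetched lazily
// (analogous to consumer.poll), and iteration stops at endOffset.
public class BoundedBatchIterator implements Iterator<Integer> {
    private final List<List<Integer>> batches; // stand-in for poll() results
    private final long endOffset;
    private long position;                     // stand-in for consumer.position()
    private int batchIdx;
    private final Queue<Integer> buffered = new ArrayDeque<>();

    public BoundedBatchIterator(List<List<Integer>> batches, long startOffset, long endOffset) {
        this.batches = batches;
        this.position = startOffset;
        this.endOffset = endOffset;
    }

    @Override
    public boolean hasNext() {
        if (!buffered.isEmpty()) {
            return true;                       // drain the current batch first
        }
        if (position >= endOffset || batchIdx >= batches.size()) {
            return false;                      // reached the sub-scan's end offset
        }
        buffered.addAll(batches.get(batchIdx++)); // "poll" the next batch
        return !buffered.isEmpty();
    }

    @Override
    public Integer next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        position++;
        return buffered.poll();
    }

    public static void main(String[] args) {
        BoundedBatchIterator it = new BoundedBatchIterator(List.of(List.of(1, 2), List.of(3)), 0, 3);
        while (it.hasNext()) {
            System.out.println(it.next()); // prints 1, 2, 3 on separate lines
        }
    }
}
```

Buffering one batch at a time mirrors why the real `hasNext()` checks `recordIter` before looking at the consumer position: a prior poll may have fetched records that are not yet consumed.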
[GitHub] drill issue #1043: DRILL-5981: Add Syntax Highlighting and Error Checking to...
Github user arina-ielchiieva commented on the issue: https://github.com/apache/drill/pull/1043 @cgivre looks really nice, thanks for making the change! @kkhatua could you please take a look at this PR? ---
[GitHub] drill issue #1034: DRILL-5960: Adding asGeoJSON function
Github user arina-ielchiieva commented on the issue: https://github.com/apache/drill/pull/1034 +1 ---
[GitHub] drill issue #1032: DRILL-5089: Dynamically load schema of storage plugin onl...
Github user arina-ielchiieva commented on the issue: https://github.com/apache/drill/pull/1032 +1 ---
[jira] [Created] (DRILL-5983) Unsupported nullable converted type INT_8 for primitive type INT32 error
Hakan Sarıbıyık created DRILL-5983: -- Summary: Unsupported nullable converted type INT_8 for primitive type INT32 error Key: DRILL-5983 URL: https://issues.apache.org/jira/browse/DRILL-5983 Project: Apache Drill Issue Type: Bug Components: Execution - Data Types Affects Versions: 1.11.0, 1.10.0 Environment: NAME="Ubuntu" VERSION="16.04.2 LTS (Xenial Xerus)" Reporter: Hakan Sarıbıyık When I query a table containing a byte column, it gives an error: _Query Failed: An Error Occurred org.apache.drill.common.exceptions.UserRemoteException: SYSTEM ERROR: ExecutionSetupException: Unsupported nullable converted type INT_8 for primitive type INT32 Fragment 1:6 [Error Id: 46636b05-cff5-455b-ba25-527217346b3e on bigdata7:31010]_ According to https://drill.apache.org/docs/apache-drill-1-10-0-release-notes/, this was supposed to be fixed by [DRILL-4764] - Parquet file with INT_16, etc. logical types not supported by simple SELECT. But I tried it even with 1.11.0 and it didn't work. I am querying a Parquet-formatted file written with pySpark; the schema of tablo1 is: sourceid: byte (nullable = true). `select sourceid from tablo1` works as expected with pySpark, but not with Drill v1.11.0. Thanks. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
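For context on the error above: in the Parquet format, INT_8 and INT_16 are converted (logical) types annotating the INT32 physical type, so the stored 32-bit value is already directly decodable; a reader fails only if it rejects the annotation outright. The following is a hedged standalone sketch of that mapping, with an optional range check against the annotated width — illustrative names only, not Drill's reader code:

```java
// Sketch: narrow integer annotations over a Parquet INT32 physical value.
public class NarrowIntSketch {
    enum ConvertedType { INT_8, INT_16, INT_32 }

    // Returns the decoded value, verifying it fits the annotated width.
    static int decode(int physicalInt32, ConvertedType annotation) {
        switch (annotation) {
            case INT_8:
                if (physicalInt32 < Byte.MIN_VALUE || physicalInt32 > Byte.MAX_VALUE) {
                    throw new IllegalArgumentException("value out of INT_8 range: " + physicalInt32);
                }
                return physicalInt32;
            case INT_16:
                if (physicalInt32 < Short.MIN_VALUE || physicalInt32 > Short.MAX_VALUE) {
                    throw new IllegalArgumentException("value out of INT_16 range: " + physicalInt32);
                }
                return physicalInt32;
            default:
                return physicalInt32; // plain 32-bit path
        }
    }

    public static void main(String[] args) {
        System.out.println(decode(-5, ConvertedType.INT_8));    // -5
        System.out.println(decode(1000, ConvertedType.INT_16)); // 1000
    }
}
```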