Repository: hive
Updated Branches:
  refs/heads/master 5daad4e44 -> cf4114e1b
HIVE-17627 : Use druid scan query instead of the select query. (Nishant Bangarwa via Slim B, Ashutosh Chauhan)

Signed-off-by: Ashutosh Chauhan <hashut...@apache.org>

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cf4114e1
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cf4114e1
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cf4114e1

Branch: refs/heads/master
Commit: cf4114e1b72b0637b92d4d1267ac9b779d48a29a
Parents: 5daad4e
Author: Nishant Bangarwa <nishant.mon...@gmail.com>
Authored: Tue Jan 23 11:08:00 2018 -0800
Committer: Ashutosh Chauhan <hashut...@apache.org>
Committed: Tue Feb 13 13:38:40 2018 -0800

----------------------------------------------------------------------
 .../druid/io/DruidQueryBasedInputFormat.java    |  98 +++++++++++++-----
 .../druid/serde/DruidScanQueryRecordReader.java | 102 +++++++++++++++++++
 .../hadoop/hive/druid/serde/DruidSerDe.java     |  49 ++++++++-
 3 files changed, 225 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
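In short, the storage handler now plans Druid scan queries where it previously planned select queries: DruidSerDe learns to infer the schema of a Query.SCAN (issuing a Druid segment metadata query for column types Hive does not already know), DruidQueryBasedInputFormat learns to split a scan query across the segments that hold its data, and a new DruidScanQueryRecordReader turns the scan results into DruidWritable rows. Splitting goes through the broker's segment candidates endpoint; with a made-up broker address, datasource and interval (none of them taken from this patch), the request issued per query looks roughly like

  GET http://broker-host:8082/druid/v2/datasources/wikipedia/candidates?intervals=2018-01-01T00%3A00%3A00.000Z%2F2018-01-02T00%3A00%3A00.000Z

Each located segment in the response becomes one HiveDruidSplit pinned to the hosts serving that segment, unless the query carries Constants.DRUID_QUERY_FETCH (i.e. it has a limit), in which case it runs as a single split against the broker. The diffs below show the three parts of the change.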

http://git-wip-us.apache.org/repos/asf/hive/blob/cf4114e1/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
index 7bdc172..33f6412 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/io/DruidQueryBasedInputFormat.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.druid.DruidStorageHandler;
 import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
 import org.apache.hadoop.hive.druid.serde.DruidGroupByQueryRecordReader;
 import org.apache.hadoop.hive.druid.serde.DruidQueryRecordReader;
+import org.apache.hadoop.hive.druid.serde.DruidScanQueryRecordReader;
 import org.apache.hadoop.hive.druid.serde.DruidSelectQueryRecordReader;
 import org.apache.hadoop.hive.druid.serde.DruidTimeseriesQueryRecordReader;
 import org.apache.hadoop.hive.druid.serde.DruidTopNQueryRecordReader;
@@ -68,6 +69,7 @@ import io.druid.query.Druids.SelectQueryBuilder;
 import io.druid.query.LocatedSegmentDescriptor;
 import io.druid.query.Query;
 import io.druid.query.SegmentDescriptor;
+import io.druid.query.scan.ScanQuery;
 import io.druid.query.select.PagingSpec;
 import io.druid.query.select.SelectQuery;
 import io.druid.query.spec.MultipleSpecificSegmentSpec;
@@ -93,6 +95,8 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
         return new DruidGroupByQueryRecordReader();
       case Query.SELECT:
         return new DruidSelectQueryRecordReader();
+      case Query.SCAN:
+        return new DruidScanQueryRecordReader();
     }
     return null;
   }
@@ -155,7 +159,11 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
         SelectQuery selectQuery = DruidStorageHandlerUtils.JSON_MAPPER.readValue(
                 druidQuery, SelectQuery.class);
         return distributeSelectQuery(conf, address, selectQuery, paths[0]);
-      default:
+      case Query.SCAN:
+        ScanQuery scanQuery = DruidStorageHandlerUtils.JSON_MAPPER.readValue(
+                druidQuery, ScanQuery.class);
+        return distributeScanQuery(conf, address, scanQuery, paths[0]);
+      default:
        throw new IOException("Druid query type not recognized");
     }
   }
@@ -186,28 +194,8 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
             new String[]{address} ) };
     }
 
-    final String intervals =
-        StringUtils.join(query.getIntervals(), ","); // Comma-separated intervals without brackets
-    final String request = String.format(
-        "http://%s/druid/v2/datasources/%s/candidates?intervals=%s",
-        address, query.getDataSource().getNames().get(0), URLEncoder.encode(intervals, "UTF-8"));
-    LOG.debug("sending request {} to query for segments", request);
-    final InputStream response;
-    try {
-      response = DruidStorageHandlerUtils.submitRequest(DruidStorageHandler.getHttpClient(), new Request(HttpMethod.GET, new URL(request)));
-    } catch (Exception e) {
-      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
-    }
-
-    // Retrieve results
-    final List<LocatedSegmentDescriptor> segmentDescriptors;
-    try {
-      segmentDescriptors = DruidStorageHandlerUtils.JSON_MAPPER.readValue(response,
-          new TypeReference<List<LocatedSegmentDescriptor>>() {});
-    } catch (Exception e) {
-      response.close();
-      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
-    }
+    final List<LocatedSegmentDescriptor> segmentDescriptors = fetchLocatedSegmentDescriptors(
+        address, query);
 
     // Create one input split for each segment
     final int numSplits = segmentDescriptors.size();
@@ -233,6 +221,70 @@ public class DruidQueryBasedInputFormat extends InputFormat<NullWritable, DruidW
     return splits;
   }
 
+  /* New method that distributes the Scan query by creating splits containing
+   * information about different Druid nodes that have the data for the given
+   * query. */
+  private static HiveDruidSplit[] distributeScanQuery(Configuration conf, String address,
+      ScanQuery query, Path dummyPath) throws IOException {
+    // If it has a limit, we use it and we do not distribute the query
+    final boolean isFetch = query.getContextBoolean(Constants.DRUID_QUERY_FETCH, false);
+    if (isFetch) {
+      return new HiveDruidSplit[] { new HiveDruidSplit(
+          DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(query), dummyPath,
+          new String[]{address} ) };
+    }
+
+    final List<LocatedSegmentDescriptor> segmentDescriptors = fetchLocatedSegmentDescriptors(
+        address, query);
+
+    // Create one input split for each segment
+    final int numSplits = segmentDescriptors.size();
+    final HiveDruidSplit[] splits = new HiveDruidSplit[segmentDescriptors.size()];
+    for (int i = 0; i < numSplits; i++) {
+      final LocatedSegmentDescriptor locatedSD = segmentDescriptors.get(i);
+      final String[] hosts = new String[locatedSD.getLocations().size()];
+      for (int j = 0; j < locatedSD.getLocations().size(); j++) {
+        hosts[j] = locatedSD.getLocations().get(j).getHost();
+      }
+      // Create partial Scan query
+      final SegmentDescriptor newSD = new SegmentDescriptor(
+          locatedSD.getInterval(), locatedSD.getVersion(), locatedSD.getPartitionNumber());
+      final Query partialQuery = query
+          .withQuerySegmentSpec(new MultipleSpecificSegmentSpec(Lists.newArrayList(newSD)));
+      splits[i] = new HiveDruidSplit(DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(partialQuery),
+          dummyPath, hosts);
+    }
+    return splits;
+  }
+
+  private static List<LocatedSegmentDescriptor> fetchLocatedSegmentDescriptors(String address,
+      BaseQuery query) throws IOException {
+    final String intervals =
+        StringUtils.join(query.getIntervals(), ","); // Comma-separated intervals without brackets
+    final String request = String.format(
+        "http://%s/druid/v2/datasources/%s/candidates?intervals=%s",
+        address, query.getDataSource().getNames().get(0), URLEncoder.encode(intervals, "UTF-8"));
+    LOG.debug("sending request {} to query for segments", request);
+    final InputStream response;
+    try {
+      response = DruidStorageHandlerUtils
+          .submitRequest(DruidStorageHandler.getHttpClient(), new Request(HttpMethod.GET, new URL(request)));
+    } catch (Exception e) {
+      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+    }
+
+    // Retrieve results
+    final List<LocatedSegmentDescriptor> segmentDescriptors;
+    try {
+      segmentDescriptors = DruidStorageHandlerUtils.JSON_MAPPER.readValue(response,
+          new TypeReference<List<LocatedSegmentDescriptor>>() {});
+    } catch (Exception e) {
+      response.close();
+      throw new IOException(org.apache.hadoop.util.StringUtils.stringifyException(e));
+    }
+    return segmentDescriptors;
+  }
+
   private static String deserializeSerialize(String druidQuery)
           throws JsonParseException, JsonMappingException, IOException {
     BaseQuery<?> deserializedQuery = DruidStorageHandlerUtils.JSON_MAPPER.readValue(

http://git-wip-us.apache.org/repos/asf/hive/blob/cf4114e1/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidScanQueryRecordReader.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidScanQueryRecordReader.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidScanQueryRecordReader.java
new file mode 100644
index 0000000..cbeac2c
--- /dev/null
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidScanQueryRecordReader.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.druid.serde;
+
+import io.druid.query.Result;
+import io.druid.query.scan.ScanQuery;
+import io.druid.query.scan.ScanResultValue;
+
+import org.apache.hadoop.hive.druid.DruidStorageHandlerUtils;
+import org.apache.hadoop.io.NullWritable;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JavaType;
+import com.google.common.collect.Iterators;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Record reader for results for Druid ScanQuery.
+ */
+public class DruidScanQueryRecordReader
+    extends DruidQueryRecordReader<ScanQuery, ScanResultValue> {
+
+  private static final TypeReference<ScanResultValue> TYPE_REFERENCE =
+      new TypeReference<ScanResultValue>() {
+      };
+
+  private ScanResultValue current;
+
+  private Iterator<List<Object>> compactedValues = Iterators.emptyIterator();
+
+  @Override
+  protected JavaType getResultTypeDef() {
+    return DruidStorageHandlerUtils.JSON_MAPPER.getTypeFactory().constructType(TYPE_REFERENCE);
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException {
+    if (compactedValues.hasNext()) {
+      return true;
+    }
+    if (queryResultsIterator.hasNext()) {
+      current = queryResultsIterator.next();
+      compactedValues = ((List<List<Object>>) current.getEvents()).iterator();
+      return nextKeyValue();
+    }
+    return false;
+  }
+
+  @Override
+  public NullWritable getCurrentKey() throws IOException, InterruptedException {
+    return NullWritable.get();
+  }
+
+  @Override
+  public DruidWritable getCurrentValue() throws IOException, InterruptedException {
+    // Create new value
+    DruidWritable value = new DruidWritable();
+    List<Object> e = compactedValues.next();
+    for (int i = 0; i < current.getColumns().size(); i++) {
+      value.getValue().put(current.getColumns().get(i), e.get(i));
+    }
+    return value;
+  }
+
+  @Override
+  public boolean next(NullWritable key, DruidWritable value) throws IOException {
+    if (nextKeyValue()) {
+      // Update value
+      value.getValue().clear();
+      List<Object> e = compactedValues.next();
+      for (int i = 0; i < current.getColumns().size(); i++) {
+        value.getValue().put(current.getColumns().get(i), e.get(i));
+      }
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public float getProgress() {
+    return queryResultsIterator.hasNext() || compactedValues.hasNext() ?
+        0 : 1;
+  }
+
+}
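The reader above relies on Druid returning scan results in compacted form: each ScanResultValue carries the column names once (getColumns()) plus one list of values per row (getEvents(), cast to List<List<Object>>), and nextKeyValue()/getCurrentValue() re-inflate every value list into a column-to-value map inside the DruidWritable. A minimal standalone sketch of that flattening, using plain collections and invented column names and values rather than the Druid classes:

  import java.util.Arrays;
  import java.util.LinkedHashMap;
  import java.util.List;
  import java.util.Map;

  public class CompactedScanResultDemo {
    public static void main(String[] args) {
      // Column names are listed once per batch; values arrive as one list per row
      List<String> columns = Arrays.asList("__time", "page", "added");
      List<List<Object>> events = Arrays.asList(
          Arrays.asList(1516665600000L, "Main_Page", 37L),
          Arrays.asList(1516665660000L, "Apache_Hive", 12L));

      // Re-inflate each value list into a column -> value map, as the record reader does
      for (List<Object> event : events) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (int i = 0; i < columns.size(); i++) {
          row.put(columns.get(i), event.get(i));
        }
        System.out.println(row);
      }
    }
  }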

http://git-wip-us.apache.org/repos/asf/hive/blob/cf4114e1/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
----------------------------------------------------------------------
diff --git a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
index 3899bff..3696b0f 100644
--- a/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
+++ b/druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSerDe.java
@@ -98,6 +98,7 @@ import io.druid.query.groupby.GroupByQuery;
 import io.druid.query.metadata.metadata.ColumnAnalysis;
 import io.druid.query.metadata.metadata.SegmentAnalysis;
 import io.druid.query.metadata.metadata.SegmentMetadataQuery;
+import io.druid.query.scan.ScanQuery;
 import io.druid.query.select.SelectQuery;
 import io.druid.query.timeseries.TimeseriesQuery;
 import io.druid.query.topn.TopNQuery;
@@ -245,6 +246,15 @@ public class DruidSerDe extends AbstractSerDe {
           inferSchema((GroupByQuery) query, tsTZTypeInfo, columnNames, columnTypes,
                   mapColumnNamesTypes.build());
           break;
+        case Query.SCAN:
+          String broker = HiveConf.getVar(configuration,
+              HiveConf.ConfVars.HIVE_DRUID_BROKER_DEFAULT_ADDRESS);
+          if (org.apache.commons.lang3.StringUtils.isEmpty(broker)) {
+            throw new SerDeException("Druid broker address not specified in configuration");
+          }
+          inferSchema((ScanQuery) query, tsTZTypeInfo, columnNames, columnTypes, broker,
+              mapColumnNamesTypes.build());
+          break;
         default:
           throw new SerDeException("Not supported Druid query");
       }
@@ -406,6 +416,43 @@ public class DruidSerDe extends AbstractSerDe {
     }
   }
 
+  /* Scan query */
+  private void inferSchema(ScanQuery query, TimestampLocalTZTypeInfo timeColumnTypeInfo,
+      List<String> columnNames, List<PrimitiveTypeInfo> columnTypes,
+      String address, Map<String, PrimitiveTypeInfo> mapColumnNamesTypes)
+      throws SerDeException {
+    // The type for metric columns is not explicit in the query, thus in this case
+    // we need to emit a metadata query to know their type
+    SegmentMetadataQueryBuilder builder = new Druids.SegmentMetadataQueryBuilder();
+    builder.dataSource(query.getDataSource());
+    builder.merge(true);
+    builder.analysisTypes();
+    SegmentMetadataQuery metadataQuery = builder.build();
+    // Execute query in Druid
+    SegmentAnalysis schemaInfo;
+    try {
+      schemaInfo = submitMetadataRequest(address, metadataQuery);
+    } catch (IOException e) {
+      throw new SerDeException(e);
+    }
+    if (schemaInfo == null) {
+      throw new SerDeException("Connected to Druid but could not retrieve datasource information");
+    }
+    for (String column : query.getColumns()) {
+      columnNames.add(column);
+      PrimitiveTypeInfo typeInfo = mapColumnNamesTypes.get(column);
+      if (typeInfo != null) {
+        // If datasource was created by Hive, we consider Hive type
+        columnTypes.add(typeInfo);
+      } else {
+        ColumnAnalysis columnAnalysis = schemaInfo.getColumns().get(column);
+        // If column is absent from Druid consider it as a dimension with type string.
+        String type = columnAnalysis == null ?
+            DruidSerDeUtils.STRING_TYPE : columnAnalysis.getType();
+        columnTypes.add(DruidSerDeUtils.convertDruidToHiveType(type));
+      }
+    }
+  }
+
   /* GroupBy query */
   private void inferSchema(GroupByQuery query, TimestampLocalTZTypeInfo timeColumnTypeInfo,
       List<String> columnNames, List<PrimitiveTypeInfo> columnTypes,
@@ -543,7 +590,7 @@ public class DruidSerDe extends AbstractSerDe {
                   new TimestampLocalTZWritable(
                       new TimestampTZ(
                           ZonedDateTime.ofInstant(
-                              Instant.ofEpochMilli((Long) value),
+                              Instant.ofEpochMilli(((Number) value).longValue()),
                               ((TimestampLocalTZTypeInfo) types[i]).timeZone()))));
             break;
           case BYTE:
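One remark on the final hunk shown: the timestamp conversion now goes through Number instead of casting straight to Long. Presumably this is because the value comes out of an untyped deserialized row map on the scan path and is not guaranteed to be boxed as a Long, so Number.longValue() is the tolerant conversion. Illustrative only, with an invented value:

  Object value = 1516665600000L;               // in DruidSerDe this comes out of the deserialized row map
  long millis = ((Number) value).longValue();  // accepts whatever numeric wrapper the JSON mapper produced,
                                               // where a direct (Long) cast could throw ClassCastException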