This is an automated email from the ASF dual-hosted git repository. ajantha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new b159b95 [CARBONDATA-3542] Support Map data type reading through Hive b159b95 is described below commit b159b9541d6380de8b8f5adc37454ca0f775c485 Author: dhatchayani <dhatcha.offic...@gmail.com> AuthorDate: Sat Oct 5 22:28:15 2019 +0530 [CARBONDATA-3542] Support Map data type reading through Hive Problem: The Map data type is not supported when reading through Hive. Solution: Add handling to support the MAP data type. This closes #3407 --- docs/hive-guide.md | 1 - .../apache/carbondata/examples/HiveExample.scala | 38 +++++ .../hive/CarbonDictionaryDecodeReadSupport.java | 46 ++++++ .../apache/carbondata/hive/CarbonMapInspector.java | 179 +++++++++++++++++++++ .../carbondata/hive/CarbonObjectInspector.java | 6 + 5 files changed, 269 insertions(+), 1 deletion(-) diff --git a/docs/hive-guide.md b/docs/hive-guide.md index e839b9b..7aba3bf 100644 --- a/docs/hive-guide.md +++ b/docs/hive-guide.md @@ -116,6 +116,5 @@ select * from hive_carbon_1 order by id; ### Note - Partition table support is not handled - - Map data type is not supported diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/HiveExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/HiveExample.scala index 6639e8e..241ee8c 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/HiveExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/HiveExample.scala @@ -94,6 +94,15 @@ object HiveExample { "'QUOTECHAR'='\"', 'BAD_RECORDS_ACTION'='FORCE','FILEHEADER'='c1_int,c2_Bigint," + "c3_Decimal,c4_double,c5_string,c6_Timestamp,c7_Datatype_Desc')") + carbonSession.sql("""DROP TABLE IF EXISTS complexMap""".stripMargin) + + carbonSession.sql("create table complexMap(name map<string,string>) stored by 'carbondata'") + + carbonSession + .sql( + "insert into complexMap values(map('Manish','Nalla','Shardul','Singh','Vishal','Kumar'," + + "'EmptyVal','','NullVal', 'null'))") 
+ carbonSession.close() // delete the already existing lock on metastore so that new derby instance @@ -266,6 +275,35 @@ object HiveExample { s"$resultAggQueryFetched") assert(resultAggQueryFetched == 1) + val resultComplexQuery = statement + .executeQuery( + "SELECT name FROM complexMap") + + var resultComplex = 0 + + var name = "" + + while (resultComplexQuery.next) { + if (resultComplex == 0) { + println("+------------------------------------------------------------------------------" + + "------+") + println("| name " + + " |") + + println("+-------------------------------------------------------------------------------" + + "-----+") + + name = resultComplexQuery.getString("name") + + println(s"|$name|") + println("+-------------------------------------------------------------------------------" + + "-----+") } + resultComplex = resultComplex + 1 + } + println(" ********** Total Rows Fetched When Complex Query **********" + + s"$resultComplex") + assert(resultComplex == 1) + hiveEmbeddedServer2.stop() } } diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java index 52ece32..1bb7088 100644 --- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java +++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonDictionaryDecodeReadSupport.java @@ -143,6 +143,8 @@ public class CarbonDictionaryDecodeReadSupport<T> implements CarbonReadSupport<T return createStruct(obj, carbonColumn); } else if (DataTypes.isArrayType(dataType)) { return createArray(obj, carbonColumn); + } else if (DataTypes.isMapType(dataType)) { + return createMap(obj, carbonColumn); } else { return createWritablePrimitive(obj, carbonColumn); } @@ -207,6 +209,48 @@ public class CarbonDictionaryDecodeReadSupport<T> implements CarbonReadSupport<T } /** + * Create the Map data for Map Datatype + * + 
* @param obj + * @param carbonColumn + * @return + * @throws IOException + */ + private ArrayWritable createMap(Object obj, CarbonColumn carbonColumn) throws IOException { + Object[] objArray = (Object[]) obj; + List<CarbonDimension> childCarbonDimensions = null; + CarbonDimension mapDimension = null; + List<ArrayWritable> writablesList = new ArrayList<>(); + if (carbonColumn.isDimension() && carbonColumn.getColumnSchema().getNumberOfChild() > 0) { + childCarbonDimensions = ((CarbonDimension) carbonColumn).getListOfChildDimensions(); + // get the map dimension wrapped inside the carbon dimension + mapDimension = childCarbonDimensions.get(0); + // get the child dimenesions of the map dimensions, child dimensions are - Key and Value + if (null != mapDimension) { + childCarbonDimensions = mapDimension.getListOfChildDimensions(); + } + } + if (null != childCarbonDimensions && childCarbonDimensions.size() == 2) { + Object[] keyObjects = (Object[]) objArray[0]; + Object[] valObjects = (Object[]) objArray[1]; + for (int i = 0; i < keyObjects.length; i++) { + Writable keyWritable = createWritableObject(keyObjects[i], childCarbonDimensions.get(0)); + Writable valWritable = createWritableObject(valObjects[i], childCarbonDimensions.get(1)); + Writable[] arr = new Writable[2]; + arr[0] = keyWritable; + arr[1] = valWritable; + writablesList.add(new ArrayWritable(Writable.class, arr)); + } + if (writablesList.size() > 0) { + final ArrayWritable subArray = new ArrayWritable(ArrayWritable.class, + writablesList.toArray(new ArrayWritable[writablesList.size()])); + return new ArrayWritable(Writable.class, new Writable[] {subArray}); + } + } + return null; + } + + /** * This method will create the Writable Objects for primitives. 
* * @param obj @@ -256,6 +300,8 @@ public class CarbonDictionaryDecodeReadSupport<T> implements CarbonReadSupport<T return createArray(obj, carbonColumn); } else if (DataTypes.isStructType(dataType)) { return createStruct(obj, carbonColumn); + } else if (DataTypes.isMapType(dataType)) { + return createMap(obj, carbonColumn); } else if (DataTypes.isDecimal(dataType)) { return new HiveDecimalWritable(HiveDecimal.create(new java.math.BigDecimal(obj.toString()))); } else { diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonMapInspector.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonMapInspector.java new file mode 100644 index 0000000..b17ac43 --- /dev/null +++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonMapInspector.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.hive; + +import java.util.LinkedHashMap; +import java.util.Map; + +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.SettableMapObjectInspector; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.Writable; + +public class CarbonMapInspector implements SettableMapObjectInspector { + + protected final ObjectInspector keyInspector; + protected final ObjectInspector valueInspector; + + public CarbonMapInspector(final ObjectInspector keyInspector, + final ObjectInspector valueInspector) { + this.keyInspector = keyInspector; + this.valueInspector = valueInspector; + } + + @Override public String getTypeName() { + return "map<" + keyInspector.getTypeName() + "," + valueInspector.getTypeName() + ">"; + } + + @Override public Category getCategory() { + return Category.MAP; + } + + @Override + public ObjectInspector getMapKeyObjectInspector() { + return keyInspector; + } + + @Override + public ObjectInspector getMapValueObjectInspector() { + return valueInspector; + } + + @Override public Object getMapValueElement(Object data, Object key) { + if (data != null && key != null) { + Map<?, ?> map = (Map)data; + return map.get(key); + } else { + return null; + } + } + + @Override + public Map<?, ?> getMap(final Object data) { + if (data == null) { + return null; + } + if (data instanceof ArrayWritable) { + final Writable[] mapArray = ((ArrayWritable) data).get(); + if (mapArray == null) { + return null; + } + + final Map<Writable, Writable> map = new LinkedHashMap<>(); + for (final Writable obj : mapArray) { + final ArrayWritable mapObj = (ArrayWritable) obj; + final Writable[] arr = mapObj.get(); + for (int i = 0; i < arr.length; i++) { + map.put(((ArrayWritable) arr[i]).get()[0], ((ArrayWritable) arr[i]).get()[1]); + } + } + return map; + } + if (data instanceof Map) { + return (Map) data; + } + throw new UnsupportedOperationException("Cannot 
inspect " + data.getClass().getCanonicalName()); + } + + @Override + public int getMapSize(final Object data) { + if (data == null) { + return -1; + } + if (data instanceof ArrayWritable) { + final Writable[] mapArray = ((ArrayWritable) data).get(); + + if (mapArray == null) { + return -1; + } else { + return mapArray.length; + } + } + if (data instanceof Map) { + return ((Map) data).size(); + } + throw new UnsupportedOperationException("Cannot inspect " + data.getClass().getCanonicalName()); + } + + @Override + public Object create() { + Map<Object, Object> m = new LinkedHashMap<>(); + return m; + } + + @Override + public Object put(Object map, Object key, Object value) { + Map<Object, Object> m = (Map<Object, Object>) map; + m.put(key, value); + return m; + } + + @Override + public Object remove(Object map, Object key) { + Map<Object, Object> m = (Map<Object, Object>) map; + m.remove(key); + return m; + } + + @Override + public Object clear(Object map) { + Map<Object, Object> m = (Map<Object, Object>) map; + m.clear(); + return m; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + + ((keyInspector == null) ? 0 : keyInspector.hashCode()); + result = prime * result + + ((valueInspector == null) ? 
0 : valueInspector.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final CarbonMapInspector other = (CarbonMapInspector) obj; + if (keyInspector == null) { + if (other.keyInspector != null) { + return false; + } + } else if (!keyInspector.equals(other.keyInspector)) { + return false; + } + if (valueInspector == null) { + if (other.valueInspector != null) { + return false; + } + } else if (!valueInspector.equals(other.valueInspector)) { + return false; + } + return true; + } +} diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonObjectInspector.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonObjectInspector.java index 75c7056..9718405 100644 --- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonObjectInspector.java +++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonObjectInspector.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectIn import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableHiveVarcharObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; @@ -88,6 +89,11 @@ class CarbonObjectInspector extends SettableStructObjectInspector { return new WritableHiveVarcharObjectInspector((VarcharTypeInfo) typeInfo); } else if (typeInfo.equals(TypeInfoFactory.binaryTypeInfo)) { return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector; + } else if (typeInfo instanceof MapTypeInfo) { + MapTypeInfo mapTypeInfo = 
(MapTypeInfo) typeInfo; + ObjectInspector mapKeyObjectIns = getObjectInspector(mapTypeInfo.getMapKeyTypeInfo()); + ObjectInspector mapValObjectIns = getObjectInspector(mapTypeInfo.getMapValueTypeInfo()); + return new CarbonMapInspector(mapKeyObjectIns, mapValObjectIns); } else { throw new UnsupportedOperationException("Unknown field type: " + typeInfo); }