I am using HBase 1.1 with Phoenix 4.8. I have a table with columns of type VARBINARY. Each of these columns holds a compressed float[] in the form of a ByteBuffer called a DenseVector, which is an ordered set of 16-bit IEEE floats with cardinality no more than 3996.

I load data into the Phoenix tables through the phoenix-spark plugin. To give an idea of the pipeline: MapReduce jobs write the data into Hive in Parquet (gzip) format; I read it into a DataFrame with sqlContext.parquetFile(), register it as a temp table, run a sqlContext.sql("select ...") query, and finally call

res.save("org.apache.phoenix.spark", SaveMode.Overwrite, Map("table" -> "SITEFLOWTABLE", "zkUrl" -> "localhost:2181"))

We have a Hive/Shark UDF (code pasted below) that decodes these ByteBuffer columns and displays them as a readable float[], and it works on spark-shell. Now I want to write a similar UDF in Phoenix, run queries such as

select uplinkcostbuffer, DENSEVECTORUDF(uplinkcostbuffer) from siteflowtable

and later write UDAFs over it.

How do I make a Phoenix UDF return float[]? I have tried many things, but none of them work.
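To be concrete about what "return float[]" means on the client side: once the UDF works, I expect to consume its result over JDBC roughly as below. This is only a sketch; the table and column names are the ones from the query above, and I am assuming Phoenix's java.sql.Array implementation hands back a float[] for a FLOAT ARRAY column.

------------------------------------------------------------
import java.sql.Array;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class DenseVectorQuery {
    public static void main(String[] args) throws Exception {
        // Same ZooKeeper quorum as used in the phoenix-spark save above.
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost:2181");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                     "select uplinkcostbuffer, DENSEVECTORUDF(uplinkcostbuffer) from siteflowtable")) {
            while (rs.next()) {
                Array arr = rs.getArray(2);                // the array the UDF should return
                float[] vector = (float[]) arr.getArray(); // assumption: FLOAT ARRAY is backed by float[]
                System.out.println("decoded vector of length " + vector.length);
            }
        }
    }
}
------------------------------------------------------------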
Below is the code for the Hive/Shark UDF:

------------------------------------------------------------
package com.ABCD.densevectorudf;

import java.nio.ByteBuffer;
import java.util.Vector;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.io.FloatWritable;

import com.ABCD.common.attval.IDenseVectorOperator;
import com.ABCD.common.attval.Utility;
import com.ABCD.common.attval.array.BufferOperations;
import com.ABCD.common.attval.array.FloatArrayFactory;

@Description(name = "DenseVectorUDF",
        value = "Dense Vector UDF in Hive / Shark\n"
                + "_FUNC_(binary|hex) - Returns the dense vector array<float> value\n",
        extended = "Dense Vector UDAF in Hive / Shark")
public class DenseVectorUDF extends GenericUDF {

    private static final Log LOG = LogFactory.getLog(DenseVectorUDF.class.getName());

    private ObjectInspector inputOI;
    private ListObjectInspector outputOI;

    @Override
    public String getDisplayString(String[] children) {
        StringBuilder sb = new StringBuilder();
        sb.append("densevectorudf(");
        for (int i = 0; i < children.length; i++) {
            sb.append(children[i]);
            if (i + 1 != children.length) {
                sb.append(",");
            }
        }
        sb.append(")");
        return sb.toString();
    }

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        if (arguments.length == 1) {
            ObjectInspector first = arguments[0];
            if (!(first instanceof StringObjectInspector)
                    && !(first instanceof BinaryObjectInspector)) {
                LOG.error("first argument must be either a binary or hex buffer");
                throw new UDFArgumentException("first argument must be either a binary or hex buffer");
            }
            inputOI = first;
            outputOI = ObjectInspectorFactory.getStandardListObjectInspector(
                    PrimitiveObjectInspectorFactory.writableFloatObjectInspector);
        } else {
            throw new UDFArgumentLengthException("Wrong number of arguments: exactly one argument is supported.");
        }
        return outputOI;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        IDenseVectorOperator idv = FloatArrayFactory.getFloatArray();
        Object object = arguments[0].get();
        Vector<Float> floatVector = null;
        ByteBuffer buff = null;
        // Accept either a hex string or raw binary and wrap it as a ByteBuffer.
        if (inputOI instanceof StringObjectInspector) {
            String hex = ((StringObjectInspector) inputOI).getPrimitiveJavaObject(object);
            buff = ByteBuffer.wrap(Utility.hexToBytes(hex));
        } else if (inputOI instanceof BinaryObjectInspector) {
            byte[] bytes = ((BinaryObjectInspector) inputOI).getPrimitiveJavaObject(object);
            buff = ByteBuffer.wrap(bytes);
        }
        // Decode the DenseVector and expose it as array<float>.
        floatVector = idv.getElements(buff);
        Object[] red = new Object[floatVector.size()];
        for (int index = 0; index < red.length; index++) {
            red[index] = new FloatWritable(floatVector.get(index));
        }
        LOG.info("Buffer header = " + BufferOperations.stringifyBuffer(buff));
        return red;
    }
}
------------------------------------------------------------

Following is the code I have written for the Phoenix UDF:

------------------------------------------------------------
package org.apache.phoenix.expression.function;

import java.nio.ByteBuffer;
import java.sql.SQLException;
import java.util.List;
import java.util.Vector;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.hive.objectinspector.PhoenixBinaryObjectInspector;
import org.apache.phoenix.parse.FunctionParseNode.Argument;
import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PFloatArray;
import org.apache.phoenix.schema.types.PVarbinary;

import com.ABCD.common.attval.IDenseVectorOperator;
import com.ABCD.common.attval.array.BufferOperations;
import com.ABCD.common.attval.array.FloatArrayFactory;

@BuiltInFunction(name = DenseVectorFunction.NAME,
        args = { @Argument(allowedTypes = { PVarbinary.class }) })
public class DenseVectorFunction extends ScalarFunction {

    public static final String NAME = "DenseVectorFunction";

    public DenseVectorFunction() {
    }

    public DenseVectorFunction(List<Expression> children) throws SQLException {
        super(children);
    }

    @Override
    public String getName() {
        return NAME;
    }

    public Expression getElementExpr() {
        return children.get(0);
    }

    @Override
    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
        if (!getElementExpr().evaluate(tuple, ptr)) {
            return false;
        }
        Object element = getElementExpr().getDataType().toObject(ptr,
                getElementExpr().getSortOrder(), getElementExpr().getMaxLength(),
                getElementExpr().getScale());

        // Unwrap the VARBINARY value to a byte[] and decode the DenseVector.
        IDenseVectorOperator idv = FloatArrayFactory.getFloatArray();
        PhoenixBinaryObjectInspector pboi = new PhoenixBinaryObjectInspector();
        byte[] bytes = pboi.getPrimitiveJavaObject(element);
        ByteBuffer buff = ByteBuffer.wrap(bytes);
        Vector<Float> floatVector = idv.getElements(buff);

        Object[] red = new Object[floatVector.size()];
        for (int index = 0; index < red.length; index++) {
            red[index] = new FloatWritable(floatVector.get(index));
            System.out.println("" + floatVector.get(index));
        }
        // This prints the header info in the ByteBuffer, and it is correct.
        System.out.println("Buffer header = " + BufferOperations.stringifyBuffer(buff));

        // HOW DO I MAKE IT RETURN FLOAT[] or a PHOENIXARRAY?
        ptr.set(??);
        return true;
    }

    @Override
    public SortOrder getSortOrder() {
        return children.get(0).getSortOrder();
    }

    @Override
    public PDataType getDataType() {
        return PFloatArray.INSTANCE;
    }
}
------------------------------------------------------------
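In case it helps pinpoint where I am going wrong, this is the kind of thing I have been attempting in place of the ptr.set(??) line, modeled on how Phoenix's built-in array-returning functions appear to serialize their results. It is only a sketch: I am assuming PArrayDataType.instantiatePhoenixArray and PFloatArray.toBytes can be combined this way, so that the bytes written into ptr match what getDataType() (PFloatArray.INSTANCE) promises to the caller.

------------------------------------------------------------
// Extra imports this sketch assumes (Phoenix 4.x):
//   import org.apache.phoenix.schema.types.PFloat;
//   import org.apache.phoenix.schema.types.PArrayDataType;
//   import org.apache.phoenix.schema.types.PhoenixArray;

// Unwrap the decoded Vector<Float> into plain Float objects;
// PhoenixArray is built from Java objects, not FloatWritable.
Float[] elements = floatVector.toArray(new Float[floatVector.size()]);

// Wrap the elements in a PhoenixArray with base type FLOAT,
// serialize it with the declared return type, and point ptr at the bytes.
PhoenixArray array = PArrayDataType.instantiatePhoenixArray(PFloat.INSTANCE, elements);
ptr.set(PFloatArray.INSTANCE.toBytes(array));
return true;
------------------------------------------------------------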
Any help would be much appreciated.