Hi Akhil,

You want to create an Array, convert it to its byte[] representation, and set the ptr argument to point to it. Take a look at ArrayIT for examples of creating an Array:
// Create an Array of FLOAT
Float[] floatArr = new Float[2];
floatArr[0] = 64.87f;
floatArr[1] = 89.96f;
Array array = conn.createArrayOf("FLOAT", floatArr);

// Convert to byte[]
byte[] arrayAsBytes = PFloatArray.INSTANCE.toBytes(array);

// Set ptr to the byte[]
ptr.set(arrayAsBytes);
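Note that inside evaluate() you won't have a Connection to call createArrayOf() on, so in your DenseVectorFunction you'd build the PhoenixArray directly. Here's a rough, untested sketch of how the end of your evaluate() could look; it assumes floatVector has been decoded as in your code, and uses PArrayDataType, PFloat, and PhoenixArray from the Phoenix schema packages:

// Rough, untested sketch: assumes floatVector has already been populated
// as in your evaluate() implementation. Inside an expression there is no
// JDBC Connection, so build the PhoenixArray directly instead of calling
// conn.createArrayOf().
// Needs: import org.apache.phoenix.schema.PhoenixArray;
//        import org.apache.phoenix.schema.types.PArrayDataType;
//        import org.apache.phoenix.schema.types.PFloat;
Float[] elements = floatVector.toArray(new Float[floatVector.size()]);
PhoenixArray phoenixArray =
    PArrayDataType.instantiatePhoenixArray(PFloat.INSTANCE, elements);

// Serialize the array and point ptr at the bytes; this is what a
// getDataType() of PFloatArray.INSTANCE expects to find.
ptr.set(PFloatArray.INSTANCE.toBytes(phoenixArray));
return true;

Once that compiles, registering and calling the UDF should look something like the following (the jar path is just a placeholder, and I'm going from memory on the syntax, so double-check it against the Phoenix user-defined functions docs for your version):

CREATE FUNCTION DENSEVECTORUDF(VARBINARY) RETURNS FLOAT ARRAY
AS 'org.apache.phoenix.expression.function.DenseVectorFunction'
USING JAR 'hdfs:///hbase/lib/densevector-udf.jar';

SELECT uplinkcostbuffer, DENSEVECTORUDF(uplinkcostbuffer) FROM SITEFLOWTABLE;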
Thanks,
James

On Sat, Oct 1, 2016 at 9:19 AM, akhil jain <akhilcancer...@gmail.com> wrote:
> I am using HBase 1.1 with Phoenix 4.8. I have a table with columns whose
> datatype is VARBINARY.
> The data in these columns is a compressed float[] in the form of a
> ByteBuffer called DenseVector, which is an ordered set of 16-bit IEEE
> floats of cardinality no more than 3996.
> I have loaded data into the Phoenix tables through the phoenix-spark
> plugin. To give an idea, the MapReduce jobs write data into Hive in
> Parquet gzip format. I read the data into a DataFrame using
> sqlContext.parquetFile(), register it as a temp table, run a
> sqlContext.sql("select query ...") query, and finally call
> res.save("org.apache.phoenix.spark", SaveMode.Overwrite,
> Map("table" -> "SITEFLOWTABLE", "zkUrl" -> "localhost:2181")).
> We have a Hive/Shark UDF (code pasted below) that can decode these
> ByteBuffer columns and display them as a readable float[]. This UDF
> works on spark-shell.
> Now I want to write a similar UDF in Phoenix, run queries such as
> "select uplinkcostbuffer, DENSEVECTORUDF(uplinkcostbuffer) from
> siteflowtable", and further write UDAFs over it.
> How do I make the Phoenix UDF return float[]? I have tried a lot of
> things but none seem to work.
>
> Below is the code for the Hive/Shark UDF:
> ------------------------------------------------------------
>
> package com.ABCD.densevectorudf;
>
> import java.nio.ByteBuffer;
> import java.util.Vector;
>
> import org.apache.commons.logging.Log;
> import org.apache.commons.logging.LogFactory;
> import org.apache.hadoop.hive.ql.exec.Description;
> import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
> import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
> import org.apache.hadoop.hive.ql.metadata.HiveException;
> import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
> import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
> import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
> import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
> import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
> import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
> import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
> import org.apache.hadoop.io.FloatWritable;
>
> import com.ABCD.common.attval.IDenseVectorOperator;
> import com.ABCD.common.attval.Utility;
> import com.ABCD.common.attval.array.BufferOperations;
> import com.ABCD.common.attval.array.FloatArrayFactory;
>
> @Description(name = "DenseVectorUDF",
>     value = "Dense Vector UDF in Hive / Shark\n"
>         + "_FUNC_(binary|hex) - "
>         + "Returns the dense vector array<float> value\n",
>     extended = "Dense Vector UDAF in Hive / Shark")
> public class DenseVectorUDF extends GenericUDF {
>   private static final Log LOG = LogFactory.getLog(DenseVectorUDF.class.getName());
>   private ObjectInspector inputOI;
>   private ListObjectInspector outputOI;
>
>   @Override
>   public String getDisplayString(String[] children) {
>     StringBuilder sb = new StringBuilder();
>     sb.append("densevectorudf(");
>     for (int i = 0; i < children.length; i++) {
>       sb.append(children[i]);
>       if (i + 1 != children.length) {
>         sb.append(",");
>       }
>     }
>     sb.append(")");
>     return sb.toString();
>   }
>
>   @Override
>   public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
>     if (arguments.length == 1) {
>       ObjectInspector first = arguments[0];
>       if (!(first instanceof StringObjectInspector) && !(first instanceof BinaryObjectInspector)) {
>         LOG.error("first argument must be either a binary or a hex buffer");
>         throw new UDFArgumentException("first argument must be either a binary or a hex buffer");
>       }
>       inputOI = first;
>       outputOI = ObjectInspectorFactory.getStandardListObjectInspector(
>           PrimitiveObjectInspectorFactory.writableFloatObjectInspector);
>     } else {
>       throw new UDFArgumentLengthException("Wrong number of arguments; exactly one argument is supported.");
>     }
>     return outputOI;
>   }
>
>   @Override
>   public Object evaluate(DeferredObject[] arguments) throws HiveException {
>     IDenseVectorOperator idv = FloatArrayFactory.getFloatArray();
>     Object object = arguments[0].get();
>     Vector<Float> floatVector = null;
>     ByteBuffer buff = null;
>     if (inputOI instanceof StringObjectInspector) {
>       String hex = ((StringObjectInspector) inputOI).getPrimitiveJavaObject(object);
>       buff = ByteBuffer.wrap(Utility.hexToBytes(hex));
>     } else if (inputOI instanceof BinaryObjectInspector) {
>       byte[] bytes = ((BinaryObjectInspector) inputOI).getPrimitiveJavaObject(object);
>       buff = ByteBuffer.wrap(bytes);
>     }
>     floatVector = idv.getElements(buff);
>     Object[] red = new Object[floatVector.size()];
>     for (int index = 0; index < red.length; index++) {
>       red[index] = new FloatWritable(floatVector.get(index));
>     }
>     LOG.info("Buffer header = " + BufferOperations.stringifyBuffer(buff));
>     return red;
>   }
> }
>
> ------------------------------------------------------------
>
> Following is the code I have written for the Phoenix UDF:
> ------------------------------------------------------------
>
> package org.apache.phoenix.expression.function;
>
> import com.ABCD.common.attval.IDenseVectorOperator;
> import com.ABCD.common.attval.array.BufferOperations;
> import com.ABCD.common.attval.array.FloatArrayFactory;
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
> import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
> import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
> import org.apache.hadoop.io.FloatWritable;
> import org.apache.phoenix.expression.Expression;
> import org.apache.phoenix.hive.objectinspector.PhoenixBinaryObjectInspector;
> import org.apache.phoenix.parse.FunctionParseNode.Argument;
> import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
> import org.apache.phoenix.schema.SortOrder;
> import org.apache.phoenix.schema.tuple.Tuple;
> import org.apache.phoenix.schema.types.PDataType;
> import org.apache.phoenix.schema.types.PFloatArray;
> import org.apache.phoenix.schema.types.PVarbinary;
>
> import java.nio.ByteBuffer;
> import java.nio.FloatBuffer;
> import java.sql.SQLException;
> import java.util.List;
> import java.util.Vector;
>
> @BuiltInFunction(name = DenseVectorFunction.NAME, args = {
>     @Argument(allowedTypes = {PVarbinary.class})})
> public class DenseVectorFunction extends ScalarFunction {
>   public static final String NAME = "DenseVectorFunction";
>   private ListObjectInspector outputOI;
>
>   public DenseVectorFunction() {
>   }
>
>   public DenseVectorFunction(List<Expression> children) throws SQLException {
>     super(children);
>   }
>
>   @Override
>   public String getName() {
>     return NAME;
>   }
>
>   public Expression getElementExpr() {
>     return children.get(0);
>   }
>
>   @Override
>   public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
>     if (!getElementExpr().evaluate(tuple, ptr)) {
>       return false;
>     }
>     Object element = getElementExpr().getDataType().toObject(ptr,
>         getElementExpr().getSortOrder(), getElementExpr().getMaxLength(),
>         getElementExpr().getScale());
>     IDenseVectorOperator idv = FloatArrayFactory.getFloatArray();
>     PhoenixBinaryObjectInspector pboi = new PhoenixBinaryObjectInspector();
>     byte[] bytes = pboi.getPrimitiveJavaObject(element);
>     Object object = ptr.get();
>     Vector<Float> floatVector = null;
>     ByteBuffer buff = null;
>     buff = ByteBuffer.wrap(bytes);
>     floatVector = idv.getElements(buff);
>
>     Object[] red = new Object[floatVector.size()];
>     for (int index = 0; index < red.length; index++) {
>       red[index] = new FloatWritable(floatVector.get(index));
>       System.out.println("" + floatVector.get(index));
>     }
>     // This prints the header info in the ByteBuffer, which is correct.
>     System.out.println("Buffer header = " + BufferOperations.stringifyBuffer(buff));
>     // HOW DO I MAKE IT RETURN FLOAT[] or PHOENIXARRAY?
>     ptr.set(??);
>     return true;
>   }
>
>   @Override
>   public SortOrder getSortOrder() {
>     return children.get(0).getSortOrder();
>   }
>
>   @Override
>   public PDataType getDataType() {
>     return PFloatArray.INSTANCE;
>   }
> }
>
> ------------------------------------------------------------
>
> Any help will be much appreciated.