Hi Akhil,
There is no support for UDAFs in Phoenix at present.
Thanks,
Rajeshbabu.

On Sun, Oct 2, 2016 at 6:57 PM, akhil jain <akhilcancer...@gmail.com> wrote:
> Thanks James. It worked.
>
> Can you please provide me pointers for writing UDAFs in Phoenix, like the
> GenericUDAFEvaluator we have for writing Hive UDAFs?
> I am looking for a tutorial like
> http://beekeeperdata.com/posts/hadoop/2015/08/17/hive-udaf-tutorial.html
> for Phoenix.
>
> Thanks,
> Akhil
>
> On Sun, Oct 2, 2016 at 7:03 AM, James Taylor <jamestay...@apache.org> wrote:
>
>> Hi Akhil,
>> You want to create an Array, convert it to its byte[] representation, and
>> set the ptr argument to point to it. Take a look at ArrayIT for examples
>> of creating an Array:
>>
>> // Create Array of FLOAT
>> Float[] floatArr = new Float[2];
>> floatArr[0] = 64.87f;
>> floatArr[1] = 89.96f;
>> Array array = conn.createArrayOf("FLOAT", floatArr);
>> // Convert to byte[]
>> byte[] arrayAsBytes = PFloatArray.INSTANCE.toBytes(array);
>> // Set ptr to byte[]
>> ptr.set(arrayAsBytes);
>>
>> Thanks,
>> James
>>
>>
>> On Sat, Oct 1, 2016 at 9:19 AM, akhil jain <akhilcancer...@gmail.com> wrote:
>>
>>> I am using HBase 1.1 with Phoenix 4.8. I have a table with columns whose
>>> data type is VARBINARY.
>>> The data in these columns is a compressed float[] in the form of a
>>> ByteBuffer called a DenseVector, which is an ordered set of 16-bit IEEE
>>> floats of cardinality no more than 3996.
>>> I have loaded data into the Phoenix tables through the phoenix-spark
>>> plugin. To give an idea, the MapReduce jobs write data to Hive in Parquet
>>> gzip format. I read the data into a DataFrame using
>>> sqlContext.parquetFile(), register it as a temp table, run a
>>> sqlContext.sql("select query ...") query, and finally call
>>> res.save("org.apache.phoenix.spark", SaveMode.Overwrite,
>>> Map("table" -> "SITEFLOWTABLE", "zkUrl" -> "localhost:2181")).
>>> We have a Hive/Shark UDF (code pasted below) that can decode these
>>> ByteBuffer columns and display them as a readable float[], and this UDF
>>> works in spark-shell.
>>> Now I want to write a similar UDF in Phoenix, run queries such as
>>> "select uplinkcostbuffer, DENSEVECTORUDF(uplinkcostbuffer) from
>>> siteflowtable", and then write UDAFs over it.
>>> How do I make the Phoenix UDF return float[]? I have tried many things
>>> but none seem to work.
>>>
>>> Below is the code for the Hive/Shark UDF:
>>> ------------------------------------------------------------------------------------------
>>> package com.ABCD.densevectorudf;
>>>
>>> import java.nio.ByteBuffer;
>>> import java.util.Vector;
>>>
>>> import org.apache.commons.logging.Log;
>>> import org.apache.commons.logging.LogFactory;
>>> import org.apache.hadoop.hive.ql.exec.Description;
>>> import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
>>> import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
>>> import org.apache.hadoop.hive.ql.metadata.HiveException;
>>> import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
>>> import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
>>> import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
>>> import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
>>> import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
>>> import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
>>> import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
>>> import org.apache.hadoop.io.FloatWritable;
>>>
>>> import com.ABCD.common.attval.IDenseVectorOperator;
>>> import com.ABCD.common.attval.Utility;
>>> import com.ABCD.common.attval.array.BufferOperations;
>>> import com.ABCD.common.attval.array.FloatArrayFactory;
>>>
>>> @Description(name = "DenseVectorUDF",
>>>     value = "Dense Vector UDF in Hive / Shark\n"
>>>         + "_FUNC_(binary|hex) - "
>>>         + "Returns the dense vector array<float> value\n",
>>>     extended = "Dense Vector UDAF in Hive / Shark")
>>> public class DenseVectorUDF extends GenericUDF {
>>>   private static final Log LOG = LogFactory.getLog(DenseVectorUDF.class.getName());
>>>   private ObjectInspector inputOI;
>>>   private ListObjectInspector outputOI;
>>>
>>>   @Override
>>>   public String getDisplayString(String[] children) {
>>>     StringBuilder sb = new StringBuilder();
>>>     sb.append("densevectorudf(");
>>>     for (int i = 0; i < children.length; i++) {
>>>       sb.append(children[i]);
>>>       if (i + 1 != children.length) {
>>>         sb.append(",");
>>>       }
>>>     }
>>>     sb.append(")");
>>>     return sb.toString();
>>>   }
>>>
>>>   @Override
>>>   public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
>>>     if (arguments.length == 1) {
>>>       ObjectInspector first = arguments[0];
>>>       if (!(first instanceof StringObjectInspector) && !(first instanceof BinaryObjectInspector)) {
>>>         LOG.error("first argument must be either a binary or hex buffer");
>>>         throw new UDFArgumentException("first argument must be either a binary or hex buffer");
>>>       }
>>>       inputOI = first;
>>>       outputOI = ObjectInspectorFactory.getStandardListObjectInspector(
>>>           PrimitiveObjectInspectorFactory.writableFloatObjectInspector);
>>>     } else {
>>>       throw new UDFArgumentLengthException("Wrong argument length is passed. Arguments length is NOT supported.");
>>>     }
>>>     return outputOI;
>>>   }
>>>
>>>   @Override
>>>   public Object evaluate(DeferredObject[] arguments) throws HiveException {
>>>     IDenseVectorOperator idv = FloatArrayFactory.getFloatArray();
>>>     Object object = arguments[0].get();
>>>     Vector<Float> floatVector = null;
>>>     ByteBuffer buff = null;
>>>     if (inputOI instanceof StringObjectInspector) {
>>>       String hex = ((StringObjectInspector) inputOI).getPrimitiveJavaObject(object);
>>>       buff = ByteBuffer.wrap(Utility.hexToBytes(hex));
>>>     } else if (inputOI instanceof BinaryObjectInspector) {
>>>       byte[] bytes = ((BinaryObjectInspector) inputOI).getPrimitiveJavaObject(object);
>>>       buff = ByteBuffer.wrap(bytes);
>>>     }
>>>     floatVector = idv.getElements(buff);
>>>     Object[] red = new Object[floatVector.size()];
>>>     for (int index = 0; index < red.length; index++) {
>>>       red[index] = new FloatWritable(floatVector.get(index));
>>>     }
>>>     LOG.info("Buffer header = " + BufferOperations.stringifyBuffer(buff));
>>>     return red;
>>>   }
>>> }
>>> ------------------------------------------------------------------------------------------
>>>
>>>
>>> Following is the code I have written for the Phoenix UDF:
>>> ------------------------------------------------------------------------------------------
>>> package org.apache.phoenix.expression.function;
>>>
>>> import com.ABCD.common.attval.IDenseVectorOperator;
>>> import com.ABCD.common.attval.array.BufferOperations;
>>> import com.ABCD.common.attval.array.FloatArrayFactory;
>>> import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
>>> import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
>>> import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
>>> import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
>>> import org.apache.hadoop.io.FloatWritable;
>>> import org.apache.phoenix.expression.Expression;
>>> import org.apache.phoenix.hive.objectinspector.PhoenixBinaryObjectInspector;
>>> import org.apache.phoenix.parse.FunctionParseNode.Argument;
>>> import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
>>> import org.apache.phoenix.schema.SortOrder;
>>> import org.apache.phoenix.schema.tuple.Tuple;
>>> import org.apache.phoenix.schema.types.PDataType;
>>> import org.apache.phoenix.schema.types.PFloatArray;
>>> import org.apache.phoenix.schema.types.PVarbinary;
>>>
>>> import java.nio.ByteBuffer;
>>> import java.nio.FloatBuffer;
>>> import java.sql.SQLException;
>>> import java.util.List;
>>> import java.util.Vector;
>>>
>>> @BuiltInFunction(name = DenseVectorFunction.NAME, args = {
>>>     @Argument(allowedTypes = {PVarbinary.class})})
>>> public class DenseVectorFunction extends ScalarFunction {
>>>   public static final String NAME = "DenseVectorFunction";
>>>   private ListObjectInspector outputOI;
>>>
>>>   public DenseVectorFunction() {
>>>   }
>>>
>>>   public DenseVectorFunction(List<Expression> children) throws SQLException {
>>>     super(children);
>>>   }
>>>
>>>   @Override
>>>   public String getName() {
>>>     return NAME;
>>>   }
>>>
>>>   public Expression getElementExpr() {
>>>     return children.get(0);
>>>   }
>>>
>>>   @Override
>>>   public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
>>>     if (!getElementExpr().evaluate(tuple, ptr)) {
>>>       return false;
>>>     }
>>>     Object element = getElementExpr().getDataType().toObject(ptr,
>>>         getElementExpr().getSortOrder(), getElementExpr().getMaxLength(),
>>>         getElementExpr().getScale());
>>>     IDenseVectorOperator idv = FloatArrayFactory.getFloatArray();
>>>     PhoenixBinaryObjectInspector pboi = new PhoenixBinaryObjectInspector();
>>>     byte[] bytes = pboi.getPrimitiveJavaObject(element);
>>>     Object object = ptr.get();
>>>     Vector<Float> floatVector = null;
>>>     ByteBuffer buff = null;
>>>     buff = ByteBuffer.wrap(bytes);
>>>     floatVector = idv.getElements(buff);
>>>
>>>     Object[] red = new Object[floatVector.size()];
>>>     for (int index = 0; index < red.length; index++) {
>>>       red[index] = new FloatWritable(floatVector.get(index));
>>>       System.out.println("" + floatVector.get(index));
>>>     }
>>>     // This prints the header info in the ByteBuffer, which is correct
>>>     System.out.println("Buffer header = " + BufferOperations.stringifyBuffer(buff));
>>>     // HOW DO I MAKE IT RETURN FLOAT[] or PHOENIXARRAY
>>>     ptr.set(??);
>>>     return true;
>>>   }
>>>
>>>   @Override
>>>   public SortOrder getSortOrder() {
>>>     return children.get(0).getSortOrder();
>>>   }
>>>
>>>   @Override
>>>   public PDataType getDataType() {
>>>     return PFloatArray.INSTANCE;
>>>   }
>>> }
>>> ------------------------------------------------------------------------------------------
>>>
>>> Any help will be much appreciated.
>>>
>>
>>
>
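
Tying James's suggestion to the ptr.set(??) line above: inside evaluate() there is no JDBC Connection to call createArrayOf on, but Phoenix exposes a PArrayDataType.instantiatePhoenixArray helper that builds a PhoenixArray directly from an element array. The snippet below is only an untested sketch along those lines; floatVector is the Vector<Float> already decoded in the function above, and the extra imports are assumptions about Phoenix 4.8 class locations.

// Additional imports assumed:
// import org.apache.phoenix.schema.types.PArrayDataType;
// import org.apache.phoenix.schema.types.PFloat;
// import org.apache.phoenix.schema.types.PhoenixArray;

// Decoded values -> Float[]
Float[] floats = floatVector.toArray(new Float[floatVector.size()]);
// Build a FLOAT PhoenixArray without needing a JDBC Connection
PhoenixArray array = PArrayDataType.instantiatePhoenixArray(PFloat.INSTANCE, floats);
// Serialize the array and point ptr at the resulting byte[]
ptr.set(PFloatArray.INSTANCE.toBytes(array));
return true;

Since getDataType() already returns PFloatArray.INSTANCE, a query like "select uplinkcostbuffer, DENSEVECTORUDF(uplinkcostbuffer) from siteflowtable" should then surface the second column as a FLOAT array on the client.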