Hi Akhil,

There is no support for UDAFs in Phoenix at present.

Thanks,
Rajeshbabu.

On Sun, Oct 2, 2016 at 6:57 PM, akhil jain <akhilcancer...@gmail.com> wrote:

> Thanks James. It worked.
>
> Can you please give me pointers for writing UDAFs in Phoenix, like the
> GenericUDAFEvaluator we have for writing Hive UDAFs?
> I am looking for a tutorial for Phoenix along the lines of
> http://beekeeperdata.com/posts/hadoop/2015/08/17/hive-udaf-tutorial.html
>
> Thanks,
> Akhil
>
> On Sun, Oct 2, 2016 at 7:03 AM, James Taylor <jamestay...@apache.org>
> wrote:
>
>> Hi Akhil,
>> You want to create an Array, convert it to its byte[] representation, and
>> set the ptr argument to point to it. Take a look at ArrayIT for examples of
>> creating an Array:
>>
>>     // Create Array of FLOAT
>>     Float[] floatArr = new Float[2];
>>     floatArr[0] = 64.87f;
>>     floatArr[1] = 89.96f;
>>     Array array = conn.createArrayOf("FLOAT", floatArr);
>>     // Convert to byte[]
>>     byte[] arrayAsBytes = PFloatArray.INSTANCE.toBytes(array);
>>     // Set ptr to byte[]
>>     ptr.set(arrayAsBytes);
>>
>> Thanks,
>> James
>>
>>
>> On Sat, Oct 1, 2016 at 9:19 AM, akhil jain <akhilcancer...@gmail.com>
>> wrote:
>>
>>> I am using HBase 1.1 with Phoenix 4.8. I have a table with columns whose
>>> datatype is 'VARBINARY'.
>>> The data in these columns is a compressed float[] stored as a ByteBuffer
>>> called DenseVector, which is an ordered set of 16-bit IEEE floats of
>>> cardinality no more than 3996.
>>> I loaded the data into Phoenix tables through the spark-phoenix plugin.
>>> To give an idea: the MapReduce jobs write data to Hive in Parquet gzip
>>> format. I read the data into a DataFrame using sqlContext.parquetFile(),
>>> register it as a temp table, run a sqlContext.sql("select query ...")
>>> query, and finally call res.save("org.apache.phoenix.spark",
>>> SaveMode.Overwrite, Map("table" -> "SITEFLOWTABLE", "zkUrl" ->
>>> "localhost:2181")).
>>> We have a Hive/Shark UDF (code pasted below) that can decode these
>>> ByteBuffer columns and display them as a readable float[]. This UDF works
>>> in spark-shell.
>>> Now I want to write a similar UDF in Phoenix, run queries such as
>>> "select uplinkcostbuffer, DENSEVECTORUDF(uplinkcostbuffer) from
>>> siteflowtable", and later write UDAFs over it.
>>> How do I make a Phoenix UDF return float[]? I have tried many things,
>>> but none seem to work.
>>>
>>> Below is the code for hive/shark UDF-
>>> ------------------------------------------------------------------------------------------
>>> package com.ABCD.densevectorudf;
>>>
>>> import java.nio.ByteBuffer;
>>> import java.util.Vector;
>>>
>>> import org.apache.commons.logging.Log;
>>> import org.apache.commons.logging.LogFactory;
>>> import org.apache.hadoop.hive.ql.exec.Description;
>>> import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
>>> import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
>>> import org.apache.hadoop.hive.ql.metadata.HiveException;
>>> import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
>>> import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
>>> import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
>>> import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
>>> import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
>>> import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
>>> import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
>>> import org.apache.hadoop.io.FloatWritable;
>>>
>>> import com.ABCD.common.attval.IDenseVectorOperator;
>>> import com.ABCD.common.attval.Utility;
>>> import com.ABCD.common.attval.array.BufferOperations;
>>> import com.ABCD.common.attval.array.FloatArrayFactory;
>>>
>>> @Description(name = "DenseVectorUDF",
>>> value = "Dense Vector UDF in Hive / Shark\n"
>>> + "_FUNC_(binary|hex) - "
>>> + "Returns the dense vector array<float> value\n",
>>> extended = "Dense Vector UDAF in Hive / Shark")
>>>
>>> public class DenseVectorUDF extends GenericUDF {
>>>     private static final Log LOG =
>>>             LogFactory.getLog(DenseVectorUDF.class.getName());
>>>     private ObjectInspector inputOI;
>>>     private ListObjectInspector outputOI;
>>>
>>>     @Override
>>>     public String getDisplayString(String[] children) {
>>>         StringBuilder sb = new StringBuilder();
>>>         sb.append("densevectorudf(");
>>>         for (int i = 0; i < children.length; i++) {
>>>             sb.append(children[i]);
>>>             if (i + 1 != children.length) {
>>>                 sb.append(",");
>>>             }
>>>         }
>>>         sb.append(")");
>>>         return sb.toString();
>>>     }
>>>
>>>     @Override
>>>     public ObjectInspector initialize(ObjectInspector[] arguments)
>>>             throws UDFArgumentException {
>>>         if (arguments.length == 1) {
>>>             ObjectInspector first = arguments[0];
>>>             if (!(first instanceof StringObjectInspector)
>>>                     && !(first instanceof BinaryObjectInspector)) {
>>>                 LOG.error("first argument must be either a binary or hex buffer");
>>>                 throw new UDFArgumentException(
>>>                         "first argument must be either a binary or hex buffer");
>>>             }
>>>             inputOI = first;
>>>             outputOI = ObjectInspectorFactory.getStandardListObjectInspector(
>>>                     PrimitiveObjectInspectorFactory.writableFloatObjectInspector);
>>>         } else {
>>>             throw new UDFArgumentLengthException(
>>>                     "Wrong argument length is passed. "
>>>                     + "Arguments length is NOT supported.");
>>>         }
>>>         return outputOI;
>>>     }
>>>
>>>     @Override
>>>     public Object evaluate(DeferredObject[] arguments) throws HiveException {
>>>         IDenseVectorOperator idv = FloatArrayFactory.getFloatArray();
>>>         Object object = arguments[0].get();
>>>         Vector<Float> floatVector = null;
>>>         ByteBuffer buff = null;
>>>         // The input is either a hex string or raw binary; wrap it either way.
>>>         if (inputOI instanceof StringObjectInspector) {
>>>             String hex = ((StringObjectInspector) inputOI)
>>>                     .getPrimitiveJavaObject(object);
>>>             buff = ByteBuffer.wrap(Utility.hexToBytes(hex));
>>>         } else if (inputOI instanceof BinaryObjectInspector) {
>>>             byte[] bytes = ((BinaryObjectInspector) inputOI)
>>>                     .getPrimitiveJavaObject(object);
>>>             buff = ByteBuffer.wrap(bytes);
>>>         }
>>>         floatVector = idv.getElements(buff);
>>>         // Box each decoded float as a FloatWritable for the list inspector.
>>>         Object[] red = new Object[floatVector.size()];
>>>         for (int index = 0; index < red.length; index++) {
>>>             red[index] = new FloatWritable(floatVector.get(index));
>>>         }
>>>         LOG.info("Buffer header = " + BufferOperations.stringifyBuffer(buff));
>>>         return red;
>>>     }
>>> }
>>>
>>> ------------------------------------------------------------------------------------------
>>>
>>>
>>> Following is the code I have written for Phoenix UDF-
>>> ------------------------------------------------------------------------------------------
>>> package org.apache.phoenix.expression.function;
>>>
>>> import com.ABCD.common.attval.IDenseVectorOperator;
>>> import com.ABCD.common.attval.array.BufferOperations;
>>> import com.ABCD.common.attval.array.FloatArrayFactory;
>>> import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
>>> import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
>>> import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
>>> import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
>>> import org.apache.hadoop.io.FloatWritable;
>>> import org.apache.phoenix.expression.Expression;
>>> import org.apache.phoenix.hive.objectinspector.PhoenixBinaryObjectInspector;
>>> import org.apache.phoenix.parse.FunctionParseNode.Argument;
>>> import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
>>> import org.apache.phoenix.schema.SortOrder;
>>> import org.apache.phoenix.schema.tuple.Tuple;
>>> import org.apache.phoenix.schema.types.PDataType;
>>> import org.apache.phoenix.schema.types.PFloatArray;
>>> import org.apache.phoenix.schema.types.PVarbinary;
>>>
>>> import java.nio.ByteBuffer;
>>> import java.nio.FloatBuffer;
>>> import java.sql.SQLException;
>>> import java.util.List;
>>> import java.util.Vector;
>>>
>>> @BuiltInFunction(name = DenseVectorFunction.NAME, args = {
>>>         @Argument(allowedTypes = {PVarbinary.class})})
>>> public class DenseVectorFunction extends ScalarFunction {
>>>     public static final String NAME = "DenseVectorFunction";
>>>     private ListObjectInspector outputOI;
>>>
>>>     public DenseVectorFunction() {
>>>     }
>>>
>>>     public DenseVectorFunction(List<Expression> children) throws
>>> SQLException {
>>>         super(children);
>>>     }
>>>
>>>     @Override
>>>     public String getName() {
>>>         return NAME;
>>>     }
>>>
>>>     public Expression getElementExpr() {
>>>         return children.get(0);
>>>     }
>>>
>>>     @Override
>>>     public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
>>>         if (!getElementExpr().evaluate(tuple, ptr)) {
>>>             return false;
>>>         }
>>>         Object element = getElementExpr().getDataType().toObject(ptr,
>>> getElementExpr().getSortOrder(), getElementExpr().getMaxLength(),
>>> getElementExpr().getScale());
>>>         IDenseVectorOperator idv = FloatArrayFactory.getFloatArray();
>>>         PhoenixBinaryObjectInspector pboi = new
>>> PhoenixBinaryObjectInspector();
>>>         byte[] bytes = pboi.getPrimitiveJavaObject(element);
>>>         Object object = ptr.get();
>>>         Vector<Float> floatVector = null;
>>>         ByteBuffer buff = null;
>>>         buff = ByteBuffer.wrap(bytes);
>>>         floatVector = idv.getElements(buff);
>>>
>>>         Object[] red = new Object[floatVector.size()];
>>>         for (int index = 0; index < red.length; index++) {
>>>             red[index] = new FloatWritable(floatVector.get(index));
>>>             System.out.println("" + floatVector.get(index));
>>>         }
>>>         // This prints the header info in the ByteBuffer, which is correct
>>>         System.out.println("Buffer header = " +
>>> BufferOperations.stringifyBuffer(buff));
>>> //HOW DO I MAKE IT RETURN FLOAT[] or PHOENIXARRAY
>>>         ptr.set(??);
>>>         return true;
>>>     }
>>>
>>>     @Override
>>>     public SortOrder getSortOrder() {
>>>         return children.get(0).getSortOrder();
>>>     }
>>>
>>>     @Override
>>>     public PDataType getDataType() {
>>>         return PFloatArray.INSTANCE;
>>>     }
>>> }
>>> ------------------------------------------------------------------------------------------
>>>
>>> Any help will be much appreciated.
>>>
>>
>>
>
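
For readers following the quoted thread: the DenseVector payload is described above as an ordered set of 16-bit IEEE floats. The actual layout is handled by the poster's own IDenseVectorOperator and includes a header that is not documented here, so the following is only a minimal sketch under stated assumptions: a headerless, big-endian run of IEEE 754 half-precision values. The class and method names (HalfFloatDecoder, halfToFloat, decode) are hypothetical and do not come from Phoenix or from the original code.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical helper, not part of Phoenix or of the poster's library.
final class HalfFloatDecoder {

    private HalfFloatDecoder() {
    }

    /** Converts one IEEE 754 half-precision (binary16) bit pattern to a float. */
    static float halfToFloat(short half) {
        int bits = half & 0xFFFF;
        int sign = (bits >>> 15) & 0x1;
        int exponent = (bits >>> 10) & 0x1F;
        int mantissa = bits & 0x3FF;

        float magnitude;
        if (exponent == 0) {
            // Zero or subnormal: value = mantissa * 2^-24
            magnitude = mantissa * (1.0f / (1 << 24));
        } else if (exponent == 0x1F) {
            // All-ones exponent encodes infinity (mantissa 0) or NaN
            magnitude = (mantissa == 0) ? Float.POSITIVE_INFINITY : Float.NaN;
        } else {
            // Normal: value = (1 + mantissa / 1024) * 2^(exponent - 15)
            magnitude = (float) Math.scalb(1.0 + mantissa / 1024.0, exponent - 15);
        }
        return sign == 0 ? magnitude : -magnitude;
    }

    /**
     * Decodes a payload of consecutive half-precision values.
     * ASSUMPTION: no header and big-endian byte order; the real
     * DenseVector format may differ on both points.
     */
    static Float[] decode(byte[] payload) {
        ByteBuffer buf = ByteBuffer.wrap(payload).order(ByteOrder.BIG_ENDIAN);
        Float[] out = new Float[payload.length / 2];
        for (int i = 0; i < out.length; i++) {
            out[i] = halfToFloat(buf.getShort());
        }
        return out;
    }
}
```

The sketch returns a Float[] rather than a float[] because the array-creation snippet quoted above from James also starts from a Float[]; turning that array into the bytes that ptr should point to is the step his reply describes.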
