Hi Rajesh,

Thanks for reply. But we can surely write UDAF's on lines of default SUM
and AVG functions in phoenix, which are present by default.
We need a good tutorial or documentation for this.

Thanks,
AJ

On Mon, Oct 3, 2016 at 2:05 PM, rajeshb...@apache.org <
chrajeshbab...@gmail.com> wrote:

> Hi Akhil,
>
> There is no support for UDAFs in Phoenix at present.
>
> Thanks,
> Rajeshbabu.
>
> On Sun, Oct 2, 2016 at 6:57 PM, akhil jain <akhilcancer...@gmail.com>
> wrote:
>
>> Thanks James. It worked.
>>
>> Can you please provide me pointers to write UDAFs in phoenix like we
>> have GenericUDAFEvaluator for writing Hive UDAFs.
>> I am looking for a tutorial like http://beekeeperdata.com/
>> posts/hadoop/2015/08/17/hive-udaf-tutorial.html for phoenix.
>>
>> Thanks,
>> Akhil
>>
>> On Sun, Oct 2, 2016 at 7:03 AM, James Taylor <jamestay...@apache.org>
>> wrote:
>>
>>> Hi Akhil,
>>> You want to create an Array, convert it to its byte[] representation,
>>> and set the ptr argument to point to it. Take a look at ArrayIT for
>>> examples of creating an Array:
>>>
>>>     // Create Array of FLOAT
>>>     Float[] floatArr =  new Float[2];
>>>     floatArr[0] = 64.87;
>>>     floatArr[1] = 89.96;
>>>     Array array = conn.createArrayOf("FLOAT", floatArr);
>>>     // Convert to byte[]
>>>     byte[] arrayAsBytes = PFloatArray.INSTANCE.toBytes(array);
>>>     // Set ptr to byte[]
>>>     ptr.set(arrayAsBytes);
>>>
>>> Thanks,
>>> James
>>>
>>>
>>> On Sat, Oct 1, 2016 at 9:19 AM, akhil jain <akhilcancer...@gmail.com>
>>> wrote:
>>>
>>>> I am using hbase 1.1 with phoenix 4.8. I have a table with columns
>>>> whose datatype is 'VARBINARY'.
>>>> The data in these columns is compressed float[] in form of ByteBuffer
>>>> called DenseVector which is an ordered set of 16 bit IEEE floats of
>>>> cardinality no more than 3996.
>>>> I have loaded data into phoenix tables through spark-phoenix plugin.
>>>> Just to give an idea the mapreduce jobs write data in hive in parquet gzip
>>>> format. I read data into a dataframe using sqlContext.parquetFile() ,
>>>> register it as temp table and run a sqlContext.sql("select query ...")
>>>> query and finally calling res.save("org.apache.phoenix.spark",
>>>> SaveMode.Overwrite, Map("table" -> "SITEFLOWTABLE" ,"zkUrl" ->
>>>> "localhost:2181"))
>>>> We have a hive/shark UDF(code pasted below) that can decode these
>>>> ByteBuffer columns and display them in readable float[]. So this UDF works
>>>> on spark-shell.
>>>> Now I just want to write a similar UDF in phoenix and run queries as "
>>>> select uplinkcostbuffer,DENSEVECTORUDF(uplinkcostbuffer) from
>>>> siteflowtable" and further write UDAFs over it.
>>>> How do I make phoenix UDF return float[] ?? I have tried a lot many
>>>> things but none seem to work.
>>>>
>>>> Below is the code for hive/shark UDF-
>>>> ------------------------------------------------------------
>>>> ------------------------------
>>>> package com.ABCD.densevectorudf;
>>>>
>>>> import java.nio.ByteBuffer;
>>>> import java.util.Vector;
>>>>
>>>> import org.apache.commons.logging.Log;
>>>> import org.apache.commons.logging.LogFactory;
>>>> import org.apache.hadoop.hive.ql.exec.Description;
>>>> import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
>>>> import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
>>>> import org.apache.hadoop.hive.ql.metadata.HiveException;
>>>> import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
>>>> import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInsp
>>>> ector;
>>>> import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
>>>> import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspecto
>>>> rFactory;
>>>> import org.apache.hadoop.hive.serde2.objectinspector.primitive.Bina
>>>> ryObjectInspector;
>>>> import org.apache.hadoop.hive.serde2.objectinspector.primitive.Prim
>>>> itiveObjectInspectorFactory;
>>>> import org.apache.hadoop.hive.serde2.objectinspector.primitive.Stri
>>>> ngObjectInspector;
>>>> import org.apache.hadoop.io.FloatWritable;
>>>>
>>>> import com.ABCD.common.attval.IDenseVectorOperator;
>>>> import com.ABCD.common.attval.Utility;
>>>> import com.ABCD.common.attval.array.BufferOperations;
>>>> import com.ABCD.common.attval.array.FloatArrayFactory;
>>>>
>>>> @Description(name = "DenseVectorUDF",
>>>> value = "Dense Vector UDF in Hive / Shark\n"
>>>> + "_FUNC_(binary|hex) - "
>>>> + "Returns the dense vector array<float> value\n",
>>>> extended = "Dense Vector UDAF in Hive / Shark")
>>>>
>>>> public class DenseVectorUDF extends GenericUDF {
>>>> private static final Log LOG = LogFactory.getLog(DenseVectorU
>>>> DF.class.getName());
>>>> private ObjectInspector inputOI;
>>>> private ListObjectInspector outputOI;
>>>>
>>>> @Override
>>>> public String getDisplayString(String[] children) {
>>>> StringBuilder sb = new StringBuilder();
>>>> sb.append("densevectorudf(");
>>>> for (int i = 0; i < children.length; i++) {
>>>> sb.append(children[i]);
>>>> if (i + 1 != children.length) {
>>>> sb.append(",");
>>>> }
>>>> }
>>>> sb.append(")");
>>>> return sb.toString();
>>>> }
>>>>
>>>> @Override
>>>> public ObjectInspector initialize(ObjectInspector[] arguments) throws
>>>> UDFArgumentException {
>>>> if (arguments.length == 1) {
>>>> ObjectInspector first = arguments[0];
>>>> if (!(first instanceof StringObjectInspector) && !(first instanceof
>>>> BinaryObjectInspector)) {
>>>> LOG.error("first argument must be a either binary or hex buffer");
>>>> throw new UDFArgumentException("first argument must be a either binary
>>>> or hex buffer");
>>>> }
>>>> inputOI = first;
>>>> outputOI = ObjectInspectorFactory.getStandardListObjectInspector(Primit
>>>> iveObjectInspectorFactory.writableFloatObjectInspector);
>>>> } else {
>>>> throw new UDFArgumentLengthException("Wrong argument length is passed.
>>>> Arguments length is NOT supported.");
>>>> }
>>>> return outputOI;
>>>> }
>>>>
>>>> @Override
>>>> public Object evaluate(DeferredObject[] arguments) throws HiveException
>>>> {
>>>> IDenseVectorOperator idv = FloatArrayFactory.getFloatArray();
>>>> Object object = arguments[0].get();
>>>> Vector<Float> floatVector = null;
>>>> ByteBuffer buff = null;
>>>> if (inputOI instanceof StringObjectInspector) {
>>>> String hex = ((StringObjectInspector) inputOI).getPrimitiveJavaObjec
>>>> t(object);
>>>> buff = ByteBuffer.wrap(Utility.hexToBytes(hex));
>>>> } else if (inputOI instanceof BinaryObjectInspector) {
>>>> byte[] bytes = ((BinaryObjectInspector) inputOI).getPrimitiveJavaObjec
>>>> t(object);
>>>> buff = ByteBuffer.wrap(bytes);
>>>> }
>>>> floatVector = idv.getElements(buff);
>>>> Object red [] = new Object[floatVector.size()];
>>>> for(int index = 0; index < red.length; index++){
>>>> red[index] = new FloatWritable(floatVector.get(index));
>>>> }
>>>> LOG.info("Buffer header = " + BufferOperations.stringifyBuffer(buff));
>>>> return red;
>>>> }
>>>> }
>>>>
>>>> ------------------------------------------------------------
>>>> ------------------------------
>>>>
>>>>
>>>> Following is the code I have written for Phoenix UDF-
>>>> ------------------------------------------------------------
>>>> ------------------------------
>>>> package org.apache.phoenix.expression.function;
>>>>
>>>> import com.ABCD.common.attval.IDenseVectorOperator;
>>>> import com.ABCD.common.attval.array.BufferOperations;
>>>> import com.ABCD.common.attval.array.FloatArrayFactory;
>>>> import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
>>>> import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInsp
>>>> ector;
>>>> import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspecto
>>>> rFactory;
>>>> import org.apache.hadoop.hive.serde2.objectinspector.primitive.Prim
>>>> itiveObjectInspectorFactory;
>>>> import org.apache.hadoop.io.FloatWritable;
>>>> import org.apache.phoenix.expression.Expression;
>>>> import org.apache.phoenix.hive.objectinspector.PhoenixBinaryObjectI
>>>> nspector;
>>>> import org.apache.phoenix.parse.FunctionParseNode.Argument;
>>>> import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
>>>> import org.apache.phoenix.schema.SortOrder;
>>>> import org.apache.phoenix.schema.tuple.Tuple;
>>>> import org.apache.phoenix.schema.types.PDataType;
>>>> import org.apache.phoenix.schema.types.PFloatArray;
>>>> import org.apache.phoenix.schema.types.PVarbinary;
>>>>
>>>> import java.nio.ByteBuffer;
>>>> import java.nio.FloatBuffer;
>>>> import java.sql.SQLException;
>>>> import java.util.List;
>>>> import java.util.Vector;
>>>>
>>>> @BuiltInFunction(name = DenseVectorFunction.NAME, args = {
>>>>         @Argument(allowedTypes = {PVarbinary.class})})
>>>> public class DenseVectorFunction extends ScalarFunction {
>>>>     public static final String NAME = "DenseVectorFunction";
>>>>     private ListObjectInspector outputOI;
>>>>
>>>>     public DenseVectorFunction() {
>>>>     }
>>>>
>>>>     public DenseVectorFunction(List<Expression> children) throws
>>>> SQLException {
>>>>         super(children);
>>>>     }
>>>>
>>>>     @Override
>>>>     public String getName() {
>>>>         return NAME;
>>>>     }
>>>>
>>>>     public Expression getElementExpr() {
>>>>         return children.get(0);
>>>>     }
>>>>
>>>>     @Override
>>>>     public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
>>>>         if (!getElementExpr().evaluate(tuple, ptr)) {
>>>>             return false;
>>>>         }
>>>>         Object element = getElementExpr().getDataType().toObject(ptr,
>>>> getElementExpr().getSortOrder(), getElementExpr().getMaxLength(),
>>>> getElementExpr().getScale());
>>>>         IDenseVectorOperator idv = FloatArrayFactory.getFloatArray();
>>>>         PhoenixBinaryObjectInspector pboi = new
>>>> PhoenixBinaryObjectInspector();
>>>>         byte[] bytes = pboi.getPrimitiveJavaObject(element);
>>>>         Object object = ptr.get();
>>>>         Vector<Float> floatVector = null;
>>>>         ByteBuffer buff = null;
>>>>         buff = ByteBuffer.wrap(bytes);
>>>>         floatVector = idv.getElements(buff);
>>>>
>>>>         Object[] red = new Object[floatVector.size()];
>>>>         for (int index = 0; index < red.length; index++) {
>>>>             red[index] = new FloatWritable(floatVector.get(index));
>>>>             System.out.println("" + floatVector.get(index));
>>>>         }
>>>>         System.out.println("Buffer header = " +
>>>> BufferOperations.stringifyBuffer(buff)); // This prints header info in
>>>> ByteBuffer which is correct
>>>> //HOW DO I MAKE IT RETURN FLOAT[] or PHOENIXARRAY
>>>>         ptr.set(??);
>>>>         return true;
>>>>     }
>>>>
>>>>     @Override
>>>>     public SortOrder getSortOrder() {
>>>>         return children.get(0).getSortOrder();
>>>>     }
>>>>
>>>>     @Override
>>>>     public PDataType getDataType() {
>>>>         return PFloatArray.INSTANCE;
>>>>     }
>>>> }
>>>> ------------------------------------------------------------
>>>> ------------------------------
>>>>
>>>> Any help will be much appreciated.
>>>>
>>>
>>>
>>
>

Reply via email to