Hi Rajesh,

Thanks for the reply. But surely we can write UDAFs along the lines of the
built-in SUM and AVG functions, which are present in Phoenix by default. We
need a good tutorial or some documentation for this.
Thanks,
AJ

On Mon, Oct 3, 2016 at 2:05 PM, rajeshb...@apache.org <chrajeshbab...@gmail.com> wrote:

> Hi Akhil,
>
> There is no support for UDAFs in Phoenix at present.
>
> Thanks,
> Rajeshbabu.
>
> On Sun, Oct 2, 2016 at 6:57 PM, akhil jain <akhilcancer...@gmail.com> wrote:
>
>> Thanks James. It worked.
>>
>> Can you please provide me some pointers for writing UDAFs in Phoenix,
>> like the GenericUDAFEvaluator we have for writing Hive UDAFs? I am
>> looking for a tutorial like
>> http://beekeeperdata.com/posts/hadoop/2015/08/17/hive-udaf-tutorial.html
>> for Phoenix.
>>
>> Thanks,
>> Akhil
>>
>> On Sun, Oct 2, 2016 at 7:03 AM, James Taylor <jamestay...@apache.org> wrote:
>>
>>> Hi Akhil,
>>> You want to create an Array, convert it to its byte[] representation,
>>> and set the ptr argument to point to it. Take a look at ArrayIT for
>>> examples of creating an Array:
>>>
>>> // Create an Array of FLOAT (note the 'f' suffixes; plain double
>>> // literals such as 64.87 would not compile into a Float[])
>>> Float[] floatArr = new Float[2];
>>> floatArr[0] = 64.87f;
>>> floatArr[1] = 89.96f;
>>> Array array = conn.createArrayOf("FLOAT", floatArr);
>>> // Convert to byte[]
>>> byte[] arrayAsBytes = PFloatArray.INSTANCE.toBytes(array);
>>> // Set ptr to point at the byte[]
>>> ptr.set(arrayAsBytes);
>>>
>>> Thanks,
>>> James
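
James's three steps can also be exercised end to end on their own, which is a
quick way to verify that the bytes handed to ptr.set() are what Phoenix
expects. The following is an untested sketch, not part of the thread's code:
it assumes an open Phoenix JDBC connection on localhost:2181, and the final
cast assumes Phoenix hands back its primitive float[] array variant here.

------------------------------------------------------------------------------------------
import java.sql.Array;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Arrays;

import org.apache.phoenix.schema.PhoenixArray;
import org.apache.phoenix.schema.types.PFloatArray;

public class FloatArrayRoundTrip {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost:2181")) {
            // Step 1: create the Array (Phoenix returns a PhoenixArray).
            Float[] floatArr = { 64.87f, 89.96f };
            Array array = conn.createArrayOf("FLOAT", floatArr);
            // Step 2: serialize it into Phoenix's byte[] representation.
            byte[] arrayAsBytes = PFloatArray.INSTANCE.toBytes(array);
            // Step 3 inside a UDF would be ptr.set(arrayAsBytes); here we
            // deserialize instead, to confirm the round trip.
            PhoenixArray back = (PhoenixArray) PFloatArray.INSTANCE.toObject(arrayAsBytes);
            float[] floats = (float[]) back.getArray(); // assumes the primitive variant
            System.out.println(Arrays.toString(floats));
        }
    }
}
------------------------------------------------------------------------------------------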
>>>
>>> On Sat, Oct 1, 2016 at 9:19 AM, akhil jain <akhilcancer...@gmail.com> wrote:
>>>
>>>> I am using HBase 1.1 with Phoenix 4.8. I have a table with columns whose
>>>> datatype is VARBINARY. The data in these columns is a compressed float[]
>>>> in the form of a ByteBuffer called a DenseVector, which is an ordered
>>>> set of 16-bit IEEE floats of cardinality no more than 3996.
>>>> I have loaded the data into Phoenix tables through the phoenix-spark
>>>> plugin. To give an idea of the pipeline: MapReduce jobs write the data
>>>> into Hive in Parquet (gzip) format; I read it into a DataFrame using
>>>> sqlContext.parquetFile(), register it as a temp table, run a
>>>> sqlContext.sql("select ...") query, and finally call
>>>> res.save("org.apache.phoenix.spark", SaveMode.Overwrite,
>>>> Map("table" -> "SITEFLOWTABLE", "zkUrl" -> "localhost:2181")).
>>>> We have a Hive/Shark UDF (code pasted below) that decodes these
>>>> ByteBuffer columns and displays them as a readable float[]; this UDF
>>>> works on the spark-shell.
>>>> Now I want to write a similar UDF in Phoenix, run queries such as
>>>> "select uplinkcostbuffer, DENSEVECTORUDF(uplinkcostbuffer) from
>>>> siteflowtable", and further write UDAFs over it.
>>>> How do I make the Phoenix UDF return a float[]? I have tried many
>>>> things, but none seem to work.
>>>>
>>>> Below is the code for the Hive/Shark UDF:
------------------------------------------------------------------------------------------
package com.ABCD.densevectorudf;

import java.nio.ByteBuffer;
import java.util.Vector;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.io.FloatWritable;

import com.ABCD.common.attval.IDenseVectorOperator;
import com.ABCD.common.attval.Utility;
import com.ABCD.common.attval.array.BufferOperations;
import com.ABCD.common.attval.array.FloatArrayFactory;

@Description(name = "DenseVectorUDF",
    value = "Dense Vector UDF in Hive / Shark\n"
        + "_FUNC_(binary|hex) - "
        + "Returns the dense vector array<float> value\n",
    extended = "Dense Vector UDAF in Hive / Shark")
public class DenseVectorUDF extends GenericUDF {
  private static final Log LOG = LogFactory.getLog(DenseVectorUDF.class.getName());
  private ObjectInspector inputOI;
  private ListObjectInspector outputOI;

  @Override
  public String getDisplayString(String[] children) {
    StringBuilder sb = new StringBuilder();
    sb.append("densevectorudf(");
    for (int i = 0; i < children.length; i++) {
      sb.append(children[i]);
      if (i + 1 != children.length) {
        sb.append(",");
      }
    }
    sb.append(")");
    return sb.toString();
  }

  @Override
  public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
    if (arguments.length == 1) {
      ObjectInspector first = arguments[0];
      if (!(first instanceof StringObjectInspector) && !(first instanceof BinaryObjectInspector)) {
        LOG.error("first argument must be either a binary or a hex buffer");
        throw new UDFArgumentException("first argument must be either a binary or a hex buffer");
      }
      inputOI = first;
      // Declare the return type: list<float>.
      outputOI = ObjectInspectorFactory.getStandardListObjectInspector(
          PrimitiveObjectInspectorFactory.writableFloatObjectInspector);
    } else {
      throw new UDFArgumentLengthException("Wrong number of arguments passed; exactly one argument is supported.");
    }
    return outputOI;
  }

  @Override
  public Object evaluate(DeferredObject[] arguments) throws HiveException {
    IDenseVectorOperator idv = FloatArrayFactory.getFloatArray();
    Object object = arguments[0].get();
    ByteBuffer buff = null;
    if (inputOI instanceof StringObjectInspector) {
      String hex = ((StringObjectInspector) inputOI).getPrimitiveJavaObject(object);
      buff = ByteBuffer.wrap(Utility.hexToBytes(hex));
    } else if (inputOI instanceof BinaryObjectInspector) {
      byte[] bytes = ((BinaryObjectInspector) inputOI).getPrimitiveJavaObject(object);
      buff = ByteBuffer.wrap(bytes);
    }
    Vector<Float> floatVector = idv.getElements(buff);
    // Wrap each decoded float so the result matches the writable-float list OI.
    Object[] red = new Object[floatVector.size()];
    for (int index = 0; index < red.length; index++) {
      red[index] = new FloatWritable(floatVector.get(index));
    }
    LOG.info("Buffer header = " + BufferOperations.stringifyBuffer(buff));
    return red;
  }
}
------------------------------------------------------------------------------------------
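
IDenseVectorOperator.getElements() is not shown in the thread, but the
description above ("an ordered set of 16-bit IEEE floats") pins down roughly
what it has to do. Purely as an illustration, and ignoring the proprietary
DenseVector header, decoding half-precision floats out of a ByteBuffer can be
sketched as follows (HalfFloatDecoder is a hypothetical name, not part of the
posted code):

------------------------------------------------------------------------------------------
import java.nio.ByteBuffer;

final class HalfFloatDecoder {
    // Expand one IEEE 754 half-precision value to a Java float.
    static float halfToFloat(short h) {
        int sign = (h >> 15) & 0x1;
        int exp  = (h >> 10) & 0x1f;
        int mant = h & 0x3ff;
        float value;
        if (exp == 0) {
            value = mant * (float) Math.pow(2, -24);  // subnormal
        } else if (exp == 31) {
            value = mant == 0 ? Float.POSITIVE_INFINITY : Float.NaN;
        } else {
            value = (1 + mant / 1024f) * (float) Math.pow(2, exp - 15);
        }
        return sign == 0 ? value : -value;
    }

    // Decode a bare sequence of 16-bit floats (no header) from the buffer.
    static float[] decode(ByteBuffer buff) {
        float[] out = new float[buff.remaining() / 2];
        for (int i = 0; i < out.length; i++) {
            out[i] = halfToFloat(buff.getShort());
        }
        return out;
    }
}
------------------------------------------------------------------------------------------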
>>>>
>>>> Following is the code I have written for the Phoenix UDF:
------------------------------------------------------------------------------------------
package org.apache.phoenix.expression.function;

import java.nio.ByteBuffer;
import java.sql.SQLException;
import java.util.List;
import java.util.Vector;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.parse.FunctionParseNode.Argument;
import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PFloatArray;
import org.apache.phoenix.schema.types.PVarbinary;

import com.ABCD.common.attval.IDenseVectorOperator;
import com.ABCD.common.attval.array.BufferOperations;
import com.ABCD.common.attval.array.FloatArrayFactory;

@BuiltInFunction(name = DenseVectorFunction.NAME,
    args = {@Argument(allowedTypes = {PVarbinary.class})})
public class DenseVectorFunction extends ScalarFunction {
  public static final String NAME = "DenseVectorFunction";

  public DenseVectorFunction() {
  }

  public DenseVectorFunction(List<Expression> children) throws SQLException {
    super(children);
  }

  @Override
  public String getName() {
    return NAME;
  }

  public Expression getElementExpr() {
    return children.get(0);
  }

  @Override
  public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
    if (!getElementExpr().evaluate(tuple, ptr)) {
      return false;
    }
    // PVarbinary.toObject already yields the column's raw byte[].
    byte[] bytes = (byte[]) getElementExpr().getDataType().toObject(ptr,
        getElementExpr().getSortOrder(), getElementExpr().getMaxLength(),
        getElementExpr().getScale());
    IDenseVectorOperator idv = FloatArrayFactory.getFloatArray();
    ByteBuffer buff = ByteBuffer.wrap(bytes);
    Vector<Float> floatVector = idv.getElements(buff);

    Object[] red = new Object[floatVector.size()];
    for (int index = 0; index < red.length; index++) {
      red[index] = new FloatWritable(floatVector.get(index));
      System.out.println("" + floatVector.get(index));
    }
    // This prints the header info in the ByteBuffer, which is correct.
    System.out.println("Buffer header = " + BufferOperations.stringifyBuffer(buff));
    // HOW DO I MAKE IT RETURN FLOAT[] OR A PHOENIXARRAY?
    ptr.set(??);
    return true;
  }

  @Override
  public SortOrder getSortOrder() {
    return children.get(0).getSortOrder();
  }

  @Override
  public PDataType getDataType() {
    return PFloatArray.INSTANCE;
  }
}
------------------------------------------------------------------------------------------
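
The ptr.set(??) placeholder is where James's advice above lands. The helper
below is an untested sketch of the missing tail of evaluate(): it assumes
PArrayDataType.instantiatePhoenixArray(PDataType, Object[]) is the right way
to build a PhoenixArray without a JDBC connection (none is available inside
evaluate()), and DenseVectorEncoder is a hypothetical name.

------------------------------------------------------------------------------------------
import java.util.Vector;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.schema.PhoenixArray;
import org.apache.phoenix.schema.types.PArrayDataType;
import org.apache.phoenix.schema.types.PFloat;
import org.apache.phoenix.schema.types.PFloatArray;

// Hypothetical helper, not part of the posted code.
final class DenseVectorEncoder {
    // Encode the decoded floats as a Phoenix FLOAT ARRAY and point ptr at the bytes.
    static void setFloatArray(Vector<Float> floatVector, ImmutableBytesWritable ptr) {
        Float[] elements = floatVector.toArray(new Float[0]);
        // Build the array directly; there is no connection to call createArrayOf on.
        PhoenixArray array = PArrayDataType.instantiatePhoenixArray(PFloat.INSTANCE, elements);
        // Serialize into Phoenix's array representation, as in James's snippet.
        ptr.set(PFloatArray.INSTANCE.toBytes(array));
    }
}
------------------------------------------------------------------------------------------

Replacing ptr.set(??) with a call to this helper (and dropping the
FloatWritable loop, a Hive leftover that Phoenix never consumes) would be
consistent with getDataType() returning PFloatArray.INSTANCE.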
>>>>
>>>> Any help will be much appreciated.
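
Since the class above already has the ScalarFunction/@BuiltInFunction shape,
it should not need to live inside phoenix-core: Phoenix 4.4+ can load scalar
UDFs dynamically (aggregates, as Rajeshbabu notes, are not covered by this
framework). Below is a hedged sketch of the registration and a first query;
the jar path and ZooKeeper quorum are placeholders, the FLOAT ARRAY return
type should be verified against the 4.8 CREATE FUNCTION grammar, and
phoenix.functions.allowUserDefinedFunctions must be true on the client.

------------------------------------------------------------------------------------------
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

public class RegisterDenseVectorUdf {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // UDFs must be enabled on the client side.
        props.setProperty("phoenix.functions.allowUserDefinedFunctions", "true");
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost:2181", props);
             Statement stmt = conn.createStatement()) {
            // The jar must be reachable by the servers, e.g. under hbase.dynamic.jars.dir.
            stmt.execute("CREATE FUNCTION DENSEVECTORUDF(varbinary) RETURNS FLOAT ARRAY "
                + "AS 'org.apache.phoenix.expression.function.DenseVectorFunction' "
                + "USING JAR 'hdfs://namenode:8020/hbase/lib/densevector-udf.jar'");
            try (ResultSet rs = stmt.executeQuery(
                    "SELECT DENSEVECTORUDF(uplinkcostbuffer) FROM siteflowtable LIMIT 1")) {
                while (rs.next()) {
                    System.out.println(rs.getArray(1));
                }
            }
        }
    }
}
------------------------------------------------------------------------------------------

If the function is instead meant to ship as a true built-in compiled into
phoenix-core, it also has to be registered in the
org.apache.phoenix.expression.ExpressionType enum; the CREATE FUNCTION route
avoids that step.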