sky-sheep opened a new issue, #12508:
URL: https://github.com/apache/iotdb/issues/12508

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/iotdb/issues) and 
found nothing similar.
   
   
   ### Version
   
   1.3.2
   
   ### Describe the bug and provide the minimal reproduce step
   
   写了一个UDAF,用来统计数量,加了一个参数filter用来过滤数据,如果不加group by 
level就能够正常执行,如果加了level就不能够正常执行,抛出异常301
   不加的sql如下所示:
   ![881715313071_ 
pic](https://github.com/apache/iotdb/assets/65699423/0ea83e10-b5b0-4523-8727-66f20a2f6368)
   如果加了 group by level就会报错,如下所示:
   ![871715313067_ 
pic](https://github.com/apache/iotdb/assets/65699423/d73fa247-e937-4d93-ae21-37a89aab9fcb)
   我用count这种内置的聚合函数就没问题,即使参数filter这种没有定义的,而且,这个countFill也能够正常接收参数,filter能够传进去,
   ![951715313417_ 
pic](https://github.com/apache/iotdb/assets/65699423/83963266-0a8d-419e-9b53-f984d92dedec)
   报错日志如下:
   ![1141715318978_ 
pic](https://github.com/apache/iotdb/assets/65699423/317a95ee-558a-4df8-80ae-a36a6bd145b8)
   UDAF代码如下:
   `package org.apache.iotdb.udf.api;
   
   import org.apache.iotdb.udf.api.customizer.config.UDAFConfigurations;
   import org.apache.iotdb.udf.api.customizer.parameter.UDFParameterValidator;
   import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
   import org.apache.iotdb.udf.api.type.Type;
   import org.apache.iotdb.udf.api.utils.ExecuteEval;
   import org.apache.iotdb.udf.api.utils.ResultValue;
   import org.apache.tsfile.block.column.Column;
   import org.apache.tsfile.utils.BitMap;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   import java.nio.ByteBuffer;
   import java.util.Arrays;
   import java.util.List;
   
   /**
    * 个数统计函数
    * 获取所有符合条件的个数
    * @author ylx
    */
   public class CountFill implements UDAF {
   
       private Logger logger = LoggerFactory.getLogger(CountFill.class);
   
       private Type dataType;
   
       private List<String> filter;
       static class CountState implements State {
   
           Long count;
   
           @Override
           public void reset() {
               count = 0L;
           }
   
           @Override
           public byte[] serialize() {
               ByteBuffer buffer = ByteBuffer.allocate(Double.BYTES + 
Long.BYTES);
               buffer.putLong(count);
   
               return buffer.array();
           }
   
           @Override
           public void deserialize(byte[] bytes) {
               ByteBuffer buffer = ByteBuffer.wrap(bytes);
               count = buffer.getLong();
           }
       }
   
       /**
        * 在初始化方法beforeStart调用前执行,用于检测UDFParameters中用户输入的参数是否合法。该方法与 UDTF 
的validate相同。
        */
       @Override
       public void validate(UDFParameterValidator validator) throws Exception {
           validator
                   .validateInputSeriesNumber(1)
                   .validateInputSeriesDataType(0, Type.INT32, Type.INT64, 
Type.FLOAT, Type.DOUBLE,Type.TEXT);
       }
   
       /**
        * 初始化方法,在 UDAF 处理输入数据前,调用用户自定义的初始化行为。与 UDTF 不同的是,这里的 configuration 是 
UDAFConfiguration 类型
        * @param parameters
        * @param configurations
        */
       @Override
       public void beforeStart(UDFParameters parameters, UDAFConfigurations 
configurations) {
           String inputFilter = parameters.getString("filter");
           if (inputFilter !=null ){
               filter = (Arrays.asList(inputFilter.split(",")));
           }
           dataType = parameters.getDataType(0);
           configurations.setOutputDataType(Type.INT64);
       }
   
       /**
        * 创建State对象,一般只需要调用默认构造函数,然后按需修改默认的初始值即可。
        */
       @Override
       public State createState() {
           CountState countState = new CountState();
           countState.count = 0L;
           return countState;
       }
   
       /**
        * 根据传入的数据Column[]批量地更新State对象,注意 column[0] 
总是代表时间列。另外BitMap表示之前已经被过滤掉的数据,您在编写该方法时需要手动判断对应的数据是否被过滤掉
        * values
        *
        * @param state   state to be updated
        * @param columns input columns from IoTDB TsBlock, time column is 
always the last column, the
        *                remaining columns are their parameter value columns
        * @param bitMap  define some filtered position in columns
        */
       @Override
       public void addInput(State state, Column[] columns, BitMap bitMap) {
           CountState countState = (CountState) state;
           switch (dataType) {
               case INT32:
               case INT64:
               case FLOAT:
               case DOUBLE:
               case TEXT:
               case BOOLEAN:
               default:
                   addIntInput(countState, columns, bitMap);
                   break;
           }
       }
   
       public void addIntInput(CountState state, Column[] columns, BitMap 
bitMap) {
           int count = columns[0].getPositionCount();
           for (int i = 0; i < count; i++) {
               Boolean filterFlag = true;
               if (bitMap != null && !bitMap.isMarked(i)) {
                   continue;
               }
               if (filter != null && filter.size()>0){
                   for (String fill : filter) {
                       String comparisonOperators = 
ExecuteEval.findComparisonOperators(fill);
                       String replace = fill.replace(comparisonOperators, 
"").trim();
                       if (ExecuteEval.isString(replace,dataType)){
                           // 如果是字符串,则将其拼接上单引号
                           replace = ("'"+replace+"'").trim();
                           switch (dataType){
                               case INT32:
                                   // int类型
                                   filterFlag 
=ExecuteEval.evalString("('"+columns[0].getInt(i)+"'"+comparisonOperators+replace+")?true:false");
                                   break;
                               case INT64:
                                   filterFlag 
=ExecuteEval.evalString("('"+columns[0].getLong(i)+"'"+comparisonOperators+replace+")?true:false");
                                   break;
                               case FLOAT:
                                   filterFlag 
=ExecuteEval.evalString("('"+columns[0].getFloat(i)+"'"+comparisonOperators+replace+")?true:false");
                                   break;
                               case DOUBLE:
                                   filterFlag 
=ExecuteEval.evalString("('"+columns[0].getDouble(i)+"'"+comparisonOperators+replace+")?true:false");
                                   break;
                               case TEXT:
                                   filterFlag 
=ExecuteEval.evalString("('"+columns[0].getObject(i)+"'"+comparisonOperators+replace+")?true:false");
                                   break;
                               default:
                                   break;
                           }
   
                       }else {
                           switch (dataType){
                               case INT32:
                                   // int类型
                                   filterFlag 
=ExecuteEval.evalString("("+columns[0].getInt(i)+comparisonOperators+replace+")?true:false");
                                   break;
                               case INT64:
                                   filterFlag 
=ExecuteEval.evalString("("+columns[0].getLong(i)+comparisonOperators+replace+")?true:false");
                                   break;
                               case FLOAT:
                                   filterFlag 
=ExecuteEval.evalString("("+columns[0].getFloat(i)+comparisonOperators+replace+")?true:false");
                                   break;
                               case DOUBLE:
                                   filterFlag 
=ExecuteEval.evalString("("+columns[0].getDouble(i)+comparisonOperators+replace+")?true:false");
                                   break;
                               case TEXT:
                                   filterFlag 
=ExecuteEval.evalString("("+columns[0].getObject(i)+comparisonOperators+replace+")?true:false");
                                   break;
                               default:
                                   break;
                           }
                       }
                   }
               }
               if (!columns[0].isNull(i) && filterFlag) {
                   state.count++;
               }
           }
       }
   
       /**
        * 将rhs状态合并至state状态中。在分布式场景下,同一组的数据可能分布在不同节点上,IoTDB 
会为每个节点上的部分数据生成一个State对象,然后调用该方法合并成完整的State。
        *
        * @param state current state
        * @param rhs   right-hand-side state to be merged
        */
       @Override
       public void combineState(State state, State rhs) {
           CountState avgState = (CountState) state;
           CountState avgRhs = (CountState) rhs;
   
           avgState.count += avgRhs.count;
       }
   
       /**
        * 根据State中的数据,计算出最终的聚合结果。注意根据聚合的语义,每一组只能输出一个值。
        *
        * @param state       final state
        * @param resultValue used to collect output data points
        */
       @Override
       public void outputFinal(State state, ResultValue resultValue) {
           CountState avgState = (CountState) state;
   
           resultValue.setLong(avgState.count);
       }
   
   
   }`
   
   ### What did you expect to see?
   
   无
   
   ### What did you see instead?
   
   无
   
   ### Anything else?
   
   无
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to