Hi xingbo, 文档中给的例子udtf需要和join一起使用,但是我现在不需要join,只是单纯的转换结果 如果直接调用了udtf后sink,会提示 Cause: Different number of columns. Query schema: [EXPR$0: ROW<`buyQtl` BIGINT, `aveBuy` INT >]
Sink schema: [buyQtl: BIGINT, aveBuy: INT] [email protected] 发件人: Xingbo Huang 发送时间: 2020-12-14 11:38 收件人: user-zh 主题: Re: Re: Pandas UDF处理过的数据sink问题 Hi, 你想要一列变多列的话,你需要使用UDTF了,具体使用方式,你可以参考文档[1] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html#table-functions Best, Xingbo [email protected] <[email protected]> 于2020年12月14日周一 上午11:00写道: > 多谢你的回复。这个问题已处理好了,确实如你所说需要将@udf换成@udaf。 > 但现在有另一个问题,根据文档 > > https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions > Vectorized Python aggregate functions takes one or more pandas.Series as > the inputs and return one scalar value as output. > Note The return type does not support RowType and MapType for the time > being. > udaf仅允许返回单个值,所以我再udaf里把所有值用‘,’连接后用STRING方式返回了,将这个STRING直接sink掉是没问题的。 > 现在是后面用另一个udf把这个string再做拆分,代码大概如下: > @udf(result_type=DataTypes.ROW( > [DataTypes.FIELD('value1', DataTypes.BIGINT()), > DataTypes.FIELD('value2', DataTypes.INT())])) > def flattenStr(inputStr): > ret_array = [int(x) for x in inputStr.split(',')] > return Row(ret_array[0], ret_array[1]) > t_env.create_temporary_function("flattenStr", flattenStr)aggregate_table = > order_table.window(tumble_window) \ > .group_by("w") \ > .select("**调用udaf** as aggValue") > result_table = aggregate_table.select("flattenStr(aggValue) as retValue") > > result_table.select(result_table.retValue.flatten).execute_insert("csvSink")上传到flink编译没有问题,但运行是报错了,不太明白报错的含义,不知道是否是udf返回的类型不正确引起的 > Caused by: java.lang.ArrayIndexOutOfBoundsException: 0 > at > org.apache.flink.table.runtime.operators.python.scalar.AbstractPythonScalarFunctionOperator.getPythonEnv(AbstractPythonScalarFunctionOperator.java:99) > at > org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.createPythonEnvironmentManager(AbstractPythonFunctionOperator.java:306) > at > org.apache.flink.table.runtime.operators.python.AbstractStatelessFunctionOperator.createPythonFunctionRunner(AbstractStatelessFunctionOperator.java:151) > at > org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:122) > > > [email protected] > > 发件人: Wei Zhong > 发送时间: 2020-12-14 10:38 > 收件人: user-zh > 主题: Re: Pandas UDF处理过的数据sink问题 > Hi Lucas, > > 是这样的,这个Pandas的输出类型是一列Row, 而你现在的sink需要接收的是一列BIGINT和一列INT。 > > 你可以尝试将sql语句改成以下形式: > > select orderCalc(code, amount).get(0), orderCalc(code, amount).get(1) > from `some_source` > group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount > > 此外你这里实际是Pandas UDAF的用法吧,如果是的话则需要把”@udf”换成”@udaf” > > Best, > Wei > > > 在 2020年12月13日,13:13,Lucas <[email protected]> 写道: > > > > 使用了1.12.0的flink,3.7的python。自定义了一个pandas的UDF,定义大概如下 > > > > @udf(input_types=[DataTypes.STRING(), DataTypes.FLOAT()], > > result_type=DataTypes.ROW( > > [DataTypes.FIELD('buyQtl', DataTypes.BIGINT()), > > DataTypes.FIELD('aveBuy', DataTypes.INT())), > > func_type='pandas') > > def orderCalc(code, amount): > > > > df = pd.DataFrame({'code': code, 'amount': amount}) > > # pandas 数据处理后输入另一个dataframe output > > return (output['buyQtl'], output['aveBuy']) > > > > > > 定义了csv的sink如下 > > > > create table csvSink ( > > buyQtl BIGINT, > > aveBuy INT > > ) with ( > > 'connector.type' = 'filesystem', > > 'format.type' = 'csv', > > 'connector.path' = 'e:/output' > > ) > > > > > > > > 然后进行如下的操作: > > > > result_table = t_env.sql_query(""" > > select orderCalc(code, amount) > > from `some_source` > > group by TUMBLE(eventTime, INTERVAL '1' SECOND), code, amount > > """) > > result_table.execute_insert("csvSink") > > > > > > > > 在执行程序的时候提示没法入库 > > > > py4j.protocol.Py4JJavaError: An error occurred while calling > > o98.executeInsert. > > > > : org.apache.flink.table.api.ValidationException: Column types of query > > result and sink for registered table > > 'default_catalog.default_database.csvSink' do not match. > > > > Cause: Different number of columns. > > > > > > > > Query schema: [EXPR$0: ROW<`buyQtl` BIGINT, `aveBuy` INT >] > > > > Sink schema: [buyQtl: BIGINT, aveBuy: INT] > > > > at > > > org.apache.flink.table.planner.sinks.DynamicSinkUtils.createSchemaMismatchEx > > ception(DynamicSinkUtils.java:304) > > > > at > > > org.apache.flink.table.planner.sinks.DynamicSinkUtils.validateSchemaAndApply > > ImplicitCast(DynamicSinkUtils.java:134) > > > > > > > > 是UDF的输出结构不对吗,还是需要调整sink table的结构? > > > > >
