Hi,
我觉得你从头详细描述一下你的表结构。
比如,我的表包含三个字段a,b,c,他们的类型都是varchar的,
然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。
使用就是table.select("udf(b)")。详细的你可以去查阅文档[1]
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions
Best,
Xingbo
<[email protected]> 于2020年9月3日周四 下午9:45写道:
> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法
> input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()]
>
> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗
> 或者正确写法是什么样的,感谢解答!
>
>
> | |
> whh_960101
> |
> |
> 邮箱:[email protected]
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2020年09月03日 21:14,Xingbo Huang 写道:
> Hi,
> input_types定义的是每一个列的具体类型。
> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你
> 正确的写法是
>
> input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()]
>
> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的)
> input_types=DataTypes.Row([DataTypes.FIELD("a", DataTypes.STRING()),
> DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c",
> DataTypes.STRING())])
>
> Best,
> Xingbo
>
> whh_960101 <[email protected]> 于2020年9月3日周四 下午9:03写道:
>
> > 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid
> > input_type:input_type should be DataType but contain RowField(RECID,
> > VARCHAR)
> > 我的pyflink版本:1.11.1
>