Hi,

你是调试的时候想看结果吗?
你可以直接table.to_pandas()来看结果,或者用print connector来看。

个人觉得to_pandas最简单,比如你可以试试下面的例子

```
table=table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b'])

@udf(input_types=DataTypes.STRING(),
result_type=DataTypes.ARRAY(DataTypes.STRING()))
def func(a):
     return np.array([a, a, a], dtype=str)

table_env.register_function("func", func)

table.select("func(b)").to_pandas()
```
然后,你可以看看官方文档[1],让你快速上手PyFlink

Best,
Xingbo

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html

whh_960101 <[email protected]> 于2020年9月4日周五 下午2:50写道:

> 您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗
> 我的udf输出了一个numpy.array(dtype = str),
> result_type设的是DataTypes.ARRAY(DataTypes.STRING())
>
> 把输出insert_into到sink环境中,with_format设为OldCsv().field('res',DataTypes.ARRAY(DataTypes.STRING()))
> 打印出来的结果是[Ljava.lang.String;@f795710这种类型的字符串,不是我np.array里的内容
> 请问这个问题该怎么解决?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-09-04 10:35:03,"Xingbo Huang" <[email protected]> 写道:
> >Hi,
> >
>
> >你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事
> >
> >Best,
> >Xingbo
> >
> >whh_960101 <[email protected]> 于2020年9月4日周五 上午9:26写道:
> >
> >>
> >>
> 您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值
> >> udf定义如下:
> >>
> >>
> @udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.STRING())
> >> def fun(data):
> >>      b= data['b'] #我想获取after中XMLRECORD字段的值,这样写会报错
> >>
> >>
> >> 如果通过table.select("after.b")或者table.select('after').select('b')也会报错
> >> 希望您能给我提供好的解决办法,万分感谢!
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-09-03 22:23:28,"Xingbo Huang" <[email protected]> 写道:
> >> >Hi,
> >> >
> >> >我觉得你从头详细描述一下你的表结构。
> >> >比如,我的表包含三个字段a,b,c,他们的类型都是varchar的,
> >> >然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。
> >> >使用就是table.select("udf(b)")。详细的你可以去查阅文档[1]
> >> >
> >> >[1]
> >> >
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions
> >> >
> >> >Best,
> >> >Xingbo
> >> >
> >> ><[email protected]> 于2020年9月3日周四 下午9:45写道:
> >> >
> >> >> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法
> >> >> input_types=[DataTypes.STRING(), DataTypes.STRING(),
> DataTypes.STRING()]
> >> >>
> >> >> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗
> >> >> 或者正确写法是什么样的,感谢解答!
> >> >>
> >> >>
> >> >> | |
> >> >> whh_960101
> >> >> |
> >> >> |
> >> >> 邮箱:[email protected]
> >> >> |
> >> >>
> >> >> 签名由 网易邮箱大师 定制
> >> >>
> >> >> 在2020年09月03日 21:14,Xingbo Huang 写道:
> >> >> Hi,
> >> >> input_types定义的是每一个列的具体类型。
> >> >> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你
> >> >> 正确的写法是
> >> >>
> >> >>    input_types=[DataTypes.STRING(), DataTypes.STRING(),
> >> DataTypes.STRING()]
> >> >>
> >> >> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的)
> >> >>    input_types=DataTypes.Row([DataTypes.FIELD("a",
> DataTypes.STRING()),
> >> >> DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c",
> >> >> DataTypes.STRING())])
> >> >>
> >> >> Best,
> >> >> Xingbo
> >> >>
> >> >> whh_960101 <[email protected]> 于2020年9月3日周四 下午9:03写道:
> >> >>
> >> >> > 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid
> >> >> > input_type:input_type should be DataType but contain
> RowField(RECID,
> >> >> > VARCHAR)
> >> >> > 我的pyflink版本:1.11.1
> >> >>
> >>
>

回复