您好,
图中像datagen和print这样的connector更像是表名,之前听说的只有json、csv、filesystem这种类型的connector,请问connector在使用连接器DDL创建表时的作用是什么

















在 2020-09-07 11:33:06,"Xingbo Huang" <[email protected]> 写道:
>Hi,
>你是想直接读一个python的提供的数据源把。这和udf是两个问题。你那个udf没啥问题,就是udf的返回类型是result_type,不是result_types。
>
>你想直接读python的数据结构的话,你可以考虑使用st_env.from_elements(测试用的,可以参考文档[1])或者st_env.from_pandas
>[2] 来读取一个dataframe。
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html
>[2]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/conversion_of_pandas.html
>
>Best,
>Xingbo
>
>whh_960101 <[email protected]> 于2020年9月7日周一 上午11:22写道:
>
>> 您好,请问在定义UDF时,输入参数是一个我定义好的python字典,比如
>> dic = {1:'a',2:'b'}
>> 此时定义udf如下:
>>
>> @udf(input_types=[DataTypes.MAP(DataTypes.INT(),DataTypes.STRING()),DataTypes.STRING()],result_types=DataTypes.STRING())
>> def func(dic,f):
>>    ......
>>    return L
>> st_env.register_function("func", func)
>> st_env.from_path("source").select("func(dic,t)").insert_into("sink")
>> #这时我在外部定义好的数据类型dic字典如何作为参数传进来
>> 这种代码应该怎么写?感谢解答!因为官网没有给出将外部数据类型作为参数传入udf中的样例,所以比较困惑
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-09-04 16:02:56,"Xingbo Huang" <[email protected]> 写道:
>> >Hi,
>> >
>> >推荐你使用ddl来声明你上下游用的connector
>> >
>> >```
>> >table_env.execute_sql("""
>> >CREATE TABLE output (
>> >data STRING ARRAY
>> >) WITH (
>> > 'connector' = 'filesystem',           -- required: specify the connector
>> > 'path' = 'file:///tmp/test.csv',  -- required: path to a directory
>> > 'format' = 'json',
>> > 'json.fail-on-missing-field' = 'false',
>> > 'json.ignore-parse-errors' = 'true'
>> >)
>> >""")
>> >
>>
>> >table.select("func(b)").execute_insert("output").get_job_client().get_job_execution_result().result()
>> >```
>> >
>> >Best,
>> >Xingbo
>> >
>> >
>> >
>> >whh_960101 <[email protected]> 于2020年9月4日周五 下午3:46写道:
>> >
>> >> 您好,我是想让输出insert_into到目标表中,具体如下:
>> >> st_env=StreamExecutionEnvironment.get_execution_environment()
>> >> st_env.connect了一个source table(table包含a字段),
>> >> 然后
>> >> | st_env.connect(FileSystem().path('tmp')) \ |
>> >> | | .with_format(OldCsv() |
>> >> | | .field('res', DataTypes.ARRAY(DataTypes.STRING()))) \ |
>> >> | | .with_schema(Schema() |
>> >> | | .field('res', DataTypes.ARRAY(DataTypes.STRING()))) \ |
>> >> | | .create_temporary_table('sink') |
>> >> connect了一个sink表,format、schema都是DataTypes.ARRAY()
>> >> 然后我定义了一个udf
>> >>
>> >>
>> @udf(input_types=DataTypes.STRING(),result_type=DataTypes.ARRAY(DataTypes.STRING()))
>> >> def func(a):
>> >>     rec_list = a.split(',')
>> >>     res_arr = np.arrary(rec_list,dtype=str)
>> >>     return res_arr
>> >> st_env.register_function("func", func)
>> >> st_env.from_path("source").select("func(a)").insert_into("sink")
>> >> 最后得到的tmp文件里面就是[Ljava.lang.String;@f795710这种形式的字符串
>> ,不是我res_arr里面的内容,如果我单独返回一个值,比如return
>> >> res_arr[0],tmp文件里面的字符串就是正确。
>> >> 我想要得到array,该怎么解决?
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 在 2020-09-04 15:17:38,"Xingbo Huang" <[email protected]> 写道:
>> >> >Hi,
>> >> >
>> >> >你是调试的时候想看结果吗?
>> >> >你可以直接table.to_pandas()来看结果,或者用print connector来看。
>> >> >
>> >> >个人觉得to_pandas最简单,比如你可以试试下面的例子
>> >> >
>> >> >```
>> >> >table=table_env.from_elements([(1, 'Hi'), (2, 'Hehe')], ['a', 'b'])
>> >> >
>> >> >@udf(input_types=DataTypes.STRING(),
>> >> >result_type=DataTypes.ARRAY(DataTypes.STRING()))
>> >> >def func(a):
>> >> >     return np.array([a, a, a], dtype=str)
>> >> >
>> >> >table_env.register_function("func", func)
>> >> >
>> >> >table.select("func(b)").to_pandas()
>> >> >```
>> >> >然后,你可以看看官方文档[1],让你快速上手PyFlink
>> >> >
>> >> >Best,
>> >> >Xingbo
>> >> >
>> >> >[1]
>> >> >
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/intro_to_table_api.html
>> >> >
>> >> >whh_960101 <[email protected]> 于2020年9月4日周五 下午2:50写道:
>> >> >
>> >> >> 您好,numpy的array和DataTypes.ARRAY()的类型格式是一样的吗
>> >> >> 我的udf输出了一个numpy.array(dtype = str),
>> >> >> result_type设的是DataTypes.ARRAY(DataTypes.STRING())
>> >> >>
>> >> >>
>> >>
>> 把输出insert_into到sink环境中,with_format设为OldCsv().field('res',DataTypes.ARRAY(DataTypes.STRING()))
>> >> >> 打印出来的结果是[Ljava.lang.String;@f795710这种类型的字符串,不是我np.array里的内容
>> >> >> 请问这个问题该怎么解决?
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >> 在 2020-09-04 10:35:03,"Xingbo Huang" <[email protected]> 写道:
>> >> >> >Hi,
>> >> >> >
>> >> >>
>> >> >>
>> >>
>> >你这个场景,你其实可以直接拿到after里面的字段b的,用法是table.select("after.get('b')")。你都不需要使用udf来做这件事
>> >> >> >
>> >> >> >Best,
>> >> >> >Xingbo
>> >> >> >
>> >> >> >whh_960101 <[email protected]> 于2020年9月4日周五 上午9:26写道:
>> >> >> >
>> >> >> >>
>> >> >> >>
>> >> >>
>> >>
>> 您好,我的问题是:首先我有一个source的环境,with_format是嵌套的json,用两层DataTypes.Row()来定义,其中有一个字段的格式是DataTypes.Row([DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())])(字段名为after),现在我要定义一个udf,输入是after,我想从中获取b字段的值
>> >> >> >> udf定义如下:
>> >> >> >>
>> >> >> >>
>> >> >>
>> >>
>> @udf(input_types=[DataTypes.STRING(),DataTypes.STRING()],result_type=DataTypes.STRING())
>> >> >> >> def fun(data):
>> >> >> >>      b= data['b'] #我想获取after中XMLRECORD字段的值,这样写会报错
>> >> >> >>
>> >> >> >>
>> >> >> >> 如果通过table.select("after.b")或者table.select('after').select('b')也会报错
>> >> >> >> 希望您能给我提供好的解决办法,万分感谢!
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >> 在 2020-09-03 22:23:28,"Xingbo Huang" <[email protected]> 写道:
>> >> >> >> >Hi,
>> >> >> >> >
>> >> >> >> >我觉得你从头详细描述一下你的表结构。
>> >> >> >> >比如,我的表包含三个字段a,b,c,他们的类型都是varchar的,
>> >> >> >>
>> >> >然后我定义了一个udf,他的输入是我的表的b字段。那么这个时候我的udf的input_types就是DataTypes.STRING()就行。
>> >> >> >> >使用就是table.select("udf(b)")。详细的你可以去查阅文档[1]
>> >> >> >> >
>> >> >> >> >[1]
>> >> >> >> >
>> >> >> >>
>> >> >>
>> >>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/udfs/python_udfs.html#scalar-functions
>> >> >> >> >
>> >> >> >> >Best,
>> >> >> >> >Xingbo
>> >> >> >> >
>> >> >> >> ><[email protected]> 于2020年9月3日周四 下午9:45写道:
>> >> >> >> >
>> >> >> >> >>
>> >> 我的输入是Row类型的(包含字段名a,b,c),比如我的输入是W,我的函数只需要用到其中的第二列(b),input_types按照您的写法
>> >> >> >> >> input_types=[DataTypes.STRING(), DataTypes.STRING(),
>> >> >> DataTypes.STRING()]
>> >> >> >> >>
>> >> >> >> >> 在def的时候应该是写def函数名(W):,然后函数内部取第二列写W['b']可以吗
>> >> >> >> >> 或者正确写法是什么样的,感谢解答!
>> >> >> >> >>
>> >> >> >> >>
>> >> >> >> >> | |
>> >> >> >> >> whh_960101
>> >> >> >> >> |
>> >> >> >> >> |
>> >> >> >> >> 邮箱:[email protected]
>> >> >> >> >> |
>> >> >> >> >>
>> >> >> >> >> 签名由 网易邮箱大师 定制
>> >> >> >> >>
>> >> >> >> >> 在2020年09月03日 21:14,Xingbo Huang 写道:
>> >> >> >> >> Hi,
>> >> >> >> >> input_types定义的是每一个列的具体类型。
>> >> >> >> >> 比如udf输入的是三列,类型都是DataTypes.STRING(),这个时候你
>> >> >> >> >> 正确的写法是
>> >> >> >> >>
>> >> >> >> >>    input_types=[DataTypes.STRING(), DataTypes.STRING(),
>> >> >> >> DataTypes.STRING()]
>> >> >> >> >>
>> >> >> >> >> 针对这个例子,你错误的写法是(根据报错,我猜测你是这么写的)
>> >> >> >> >>    input_types=DataTypes.Row([DataTypes.FIELD("a",
>> >> >> DataTypes.STRING()),
>> >> >> >> >> DataTypes.FIELD("b", DataTypes.STRING()), DataTypes.FIELD("c",
>> >> >> >> >> DataTypes.STRING())])
>> >> >> >> >>
>> >> >> >> >> Best,
>> >> >> >> >> Xingbo
>> >> >> >> >>
>> >> >> >> >> whh_960101 <[email protected]> 于2020年9月3日周四 下午9:03写道:
>> >> >> >> >>
>> >> >> >> >> >
>> 您好,请问我在定义udf的时候input_types为什么不能使用DataTypes.Row(),使用后会报错:Invalid
>> >> >> >> >> > input_type:input_type should be DataType but contain
>> >> >> RowField(RECID,
>> >> >> >> >> > VARCHAR)
>> >> >> >> >> > 我的pyflink版本:1.11.1
>> >> >> >> >>
>> >> >> >>
>> >> >>
>> >>
>>

回复