我试了一下是可以运行的,可以发一下报错吗?

On Mon, Oct 18, 2021 at 6:44 PM xuzh <[email protected]> wrote:

> from pyflink.table import ScalarFunction, EnvironmentSettings, 
> TableEnvironment, DataTypes
> from pyflink.table.udf import udf
> from pyflink.table.expressions import call, row
>
>
> class HashCode(ScalarFunction):
>     def __init__(self):
>         self.factor = 12
>
>     def eval(self, s):
>         return hash(s) * self.factor
>
>
> env_settings = EnvironmentSettings.in_batch_mode()
> btenv = TableEnvironment.create(env_settings)
>
> hash_code = udf(HashCode(), result_type=DataTypes.BIGINT())
> # 在 SQL API 中使用 Python 自定义函数
> btenv.create_temporary_function("hash_code", udf(HashCode(), 
> result_type=DataTypes.BIGINT()))
> tb2 = btenv.from_elements([row(1, 'abc', 2.0), row(2, 'def', 3.0)],
>                           DataTypes.ROW([DataTypes.FIELD("a", 
> DataTypes.INT()),
>                                          DataTypes.FIELD("b", 
> DataTypes.STRING()),
>                                          DataTypes.FIELD("c", 
> DataTypes.FLOAT())]))
> btenv.create_temporary_view("tb2", tb2)
> tb2 = btenv.sql_query("SELECT a,b,hash_code(a) m FROM tb2")
> print(tb2.to_pandas())
>
> # 3. 创建 sink 表
> btenv.execute_sql("""
>        CREATE TABLE rs (
>            a int,
>            b string,
>            m bigint
>        ) WITH (
>            'connector' = 'print'
>        )
>    """)
>
> tb2.execute_insert("rs").wait()
> print(tb2.to_pandas())
> #
>
>
>
>
>

回复