我试了一下是可以运行的,可以发一下报错吗?
On Mon, Oct 18, 2021 at 6:44 PM xuzh <[email protected]> wrote:
> from pyflink.table import ScalarFunction, EnvironmentSettings,
> TableEnvironment, DataTypes
> from pyflink.table.udf import udf
> from pyflink.table.expressions import call, row
>
>
> class HashCode(ScalarFunction):
> def __init__(self):
> self.factor = 12
>
> def eval(self, s):
> return hash(s) * self.factor
>
>
> env_settings = EnvironmentSettings.in_batch_mode()
> btenv = TableEnvironment.create(env_settings)
>
> hash_code = udf(HashCode(), result_type=DataTypes.BIGINT())
> # 在 SQL API 中使用 Python 自定义函数
> btenv.create_temporary_function("hash_code", udf(HashCode(),
> result_type=DataTypes.BIGINT()))
> tb2 = btenv.from_elements([row(1, 'abc', 2.0), row(2, 'def', 3.0)],
> DataTypes.ROW([DataTypes.FIELD("a",
> DataTypes.INT()),
> DataTypes.FIELD("b",
> DataTypes.STRING()),
> DataTypes.FIELD("c",
> DataTypes.FLOAT())]))
> btenv.create_temporary_view("tb2", tb2)
> tb2 = btenv.sql_query("SELECT a,b,hash_code(a) m FROM tb2")
> print(tb2.to_pandas())
>
> # 3. 创建 sink 表
> btenv.execute_sql("""
> CREATE TABLE rs (
> a int,
> b string,
> m bigint
> ) WITH (
> 'connector' = 'print'
> )
> """)
>
> tb2.execute_insert("rs").wait()
> print(tb2.to_pandas())
> #
>
>
>
>
>