??????????????????????udf????????????????????????????????????????udf????????????????????????????????????jar??
------------------ 原始邮件 ------------------
发件人:
"user-zh"
<[email protected]>;
发送时间: 2021年10月19日(星期二) 上午10:51
收件人: "user-zh"<[email protected]>;"xuzh"<[email protected]>;
主题: Re: pyflink 1.14.0 udf ??????????????????????????
??????????????????????????????????????????
On Mon, Oct 18, 2021 at 6:44 PM xuzh <[email protected]> wrote:
> from pyflink.table import ScalarFunction, EnvironmentSettings, TableEnvironment, DataTypes
> from pyflink.table.udf import udf
> from pyflink.table.expressions import call, row
>
>
> class HashCode(ScalarFunction):
>     def __init__(self):
>         self.factor = 12
>
>     def eval(self, s):
>         return hash(s) * self.factor
>
>
> env_settings = EnvironmentSettings.in_batch_mode()
> btenv = TableEnvironment.create(env_settings)
>
> hash_code = udf(HashCode(), result_type=DataTypes.BIGINT())
> # 在 SQL API 中使用 Python 自定义函数
> btenv.create_temporary_function("hash_code", udf(HashCode(), result_type=DataTypes.BIGINT()))
> tb2 = btenv.from_elements([row(1, 'abc', 2.0), row(2, 'def', 3.0)],
>                           DataTypes.ROW([DataTypes.FIELD("a", DataTypes.INT()),
>                                          DataTypes.FIELD("b", DataTypes.STRING()),
>                                          DataTypes.FIELD("c", DataTypes.FLOAT())]))
> btenv.create_temporary_view("tb2", tb2)
> tb2 = btenv.sql_query("SELECT a,b,hash_code(a) m FROM tb2")
> print(tb2.to_pandas())
>
> # 3. 创建 sink 表
> btenv.execute_sql("""
> CREATE TABLE rs (
> a int,
> b string,
> m bigint
> ) WITH (
>     'connector' = 'print'
> )
> """)
>
> tb2.execute_insert("rs").wait()
> print(tb2.to_pandas())
> #
>
>
>
>
>