from pyflink.table import (ScalarFunction, EnvironmentSettings,
                           TableEnvironment, DataTypes)
from pyflink.table.udf import udf
from pyflink.table.expressions import call, row


# A scalar UDF implemented by subclassing ScalarFunction: returns the Python
# hash of the input multiplied by a fixed factor.
class HashCode(ScalarFunction):
    def __init__(self):
        self.factor = 12

    def eval(self, s):
        return hash(s) * self.factor
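
# Alternatively, the same UDF can be declared by decorating a plain Python
# function with @udf instead of subclassing ScalarFunction; a minimal sketch
# (the name hash_code_fn is illustrative and not part of the original example):
@udf(result_type=DataTypes.BIGINT())
def hash_code_fn(s):
    return hash(s) * 12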


# Create a TableEnvironment running in batch mode.
env_settings = EnvironmentSettings.in_batch_mode()
btenv = TableEnvironment.create(env_settings)

# Wrap the ScalarFunction instance as a udf handle for use in the Table API.
hash_code = udf(HashCode(), result_type=DataTypes.BIGINT())
# Register the Python function so it can be referenced by name from the SQL API.
btenv.create_temporary_function(
    "hash_code", udf(HashCode(), result_type=DataTypes.BIGINT()))
tb2 = btenv.from_elements(
    [row(1, 'abc', 2.0), row(2, 'def', 3.0)],
    DataTypes.ROW([DataTypes.FIELD("a", DataTypes.INT()),
                   DataTypes.FIELD("b", DataTypes.STRING()),
                   DataTypes.FIELD("c", DataTypes.FLOAT())]))
btenv.create_temporary_view("tb2", tb2)
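
# The hash_code handle can also be invoked directly through the Table API,
# without going through SQL; a minimal sketch (col is imported here only for
# this example):
from pyflink.table.expressions import col
print(tb2.select(col("a"), col("b"),
                 hash_code(col("a")).alias("m"),
                 call(hash_code, col("a")).alias("m2")).to_pandas())
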
tb2 = btenv.sql_query("SELECT a, b, hash_code(a) AS m FROM tb2")
print(tb2.to_pandas())

# 3. Create a sink table backed by the print connector.
btenv.execute_sql("""
       CREATE TABLE rs (
           a int,
           b string,
           m bigint
       ) WITH (
           'connector' = 'print'
       )
   """)

tb2.execute_insert("rs").wait()
print(tb2.to_pandas())