from pyflink.table import (ScalarFunction, EnvironmentSettings,
                           TableEnvironment, DataTypes)
from pyflink.table.udf import udf
from pyflink.table.expressions import call, row


class HashCode(ScalarFunction):
    """Scalar UDF that hashes its input and scales the result by a fixed factor."""

    def __init__(self):
        self.factor = 12

    def eval(self, s):
        return hash(s) * self.factor


# Create a TableEnvironment in batch mode.
env_settings = EnvironmentSettings.in_batch_mode()
btenv = TableEnvironment.create(env_settings)

# Wrap the ScalarFunction as a Python UDF for use with the Table API.
hash_code = udf(HashCode(), result_type=DataTypes.BIGINT())
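
# Note: the same logic could also be declared with the @udf decorator on a
# plain Python function instead of subclassing ScalarFunction. This is a
# minimal sketch; the name hash_code_fn is illustrative only and is not used
# further below.
@udf(result_type=DataTypes.BIGINT())
def hash_code_fn(s):
    return hash(s) * 12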

# Register the Python function so it can be called from the SQL API.
btenv.create_temporary_function("hash_code",
                                udf(HashCode(), result_type=DataTypes.BIGINT()))

# Build a small in-memory table with an explicit row schema.
tb2 = btenv.from_elements(
    [row(1, 'abc', 2.0), row(2, 'def', 3.0)],
    DataTypes.ROW([DataTypes.FIELD("a", DataTypes.INT()),
                   DataTypes.FIELD("b", DataTypes.STRING()),
                   DataTypes.FIELD("c", DataTypes.FLOAT())]))
btenv.create_temporary_view("tb2", tb2)
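
# The registered UDF can also be invoked through the Table API expression DSL
# instead of SQL. This is a minimal sketch: col() comes from
# pyflink.table.expressions, and tb2_expr is an illustrative variable name.
from pyflink.table.expressions import col
tb2_expr = tb2.select(col("a"), col("b"), call("hash_code", col("a")).alias("m"))
print(tb2_expr.to_pandas())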
tb2 = btenv.sql_query("SELECT a,b,hash_code(a) m FROM tb2")
print(tb2.to_pandas())

# 3. Define a print sink table and write the query result into it.
btenv.execute_sql("""
CREATE TABLE rs (
a int,
b string,
m bigint
) WITH (
'connector' = 'print'
)
""")
tb2.execute_insert("rs").wait()
print(tb2.to_pandas())