________________________________
From: faronzz <faro...@163.com>
Sent: August 26, 2023 22:12
To: user-zh@flink.apache.org <user-zh@flink.apache.org>
Subject: pyflink aggfunction in window TVF cannot sink to connector='kafka' (it reports consuming update changes); Java aggfunctions and Flink built-in aggfunctions such as sum are OK

hi~
       I came across a problem I don't understand: I can't use a PyFlink aggregate function properly in a window TVF. The following all work fine:
java aggfunction
flink built-in aggfunction (e.g. sum)
group window (not window TVF; a sketch of what I mean is near the end of the script)
I want to know if this is a bug or if I'm using it the wrong way?

pyflink 1.17.1
flink 1.17.1




from datetime import datetime, timedelta

from pyflink.common import Row
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import AggregateFunction, Schema, StreamTableEnvironment


class Sum0(AggregateFunction):
    """Python UDAF: a null-safe sum with retract/merge support."""

    def get_value(self, accumulator):
        return accumulator[0]

    def create_accumulator(self):
        return Row(0)

    def accumulate(self, accumulator, *args):
        if args[0] is not None:
            accumulator[0] += args[0]

    def retract(self, accumulator, *args):
        if args[0] is not None:
            accumulator[0] -= args[0]

    def merge(self, accumulator, accumulators):
        for acc in accumulators:
            accumulator[0] += acc[0]

    def get_result_type(self):
        return "BIGINT"

    def get_accumulator_type(self):
        return "ROW"
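
# Alternative type declaration (a sketch, not used in the repro below): PyFlink
# also accepts explicit DataTypes through the udaf wrapper instead of the
# get_result_type()/get_accumulator_type() string hints above.
from pyflink.table import DataTypes
from pyflink.table.udf import udaf

sum_udaf = udaf(Sum0(),
                result_type=DataTypes.BIGINT(),
                accumulator_type=DataTypes.ROW(
                    [DataTypes.FIELD("f0", DataTypes.BIGINT())]))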


def test_py_udf_kafka():
    # streaming mode
    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///Users/faron/Downloads/flink-sql-connector-kafka-1.17.1.jar")
    env.set_parallelism(1)
    table_env = StreamTableEnvironment.create(stream_execution_environment=env)
    # In-memory source: (value, count, name, event time); the event times are
    # spread over several hours so the 1-hour windows get distinct rows.
    ds = env.from_collection(
        collection=[(1, 2, "Lee", datetime.now() - timedelta(hours=4)),
                    (2, 3, "Lee", datetime.now() - timedelta(hours=4)),
                    (3, 4, "Jay", datetime.now() - timedelta(hours=4)),
                    (5, 6, "Jay", datetime.now() - timedelta(hours=2)),
                    (7, 8, "Lee", datetime.now())],
        type_info=Types.ROW([Types.INT(),
                             Types.INT(),
                             Types.STRING(),
                             Types.SQL_TIMESTAMP()]))

    # Declare the event-time attribute and watermark on the timestamp column.
    table_schema = Schema.new_builder() \
        .column("f0", "INT") \
        .column("f1", "INT") \
        .column("f2", "STRING") \
        .column_by_expression("rowtime", "CAST(f3 AS TIMESTAMP(3))") \
        .watermark("rowtime", "rowtime - INTERVAL '1' SECOND") \
        .build()

    ts = table_env.from_data_stream(ds, table_schema) \
        .alias("value", "count", "name", "rowtime")

    print("source table schema:")
    ts.print_schema()

    sql_sink_ddl_1 = """CREATE TABLE kafka_test(
        `name` string, `agg_data` bigint)
        with (
            'connector' = 'kafka',
            'topic' = 'test_java2',
            'properties.bootstrap.servers' = 'agent3:9092',
            'value.format' = 'json'
        );"""

    table_env.execute_sql(sql_sink_ddl_1)
    table_env.create_temporary_view("source", ts)
    table_env.create_temporary_function("sum_udf", Sum0())
    # Built-in sum over a tumbling window TVF: this works.
    sql_query_system = """
        select name, sum(`value`) as agg_data from
        TABLE(TUMBLE(TABLE source, DESCRIPTOR(rowtime), INTERVAL '1' HOURS))
        group by window_start, window_end, name
        """
    # The same query with the Python UDAF: this is the one that fails to sink.
    sql_query = """
        select name, sum_udf(`value`) as agg_data from
        TABLE(TUMBLE(TABLE source, DESCRIPTOR(rowtime), INTERVAL '1' HOURS))
        group by window_start, window_end, name
        """
    print(table_env.explain_sql(sql_query))
    table_env.sql_query(sql_query).execute().print()
    table_env.sql_query(sql_query).execute_insert("kafka_test").wait()
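
    # For comparison, a sketch of the "group window (not window TVF)" case
    # mentioned above: the legacy group-window syntax with the same sum_udf
    # works for me, unlike the window TVF variant.
    sql_query_group_window = """
        select name, sum_udf(`value`) as agg_data from source
        group by TUMBLE(rowtime, INTERVAL '1' HOUR), name
        """
    table_env.sql_query(sql_query_group_window).execute_insert("kafka_test").wait()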


if __name__ == "__main__":
    test_py_udf_kafka()
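
By the way, since the error is about the sink not accepting update changes, I assume an update-capable sink such as upsert-kafka would at least accept the plan. A minimal sketch (the topic name here is made up, and the primary key and key format are my assumptions; I have not verified this), though I'd still like to understand why the plain kafka sink works with the built-in sum but not with the Python UDAF:

sql_sink_ddl_upsert = """CREATE TABLE kafka_test_upsert(
    `name` string, `agg_data` bigint,
    PRIMARY KEY (`name`) NOT ENFORCED)
    with (
        'connector' = 'upsert-kafka',
        'topic' = 'test_java2_upsert',
        'properties.bootstrap.servers' = 'agent3:9092',
        'key.format' = 'json',
        'value.format' = 'json'
    );"""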



faronzz
faro...@163.com