hi,all:
一账号一段时间内连续操作为一个pv,间隔时间超过阈值后会记为新的pv。系统需要获取流式日志,使用日志统计实时数据的各项指标。但是我们在使用session
window的时候无法使用udaf(自定义聚合函数)对相同pv日志进行聚合统计。
希望知道的大佬能给点建议。感谢!!!

session_window = Session.with_gap("60.second").on("pv_time").alias("w")
t_env.from_path('source') \
    .window(session_window) \
    .group_by("w,pv_id") \
    .select("pv_id,get_act(act)").insert_into("sink")

<http://apache-flink.147419.n8.nabble.com/file/t1355/infoflow_2021-03-08_19-02-16.png>
 



--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复