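你好，可以先看看官方文档中关于事件时间（event time）和水印（watermark）的介绍：https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/concepts/time/ 。如果你发送了多条数据，但这些数据都相同的话，水印不会推进，窗口也就不会触发。事件时间窗口只有在水印越过窗口的结束时间之后才会触发计算：以你使用的 1 分钟滚动窗口加 2 秒乱序容忍（forBoundedOutOfOrderness）为例，需要有一条事件时间比窗口结束时间至少晚 2 秒的数据到达，该窗口才会触发。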
在 2023-05-25 10:00:36,"小昌同学" <ccc0606fight...@163.com> 写道: >是的 我发送了很多数据,发现窗口还是没有触发 > > >| | >小昌同学 >| >| >ccc0606fight...@163.com >| >---- 回复的原邮件 ---- >| 发件人 | yidan zhao<hinobl...@gmail.com> | >| 发送日期 | 2023年5月25日 09:59 | >| 收件人 | <user-zh@flink.apache.org> | >| 主题 | Re: flink 窗口触发计算的条件 | >如果你只发送了一条数据,那么watermark不会推进,就不会触发窗口计算。你需要更多数据。 > >小昌同学 <ccc0606fight...@163.com> 于2023年5月25日周四 09:32写道: > >各位老师,请教一下关于flink 事件时间窗口的执行时间点的相关问题; >我使用的窗口是:TumblingEventTimeWindows(Time.minutes(1L)),我使用的时间定义是System.currentTimeMillis(),watermark是2秒, >但是当我发送一条数据后,过了5分钟之后,窗口都没有触发计算,想请各位老师帮忙看一下程序的问题所在: >相关代码以及样例数据如下: >| >package job; >import bean.MidInfo3; >import bean.Result; >import bean2.BaseInfo2; >import com.alibaba.fastjson.JSON; >import com.alibaba.fastjson.JSONObject; >import config.FlinkConfig; >import org.apache.flink.api.common.eventtime.WatermarkStrategy; >import org.apache.flink.api.common.functions.FilterFunction; >import org.apache.flink.api.common.functions.JoinFunction; >import org.apache.flink.api.common.functions.MapFunction; >import org.apache.flink.api.common.serialization.SimpleStringSchema; >import org.apache.flink.api.common.state.StateTtlConfig; >import org.apache.flink.api.common.state.ValueState; >import org.apache.flink.api.common.state.ValueStateDescriptor; >import org.apache.flink.api.java.functions.KeySelector; >import org.apache.flink.configuration.Configuration; >import org.apache.flink.streaming.api.datastream.ConnectedStreams; >import org.apache.flink.streaming.api.datastream.DataStream; >import org.apache.flink.streaming.api.datastream.DataStreamSource; >import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; >import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; >import org.apache.flink.streaming.api.functions.ProcessFunction; >import org.apache.flink.streaming.api.functions.co.CoMapFunction; >import >org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; >import 
>org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; >import >org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; >import org.apache.flink.streaming.api.windowing.time.Time; >import >org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger; >import org.apache.flink.streaming.api.windowing.windows.TimeWindow; >import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; >import org.apache.flink.util.Collector; >import utils.DateUtil; >import utils.JdbcUtil; > >import java.sql.Connection; >import java.sql.PreparedStatement; >import java.sql.ResultSet; >import java.time.Duration; >import java.util.HashMap; >import java.util.Properties; > >public class RytLogAnly9 { >public static void main(String[] args) throws Exception { >StreamExecutionEnvironment env = >StreamExecutionEnvironment.getExecutionEnvironment(); >env.disableOperatorChaining(); >//1、消费Kafka中的数据 >String servers = FlinkConfig.config.getProperty("dev_bootstrap.servers"); >String topicName = FlinkConfig.config.getProperty("dev_topicName"); >String groupId = FlinkConfig.config.getProperty("dev_groupId"); >String devMode = FlinkConfig.config.getProperty("dev_mode"); >Properties prop = new Properties(); >prop.setProperty("bootstrap.servers", servers); >prop.setProperty("group.id", groupId); >prop.setProperty("auto.offset.reset", devMode); >DataStreamSource<String> sourceStream = env.addSource(new >FlinkKafkaConsumer<String>(topicName, new SimpleStringSchema(), prop)); >sourceStream.print("最源端的数据sourceStream"); > >//2、对原始数据进行处理,获取到自己需要的数据,生成BaseInfo2基类数据 >SingleOutputStreamOperator<BaseInfo2> baseInfoStream = sourceStream.map(new >MapFunction<String, BaseInfo2>() { >@Override >public BaseInfo2 map(String value) throws Exception { >JSONObject jsonObject = JSON.parseObject(value); >//获取到不同的服务器IP >String serverIp = jsonObject.getString("ip"); >//获取到不同的data的数据 >String datas = jsonObject.getString("data"); > >String[] splits = 
datas.split("\n"); >HashMap<String, String> dataMap = new HashMap<>(); >//将time填充到自定义类型中,用来判断同一个num的请求以及应答时间 >String time = splits[0].substring(7, 19).replace("-", "").trim(); >//将subData填充到自定义类型中,用来判断时请求还是应答 >String subData = datas.substring(0, 10); >for (int i = 0; i < splits.length; i++) { >if (splits[i].contains("=")) { >splits[i] = splits[i].replaceFirst("=", "&"); >String[] temp = splits[i].split("&"); >if (temp.length > 1) { >dataMap.put(temp[0].toLowerCase(), temp[1]); >} >} >} >return new BaseInfo2(dataMap.get("action"), "需要去MySQL中查找对应的功能描述", serverIp, >DateUtil.string2Long(time), dataMap.get("handleserialno"), subData, >System.currentTimeMillis()); >} >}).assignTimestampsAndWatermarks(WatermarkStrategy.<BaseInfo2>forBoundedOutOfOrderness(Duration.ofSeconds(2L)).withTimestampAssigner((element, > recordTimestamp) -> element.getEvenTime())); >baseInfoStream.print("不加功能描述的 baseInfoStream"); > >//3、上述的数据流中的action仅仅是数字,需要关联一下MySQL去拿到对应的功能中文描述 >SingleOutputStreamOperator<BaseInfo2> completeInfoStream = >baseInfoStream.map(new MapFunction<BaseInfo2, BaseInfo2>() { >@Override >public BaseInfo2 map(BaseInfo2 value) throws Exception { >//拿到数据中携带的数字的action >String actionId = value.getFuncId(); >System.out.println("数据中的action编码是: " + actionId); >String actionName = null; >Connection connection = null; >PreparedStatement ps = null; > >//根据数据的action去MySQL中查找到对应的中午注释 >try { >String sql = "select action_name from ActionType where action = ?"; >connection = JdbcUtil.getConnection(); >ps = connection.prepareStatement(sql); >ps.setString(1, actionId); >ResultSet resultSet = ps.executeQuery(); >System.out.println("resultSet是" + resultSet); >if (resultSet.next()) { >actionName = resultSet.getString("action_name"); >} >} catch (Exception e) { >throw new RuntimeException(e); >} finally { >JdbcUtil.closeResource(connection, ps); >} >return new BaseInfo2(value.getFuncId(), actionName, value.getServerIp(), >value.getBaseTime(), value.getHandleSerialNo(), value.getInfo(), 
>value.getEvenTime()); >} >}).assignTimestampsAndWatermarks(WatermarkStrategy.<BaseInfo2>forBoundedOutOfOrderness(Duration.ofSeconds(2L)).withTimestampAssigner((element, > recordTimestamp) -> element.getEvenTime())); >completeInfoStream.print("加上中文描述的 completeInfoStream"); > >SingleOutputStreamOperator<BaseInfo2> requestDataStream = >completeInfoStream.filter(new FilterFunction<BaseInfo2>() { >@Override >public boolean filter(BaseInfo2 baseInfo2) throws Exception { >return baseInfo2.getInfo().contains("请求"); >} >}); >SingleOutputStreamOperator<BaseInfo2> answerDataStream = >completeInfoStream.filter(new FilterFunction<BaseInfo2>() { >@Override >public boolean filter(BaseInfo2 baseInfo2) throws Exception { >return baseInfo2.getInfo().contains("应答"); >} >}); >requestDataStream.print("请求流是 requestDataStream"); >answerDataStream.print("应答流是 answerDataStream"); > >DataStream<MidInfo3> joinStream = requestDataStream.join(answerDataStream) >.where(BaseInfo2::getHandleSerialNo) >.equalTo(BaseInfo2::getHandleSerialNo) >.window(TumblingEventTimeWindows.of(Time.minutes(1L))) >.apply(new JoinFunction<BaseInfo2, BaseInfo2, MidInfo3>() { >@Override >public MidInfo3 join(BaseInfo2 first, BaseInfo2 second) throws Exception { >System.out.println("以关联:" + first.getFuncId() + second.getEvenTime()); >System.out.println("关联:" + first.getEvenTime() +"|" >+second.getEvenTime()+"执行时间:"+System.currentTimeMillis()); >return new MidInfo3(first.getFuncId(), first.getFuncIdDesc(), >first.getServerIp(), first.getBaseTime(), second.getBaseTime(), >first.getFuncId() + first.getServerIp(), first.getEvenTime()); >} >}); >joinStream.print("joinStream:"); >System.out.println("joinTime:"+System.currentTimeMillis()); >joinStream.keyBy(new KeySelector<MidInfo3, String>() { >@Override >public String getKey(MidInfo3 value) throws Exception { >return value.getFuncId() + value.getServerIp(); >} >}).process(new ProcessFunction<MidInfo3, Result>() { >private ValueState<Long> timeState; > >@Override >public 
void open(Configuration parameters) throws Exception { >System.out.println("加载的是process中的open方法"+System.currentTimeMillis()); >ValueStateDescriptor<Long> timeStateDescriptor = new >ValueStateDescriptor<>("timeState", Long.class); >// 过期状态清除 >StateTtlConfig stateTtlConfig = StateTtlConfig >.newBuilder(org.apache.flink.api.common.time.Time.days(1)) >.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) >.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) >.build(); >// 开启ttl >timeStateDescriptor.enableTimeToLive(stateTtlConfig); > >this.timeState = getRuntimeContext().getState(timeStateDescriptor); > >} > >@Override >public void processElement(MidInfo3 value, ProcessFunction<MidInfo3, >Result>.Context ctx, Collector<Result> out) throws Exception { >//获取到当前的状态值 >if (null==timeState.value()){ >timeState.update(value.getAnswerTime()-value.getRequesTime()); >}else { >if ((value.getAnswerTime() - value.getRequesTime()) < timeState.value()) { >timeState.update((value.getAnswerTime() - value.getRequesTime())); >} >} > >out.collect(new Result(value.getFuncId(), value.getFuncIdDesc(), >value.getServerIp(), timeState.value())); > >} >}).print("结果是"); > >env.execute(); >} >} > > > >相关的数据样例如下; >{"ip":"10.125.8.20230525_0856","data":"请求: -- 14:28:05.111 -- ><44.15050>1D971BEEF138370\nAction=100\nMobileCode=13304431188\nReqno=380\niPhoneKey=1681799375200\nCFrom=dbzq.android\nTFrom=newandroid\nGateWayIp=124.234.116.150\nHandleSerialNo=20230525_0856lmuqbAABLOgVTU/3lQOcAAAClAAAABQAAAP9ZAACQHAAAAAAAAAAAAACQHAAAdAAAAGJIZDhiSzVUQUFBVWVOaFNVLzNsUU5ZQUFBREhEd0FBQXdBQ0FBa0FBQUNRSEFBQUFBQUFBQUFBQUFDUUhBQUFJZ0FBQUFGSUFBQUFBQUZTQXdBQUFETTRNQUZKRFFBQUFERTJPREUzT1Rrek56VXlNREFBAA==\nGateWayPort=41912\nclientversion=1.01.110\ntztreqfrom=android.webview\nReqlinkType=2\nnewindex=1\nReqTag=96756351=9=2=0.2.134739166=1681799375201\ntztsno=b8e947dc8498edfb9c7605f290fc13ba\npartenerName=zzinfo\nuniqueid=1C0FF05B-D047-45B4-8212-6AD8627DBA4F\nEmptyFields=Token&\ntztSDKType=0\n"} 
>{"ip":"10.125.8.20230525_0856","data":"应答: -- 14:28:05.111 -- ><44.15050>1D971BEEF138370\nAction=100\nMobileCode=13304431188\nReqno=380\niPhoneKey=1681799375200\nCFrom=dbzq.android\nTFrom=newandroid\nGateWayIp=124.234.116.150\nHandleSerialNo=20230525_0856lmuqbAABLOgVTU/3lQOcAAAClAAAABQAAAP9ZAACQHAAAAAAAAAAAAACQHAAAdAAAAGJIZDhiSzVUQUFBVWVOaFNVLzNsUU5ZQUFBREhEd0FBQXdBQ0FBa0FBQUNRSEFBQUFBQUFBQUFBQUFDUUhBQUFJZ0FBQUFGSUFBQUFBQUZTQXdBQUFETTRNQUZKRFFBQUFERTJPREUzT1Rrek56VXlNREFBAA==\nGateWayPort=41912\nclientversion=1.01.110\ntztreqfrom=android.webview\nReqlinkType=2\nnewindex=1\nReqTag=96756351=9=2=0.2.134739166=1681799375201\ntztsno=b8e947dc8498edfb9c7605f290fc13ba\npartenerName=zzinfo\nuniqueid=1C0FF05B-D047-45B4-8212-6AD8627DBA4F\nEmptyFields=Token&\ntztSDKType=0\n"} >|