Hi casel.chen, 我理解你的意思是: 希望在ThirdPartyPaymentStream一条数据达到的30分钟后,*再触发查询* ,如果此时该数据在PlatformPaymentStream中还未出现,说明超时未支付,则输入到下游。而不是等ThirdPartyPaymentStream数据达到时再判断是否超时,因为此时虽然超时达到,但是也算已支付,没必要再触发报警了。
如果是流计算,可以采用timer定时器延时触发。 对于sql, 我个人的一个比较绕的想法是(供参考,不一定对):是通过Pulsar Sink(或RocketMQ等有延迟队列的消息中间件)将PlatformPaymentStream的数据写入延迟队列(30分钟)[1], 然后延迟消费为PlatformPaymentStream2。然后将PlatformPaymentStream2 *left join* ThirdPartyPaymentStream, 如果join后的结果不包含ThirdPartyPaymentStream部分,说明没有及时付款。 [1] https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/datastream/pulsar/#%e6%b6%88%e6%81%af%e5%bb%b6%e6%97%b6%e5%8f%91%e9%80%81 Best Hongshun On Wed, May 10, 2023 at 8:45 AM Shammon FY <zjur...@gmail.com> wrote: > Hi > > 如果使用CEP,可以将两个流合并成一个流,然后通过subtype根据不同的事件类型来匹配,定义CEP的Pattern,例如以下这种 > DataStream<E1> s1 = ...; > DataStream<E2> s2 = ...; > DataStream<E> s = s1.union(s1)...; > Pattern<Event, ?> = Pattern.begin("first") > .subtype(E1.class) > .where(...) > .followedBy("second") > .subtype(E2.class) > .where(...) > > 如果使用Flink SQL,可以直接使用双流Join+窗口实现 > > Best, > Shammon FY > > > > > On Wed, May 10, 2023 at 2:24 AM casel.chen <casel_c...@126.com> wrote: > > > 需求:业务端实现支付功能,需要通过第三方支付平台的交易数据采用Flink > > SQL来做一个实时对账,对于超过30分钟内未到达的第三方支付平台交易数据进行告警。 > > 请问这个双流实时对帐场景使用Flink CEP SQL要如何实现? > > > > > 网上找的例子都是基于单条流实现的,而上述场景会用到两条流,一个是PlatformPaymentStream,另一个是ThirdPartyPaymentStream。 >