(无主题)
退订
Re: Flink k8s HA 手动删除作业deployment导致的异常
Zhanghao的回答已经非常全面了,我再补充小点,删除Deployment保留HA ConfigMap是预期内的行为,文档里面有说明[1] 之所以这样设计有两点原因: (1.) 任务可能会被重启,但使用相同的cluster-id,并且希望从之前的checkpoint恢复 (2.) 单纯的删除ConfigMap会导致存储在DFS(e.g. HDFS、S3、OSS)上面的HA数据泄露 [1]. https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/ha/kubernetes_ha/#high-availability-data-clean-up Best, Yang Zhanghao Chen 于2022年6月13日周一 07:53写道: > 1.基于K8S做HA的Flink任务要想正常,不能手动删除作业deployment,必须通过cancel,stop命令进行停止。基于上面我猜测Flink > k8s HA是基于ConfigMap之上开发的,其声明周期从K8S角度不能像作业的svc一样带ownerreference。 > > 是的,Flink K8s HA 是基于 ConfigMap 开发的,并且 HA configmap 没有设置 > ownerreference,因此如果想在保留 HA 数据的情况下重启集群直接 delete deployment 就行,重启后会从最新 cp 恢复。 > > 2.基于k8s做HA的Flink job id皆为。 > > 开启 HA 的 Application mode 的 Flink job id > 皆为,与是否使用 K8s HA 无关。job id 是作业的唯一标识符,HA > 服务使用它来命名和寻址和单个作业有关的 HA 资源(如保存的 jobgraph 和 cp)。Application mode 下 jobgraph 在 > JM 生成,不开启 HA 时每次生成 jobgraph 会随机生成一个 job id 作为 job 的 唯一标识符,开启 HA 时需要使用一个固定的 > job id (一串 0 的 jobid 就是这么来的),否则 JM failover 后重新生成了一个新的不同的 job id,无法和之前的 cp > 相关联,导致作业从全新状态恢复。 > > 3.Flink k8s HA 是如何工作的,其中存储了什么信息?我想学习其中相关实现,如果大家有其设计文档或相关资料,希望可以回此邮件告诉我,谢谢。 > > 可以看下官方的博客文章: > https://flink.apache.org/2021/02/10/native-k8s-with-ha.html,更多细节可以参阅 > JIRA 设计文档: > https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink > > > Best, > Zhanghao Chen > > From: m18814122325 > Sent: Sunday, June 12, 2022 22:45 > To: user-zh@flink.apache.org > Subject: Flink k8s HA 手动删除作业deployment导致的异常 > > Flink version: 1.15.0 > > deploy mode: Native k8s application > > > > > 问题现象: > > 我以Native > k8s模式部署了一个基于K8S做HA的Flink任务,当我手动删除了作业的deployment后,发现作业做HA的ConfigMap还存在。并且接下来不加参数-s > 再次启动作业,从启动日志发现其会从上述ConfigMap记录信息中恢复。 > > > > > kubectl delete deployment flink-bdra-sql-application-job -n > bdra-dev-flink-standalone > > > > > kubectl get configMap -n bdra-dev-flink-standalone > > > > > NAME >DATA AGE > > flink-bdra-sql-application-job--config-map > 2 13m > > flink-bdra-sql-application-job-cluster-config-map > 1 13m > > > > > > > > 我有以下疑问: > > 1.基于K8S做HA的Flink任务要想正常,不能手动删除作业deployment,必须通过cancel,stop命令进行停止。基于上面我猜测Flink > k8s HA是基于ConfigMap之上开发的,其声明周期从K8S角度不能像作业的svc一样带ownerreference。 > > 2.基于k8s做HA的Flink job id皆为。 > > 3.Flink k8s HA 是如何工作的,其中存储了什么信息?我想学习其中相关实现,如果大家有其设计文档或相关资料,希望可以回此邮件告诉我,谢谢。 > > > > > 重启命令(不带-s参数,意味着命令本身不带任何从ck或者savepoint恢复) > > flink run-application --target kubernetes-application -c CalculateUv > -Dkubernetes.cluster-id=flink-bdra-sql-application-job-s3p > -Dkubernetes.container.image= > acpimagehub.cgb.cn/bdra_dev/flink-sql-s3:v0.20 > -Dkubernetes.namespace=bdra-dev-flink-standalone > -Dkubernetes.service-account=bdra-dev-flink-standalone-sa > -Djobmanager.memory.process.size=1024m -Dkubernetes.jobmanager.cpu=2 > -Dkubernetes.taskmanager.cpu=2 -Dparallelism.default=8 > -Dtaskmanager.numberOfTaskSlots=2 -Dtaskmanager.memory.process.size=2144m > -Dstate.backend=filesystem > -Dstate.checkpoints.dir=s3p://bdra-user-lun01/flink-checkpoints/flink-bdra-sql-application-job-s3 > -Dstate.savepoints.dir=s3a://bdra-user-lun01/flink-savepoints/flink-bdra-sql-application-job-s3 > -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory > -Dhigh-availability.storageDir=file:///opt/flink/log/recovery > -Ds3.access-key=* -Ds3.secret-key=* > -Dmetrics.reporter.influxdb.factory.class=org.apache.flink.metrics.influxdb.InfluxdbReporterFactory > -Dmetrics.reporter.influxdb.scheme=http > -Dmetrics.reporter.influxdb.host=influxdb > -Dmetrics.reporter.influxdb.port=8086 > -Dmetrics.reporter.influxdb.db=flink_metrics > -Dmetrics.reporter.influxdb.consistency=ANY -Ds3.endpoint=http://*:80 > -Dkubernetes.rest-service.exposed.type=ClusterIP > -Dkubernetes.config.file=kube_config > -Dkubernetes.pod-template-file=pod-template.yaml > local:///opt/flink/usrlib/flink-sql-1.0-SNAPSHOT.jar > > > > > 重启后自动从ConfigMap中恢复。 > > 2022-06-10 20:20:52,592 INFO > org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess > [] - Successfully recovered 1 persisted job graphs. > > 2022-06-10 20:20:52,654 INFO > org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting > RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher > at akka://flink/user/rpc/dispatcher_1 . > > 2022-06-10 20:20:53,552 INFO > org.apache.flink.kubernetes.KubernetesResourceManagerDriver [] - Recovered > 0 pods from previous attempts, current attempt id is 1. > > 2022-06-10 20:20:53,552 INFO > org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - > Recovered 0 workers from previous attempt. > > 2022-06-10 20:20:55,352 INFO > org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - > Starting DefaultLeaderElectionService with >
Re: Flink k8s HA 手动删除作业deployment导致的异常
1.基于K8S做HA的Flink任务要想正常,不能手动删除作业deployment,必须通过cancel,stop命令进行停止。基于上面我猜测Flink k8s HA是基于ConfigMap之上开发的,其声明周期从K8S角度不能像作业的svc一样带ownerreference。 是的,Flink K8s HA 是基于 ConfigMap 开发的,并且 HA configmap 没有设置 ownerreference,因此如果想在保留 HA 数据的情况下重启集群直接 delete deployment 就行,重启后会从最新 cp 恢复。 2.基于k8s做HA的Flink job id皆为。 开启 HA 的 Application mode 的 Flink job id 皆为,与是否使用 K8s HA 无关。job id 是作业的唯一标识符,HA 服务使用它来命名和寻址和单个作业有关的 HA 资源(如保存的 jobgraph 和 cp)。Application mode 下 jobgraph 在 JM 生成,不开启 HA 时每次生成 jobgraph 会随机生成一个 job id 作为 job 的 唯一标识符,开启 HA 时需要使用一个固定的 job id (一串 0 的 jobid 就是这么来的),否则 JM failover 后重新生成了一个新的不同的 job id,无法和之前的 cp 相关联,导致作业从全新状态恢复。 3.Flink k8s HA 是如何工作的,其中存储了什么信息?我想学习其中相关实现,如果大家有其设计文档或相关资料,希望可以回此邮件告诉我,谢谢。 可以看下官方的博客文章: https://flink.apache.org/2021/02/10/native-k8s-with-ha.html,更多细节可以参阅 JIRA 设计文档:https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink Best, Zhanghao Chen From: m18814122325 Sent: Sunday, June 12, 2022 22:45 To: user-zh@flink.apache.org Subject: Flink k8s HA 手动删除作业deployment导致的异常 Flink version: 1.15.0 deploy mode: Native k8s application 问题现象: 我以Native k8s模式部署了一个基于K8S做HA的Flink任务,当我手动删除了作业的deployment后,发现作业做HA的ConfigMap还存在。并且接下来不加参数-s 再次启动作业,从启动日志发现其会从上述ConfigMap记录信息中恢复。 kubectl delete deployment flink-bdra-sql-application-job -n bdra-dev-flink-standalone kubectl get configMap -n bdra-dev-flink-standalone NAME DATA AGE flink-bdra-sql-application-job--config-map 2 13m flink-bdra-sql-application-job-cluster-config-map 1 13m 我有以下疑问: 1.基于K8S做HA的Flink任务要想正常,不能手动删除作业deployment,必须通过cancel,stop命令进行停止。基于上面我猜测Flink k8s HA是基于ConfigMap之上开发的,其声明周期从K8S角度不能像作业的svc一样带ownerreference。 2.基于k8s做HA的Flink job id皆为。 3.Flink k8s HA 是如何工作的,其中存储了什么信息?我想学习其中相关实现,如果大家有其设计文档或相关资料,希望可以回此邮件告诉我,谢谢。 重启命令(不带-s参数,意味着命令本身不带任何从ck或者savepoint恢复) flink run-application --target kubernetes-application -c CalculateUv -Dkubernetes.cluster-id=flink-bdra-sql-application-job-s3p -Dkubernetes.container.image=acpimagehub.cgb.cn/bdra_dev/flink-sql-s3:v0.20 -Dkubernetes.namespace=bdra-dev-flink-standalone -Dkubernetes.service-account=bdra-dev-flink-standalone-sa -Djobmanager.memory.process.size=1024m -Dkubernetes.jobmanager.cpu=2 -Dkubernetes.taskmanager.cpu=2 -Dparallelism.default=8 -Dtaskmanager.numberOfTaskSlots=2 -Dtaskmanager.memory.process.size=2144m -Dstate.backend=filesystem -Dstate.checkpoints.dir=s3p://bdra-user-lun01/flink-checkpoints/flink-bdra-sql-application-job-s3 -Dstate.savepoints.dir=s3a://bdra-user-lun01/flink-savepoints/flink-bdra-sql-application-job-s3 -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory -Dhigh-availability.storageDir=file:///opt/flink/log/recovery -Ds3.access-key=* -Ds3.secret-key=* -Dmetrics.reporter.influxdb.factory.class=org.apache.flink.metrics.influxdb.InfluxdbReporterFactory -Dmetrics.reporter.influxdb.scheme=http -Dmetrics.reporter.influxdb.host=influxdb -Dmetrics.reporter.influxdb.port=8086 -Dmetrics.reporter.influxdb.db=flink_metrics -Dmetrics.reporter.influxdb.consistency=ANY -Ds3.endpoint=http://*:80 -Dkubernetes.rest-service.exposed.type=ClusterIP -Dkubernetes.config.file=kube_config -Dkubernetes.pod-template-file=pod-template.yaml local:///opt/flink/usrlib/flink-sql-1.0-SNAPSHOT.jar 重启后自动从ConfigMap中恢复。 2022-06-10 20:20:52,592 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] - Successfully recovered 1 persisted job graphs. 2022-06-10 20:20:52,654 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/rpc/dispatcher_1 . 2022-06-10 20:20:53,552 INFO org.apache.flink.kubernetes.KubernetesResourceManagerDriver [] - Recovered 0 pods from previous attempts, current attempt id is 1. 2022-06-10 20:20:53,552 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Recovered 0 workers from previous attempt. 2022-06-10 20:20:55,352 INFO org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Starting DefaultLeaderElectionService with org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionDriverAdapter@2a1814a5. 2022-06-10 20:20:55,370 INFO org.apache.flink.client.ClientUtils [] - Starting program (detached: false) 2022-06-10 20:20:55,394 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_2 . 2022-06-10 20:20:55,438 INFO
Flink k8s HA 手动删除作业deployment导致的异常
Flink version: 1.15.0 deploy mode: Native k8s application 问题现象: 我以Native k8s模式部署了一个基于K8S做HA的Flink任务,当我手动删除了作业的deployment后,发现作业做HA的ConfigMap还存在。并且接下来不加参数-s 再次启动作业,从启动日志发现其会从上述ConfigMap记录信息中恢复。 kubectl delete deployment flink-bdra-sql-application-job -n bdra-dev-flink-standalone kubectl get configMap -n bdra-dev-flink-standalone NAME DATA AGE flink-bdra-sql-application-job--config-map 2 13m flink-bdra-sql-application-job-cluster-config-map 1 13m 我有以下疑问: 1.基于K8S做HA的Flink任务要想正常,不能手动删除作业deployment,必须通过cancel,stop命令进行停止。基于上面我猜测Flink k8s HA是基于ConfigMap之上开发的,其声明周期从K8S角度不能像作业的svc一样带ownerreference。 2.基于k8s做HA的Flink job id皆为。 3.Flink k8s HA 是如何工作的,其中存储了什么信息?我想学习其中相关实现,如果大家有其设计文档或相关资料,希望可以回此邮件告诉我,谢谢。 重启命令(不带-s参数,意味着命令本身不带任何从ck或者savepoint恢复) flink run-application --target kubernetes-application -c CalculateUv -Dkubernetes.cluster-id=flink-bdra-sql-application-job-s3p -Dkubernetes.container.image=acpimagehub.cgb.cn/bdra_dev/flink-sql-s3:v0.20 -Dkubernetes.namespace=bdra-dev-flink-standalone -Dkubernetes.service-account=bdra-dev-flink-standalone-sa -Djobmanager.memory.process.size=1024m -Dkubernetes.jobmanager.cpu=2 -Dkubernetes.taskmanager.cpu=2 -Dparallelism.default=8 -Dtaskmanager.numberOfTaskSlots=2 -Dtaskmanager.memory.process.size=2144m -Dstate.backend=filesystem -Dstate.checkpoints.dir=s3p://bdra-user-lun01/flink-checkpoints/flink-bdra-sql-application-job-s3 -Dstate.savepoints.dir=s3a://bdra-user-lun01/flink-savepoints/flink-bdra-sql-application-job-s3 -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory -Dhigh-availability.storageDir=file:///opt/flink/log/recovery -Ds3.access-key=* -Ds3.secret-key=* -Dmetrics.reporter.influxdb.factory.class=org.apache.flink.metrics.influxdb.InfluxdbReporterFactory -Dmetrics.reporter.influxdb.scheme=http -Dmetrics.reporter.influxdb.host=influxdb -Dmetrics.reporter.influxdb.port=8086 -Dmetrics.reporter.influxdb.db=flink_metrics -Dmetrics.reporter.influxdb.consistency=ANY -Ds3.endpoint=http://*:80 -Dkubernetes.rest-service.exposed.type=ClusterIP -Dkubernetes.config.file=kube_config -Dkubernetes.pod-template-file=pod-template.yaml local:///opt/flink/usrlib/flink-sql-1.0-SNAPSHOT.jar 重启后自动从ConfigMap中恢复。 2022-06-10 20:20:52,592 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] - Successfully recovered 1 persisted job graphs. 2022-06-10 20:20:52,654 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/rpc/dispatcher_1 . 2022-06-10 20:20:53,552 INFO org.apache.flink.kubernetes.KubernetesResourceManagerDriver [] - Recovered 0 pods from previous attempts, current attempt id is 1. 2022-06-10 20:20:53,552 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Recovered 0 workers from previous attempt. 2022-06-10 20:20:55,352 INFO org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - Starting DefaultLeaderElectionService with org.apache.flink.runtime.leaderelection.MultipleComponentLeaderElectionDriverAdapter@2a1814a5. 2022-06-10 20:20:55,370 INFO org.apache.flink.client.ClientUtils [] - Starting program (detached: false) 2022-06-10 20:20:55,394 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService [] - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_2 . 2022-06-10 20:20:55,438 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Initializing job 'insert-into_default_catalog.default_database.buy_cnt_per_hour' (). 2022-06-10 20:20:55,477 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using restart back off time strategy FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647, backoffTimeMS=1000) for insert-into_default_catalog.default_database.buy_cnt_per_hour (). 2022-06-10 20:20:55,558 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Recovering checkpoints from KubernetesStateHandleStore{configMapName='flink-bdra-sql-application-job--config-map'}. 2022-06-10 20:20:55,572 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Found 1 checkpoints in KubernetesStateHandleStore{configMapName='flink-bdra-sql-application-job--config-map'}. 2022-06-10 20:20:55,572 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils
退订
发自我的iPhone
退订
退订
Re: unsubscribe; 退订
你好, 退订请发送任意消息至 user-zh-unsubscr...@flink.apache.org。 Best, Zhanghao Chen From: chenshu...@foxmail.com Sent: Sunday, June 12, 2022 11:44 To: user-zh Subject: unsubscribe; 退订 unsubscribe 退订 chenshu...@foxmail.com
Re: Unaligned Checkpoint
你好, Unaligned checkpoint 是个底层特性,要使用的话只要设置 Flink 参数 execution.checkpointing.unaligned = true 就行,在 SQL client 中,可以使用 SET "key" = "value" 的语法设置 Flink 参数的值。 Unaligned checkpoint 较之 aligned checkpoint 主要的改变在于 * unaligned cp 在输入缓冲区收到第一个 cp barrier 的时候立即触发快照并直接输出至下游;代价是快照需要记录缓冲区中的数据来保证一致性,产生更多 io 并增大 cp 大小。 * aligned cp 在算子收到最后一个 cp barrier 完成 barrier 对齐后才触发快照,barrier 对齐期间较早收到 barrier 的 input channel 会被阻塞,在反压时阻塞时间会显著增加,导致 cp 速度变慢;好处是 barrier 对齐的过程使得快照不需要记录缓冲等待队列中的数据就可以保证一致性。 Best, Zhanghao Chen From: 小昌同学 Sent: Saturday, June 11, 2022 17:18 To: user-zh@flink.apache.org Subject: Unaligned Checkpoint 大佬们可以说说Unaligned Checkpoint的实现吗 看了不少文档 没有太看懂 我如果想在sql里面实现 这个该怎么设置啊 请大佬们指教 | | 小昌同学 | | ccc0606fight...@163.com |
Re: Re: Flink 使用interval join数据丢失疑问
非常感谢回复 1.针对watermark,我会再去进行测试。同时还会测试使用处理时间,interval join会不会丢失数据 2.针对interval jon,我个人的理解是它能关联到的数据范围要比inner join大,所以数据应该更准确,但是从结果上看却是数据丢失,当时非常震惊,有点颠覆我的认知了。同时我自己还有一个新的猜测,就是两个流的数据量不一样,可能也会造成数据丢失。目前左流是订单粒度数据,右流是订单-商品粒度数据,数据量要大很多。我个人理解,在处理右流的时候,应该会慢一点,所以导致两边的时间进展可能不一致。但是这又引发了一个新的疑问?inner join应该也会受这样的影响 3.还有一个问题可能是我没有阐述清楚,我在sql里使用inner join,没有注册水印,那么两个流的join应该是以处理时间来定义的?那么表的state的过期是否也是以处理时间来定义? lxk7...@163.com 发件人: Shengkai Fang 发送时间: 2022-06-11 20:35 收件人: user-zh 主题: Re: Re: Flink 使用interval join数据丢失疑问 hi, 对于第一点,丢数据的情况有很多。首先,要确认是不是 JOIN 算子丢数据(SINK 的使用不当也会丢数据)。如果明确了是 join 算子丢的数据,建议明确下丢的数据是咋样的,是不是 watermark 设置不合理,导致数据被误认为是晚到数据从而被丢了。例如,这里的是 `event time` = `rowtime` - 2s,是不是不合适,我咋记得一般都是 +2 s 呢? 对于第二点,interval join 我个人初步的理解是 state 的清理是根据两边的 event time,也就是说,如果右流的 event time 的更新会影响左流的数据清理。比如说右流的时间点到了 12:00,join 条件要求左流的时间不会晚于右流的时间 1h,那么左流 11:00之前的数据都可以被清理了。 对于第三点,我觉得是不能的。目前的 inner join + state 清理无法覆盖 event time 的window join 的。 best, Shengkai lxk7...@163.com 于2022年6月10日周五 23:03写道: > 对于这个问题,我还是有很大的疑问,再把我这个场景描述一下: > > 目前是使用flink进行双流join,两个流的数据,一个流是订单主表,另一个流是订单明细表。我们探查了离线的数据,订单明细表一般会在订单主表生成后晚几秒内生成,这个差异在秒级别。 > 我们做了以下几轮测试,并对比了另一个实时落的表数据量。(这个表就是基准参照数据,只是简单落表,没做任何处理,两边的数据源一致,对比的口径一致。) > 1.使用datastream api,使用kafka自带的时间戳做水印,使用interval join。对比完结果,数据少。 > 2.使用流转表,sql inner join,没有设置watermark。对比完结果数据正常。 > 3.使用流转表,sql interval join,从数据中的事件时间提取水印,对比完结果数据,数据少。 > 从结果上看,我不太明白为什么sql里inner join能保证数据准确,而interval > join不行?有什么好的方式或者思路能让我更好的去尝试了解这个问题产生的原因 > > 针对第二种方式,我的疑问是,sql里没有设置水印,那么表的state过期是以处理时间来计算吗?针对这种设置了表state过期时间的join,我能理解为这个inner > join其实是一个window join吗? > > > > lxk7...@163.com > > 发件人: lxk > 发送时间: 2022-06-10 18:18 > 收件人: user-zh > 主题: Re:Re:Re:Re:Re:Re:Re: Flink 使用interval join数据丢失疑问 > > > > 现在改成了sql interval join,代码和执行计划如下,其他配置没变,数据量还是少,使用inner join就没问题 > > > > > Table headerTable = > streamTableEnvironment.fromDataStream(headerFilterStream, > Schema.newBuilder() > .columnByExpression("rowtime", > "CAST(substring(last_updated_at,0,19) AS TIMESTAMP_LTZ(3))") > .watermark("rowtime", "rowtime - INTERVAL '2' SECOND") > .build()); > Table itemTable = > streamTableEnvironment.fromDataStream(filterItemStream, Schema.newBuilder() > .columnByExpression("rowtime", > "CAST(substring(last_updated_at,0,19) AS TIMESTAMP_LTZ(3))") > .watermark("rowtime", "rowtime - INTERVAL '2' SECOND") > .build()); > > > > > streamTableEnvironment.createTemporaryView("header",headerTable); > streamTableEnvironment.createTemporaryView("item",itemTable); > > > > > > > Table result = streamTableEnvironment.sqlQuery("select > header.customer_id" + > ",item.goods_id" + > ",header.id" + > ",header.order_status" + > ",header.shop_id" + > ",header.parent_order_id" + > ",header.order_at" + > ",header.pay_at" + > ",header.channel_id" + > ",header.root_order_id" + > ",item.id" + > ",item.row_num" + > ",item.p_sp_sub_amt" + > ",item.display_qty" + > ",item.qty" + > ",item.bom_type" + > " from header JOIN item on header.id = item.order_id and > item.rowtime BETWEEN header.rowtime - INTERVAL '10' SECOND AND > header.rowtime + INTERVAL '20' SECOND"); > > > > > String intervalJoin = streamTableEnvironment.explainSql("select > header.customer_id" + > ",item.goods_id" + > ",header.id" + > ",header.order_status" + > ",header.shop_id" + > ",header.parent_order_id" + > ",header.order_at" + > ",header.pay_at" + > ",header.channel_id" + > ",header.root_order_id" + > ",item.id" + > ",item.row_num" + > ",item.p_sp_sub_amt" + > ",item.display_qty" + > ",item.qty" + > ",item.bom_type" + > " from header JOIN item on header.id = item.order_id and > item.rowtime BETWEEN header.rowtime - INTERVAL '10' SECOND AND > header.rowtime + INTERVAL '20' SECOND"); > > > System.out.println(intervalJoin); > > > DataStream rowDataStream = > streamTableEnvironment.toChangelogStream(result); > > > > > > > 执行计划: > == Abstract Syntax Tree == > LogicalProject(customer_id=[$2], goods_id=[$16], id=[$0], > order_status=[$1], shop_id=[$3], parent_order_id=[$4], order_at=[$5], > pay_at=[$6], channel_id=[$7], root_order_id=[$8], id0=[$13], row_num=[$15], > p_sp_sub_amt=[$20], display_qty=[$23], qty=[$18], bom_type=[$21]) > +- LogicalJoin(condition=[AND(=($0, $14), >=($25, -($12, 1:INTERVAL > SECOND)), <=($25, +($12, 2:INTERVAL SECOND)))], joinType=[inner]) >:- LogicalWatermarkAssigner(rowtime=[rowtime],