@Benchao Li <libenc...@gmail.com>  感谢回复,这个问题困扰我半年了,导致我一直不能迁移到
FLink,可能我的Case 太特殊了.

我 input topic 和 schema 如果下,但是要注意的是,这个 topic 里包含了两个 MySQL DB 的 Binlog,我需要
filter 出来 main_db__tansaction_tab, merchant_db__transaction_tab, 两个 DB
中的两个表。所以这里的字段我定义的是 两张表的字段的并集.

还要注意的是 even time 是 create_time, 这里问题非常大:
 1. 很多表都有 create time,所以会导致很多不用的表也能解析出来 watermark, 导致混乱
 2. Binlog 是 change log, 所以历史数据会不断更新,会导致有很多旧的 create time进来,可能会影响 watermark
forward on.

    bsTableEnv.executeSql("""
      CREATE TABLE input_database (
        `table` STRING,
        `database` STRING,
        `data` ROW(
          reference_id STRING,
          transaction_sn STRING,
          transaction_type BIGINT,
          merchant_id BIGINT,
          transaction_id BIGINT,
          status BIGINT
         ),
        ts BIGINT,
        event_time AS TO_TIMESTAMP(FROM_UNIXTIME(create_time)),
        WATERMARK FOR event_time AS event_time - INTERVAL '10' HOUR
     ) WITH (
       'connector.type' = 'kafka',
       'connector.version' = '0.11',
       'connector.topic' = 'mytopic',
       'connector.properties.bootstrap.servers' = 'xxxx',
       'format.type' = 'json'
     )
    """)


分别 filter 出来 两张表,进行 interval Join,这个是一直没有输出的,我两张表输出试过,没有任何问题。

    val main_db = bsTableEnv.sqlQuery("""
      | SELECT *
      | FROM input_database
      | WHERE `database` = 'main_db'
      |  AND `table` LIKE 'transaction_tab%'
      | """.stripMargin)

    val merchant_db = bsTableEnv.sqlQuery("""
      | SELECT *
      | FROM input_database
      | WHERE `database` = 'merchant_db'
      |   AND `table` LIKE 'transaction_tab%'
      | """.stripMargin)

    bsTableEnv.createTemporaryView("main_db", main_db)
    bsTableEnv.createTemporaryView("merchant_db", merchant_db)

    val result = bsTableEnv.sqlQuery("""
       SELECT *
       FROM (
          SELECT t1.`table`, t1.`database`, t1.transaction_type,
t1.transaction_id,
            t1.reference_id, t1.transaction_sn, t1.merchant_id,
t1.status, t1.event_time
          FROM main_db as t1
          LEFT JOIN merchant_db as t2
          ON t1.reference_id = t2.reference_id
          WHERE t1.event_time >= t2.event_time + INTERVAL '1' HOUR
           AND t1.event_time <= t2.event_time - INTERVAL '1' HOUR
       )
      """.stripMargin)



事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么?
-----
你提到的这个问题,我估计我的 watermark 前进肯定是不正常的。但是我无法理解为什么 interval join 需要 watermark
来驱动。
我的理解是,他会把两边的数据都保留在 state 里,既然是 Left join,如果左边有数据查右边的state,如果可以 join上,就输出
join 的结果,如果没有 join上,那应该正常输出左边的数据,这才是 Left join 应有的逻辑把.






Benchao Li <libenc...@apache.org> 于2020年12月8日周二 下午3:23写道:

> hi macia,
>
> 事件时间的interval join是需要用watermark来驱动的。你可以确认你的watermark是正常前进的么?
>
> macia kk <pre...@gmail.com> 于2020年12月8日周二 上午1:15写道:
>
> > 抱歉,是 >-30 and <+30
> >
> > 贴的只是demo,我的疑问是,既然是 Left Join,所以无所有没有Jion上右边,左边肯定会输出的,不至于一天条没有
> >
> > 赵一旦 <hinobl...@gmail.com>于2020年12月7日 周一23:28写道:
> >
> > > 准确点,2个条件之间没and?2个都是>?
> > >
> > > macia kk <pre...@gmail.com> 于2020年12月7日周一 下午10:30写道:
> > >
> > > > 不好意思,我上边贴错了
> > > >
> > > > SELECT *
> > > >  FROM A
> > > >  LEFT OUT JOIN B
> > > >  ON order_id
> > > >  Where A.event_time > B.event_time -  30 s
> > > >      A.event_time > B.event_time + 30 s
> > > >
> > > > event_time 是 Time Attributes 设置的 event_time
> > > >
> > > > 这样是没有输出的。
> > > >
> > > >
> > > >
> > > > interval join 左右表在 state 中是缓存多久的?
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > hailongwang <18868816...@163.com> 于2020年12月7日周一 下午8:05写道:
> > > >
> > > > > Hi,
> > > > > 其中 条件是
> > > > > `Where A.event_time < B.event_time + 30 s and A.event_time >
> > > B.event_time
> > > > > - 30 s ` 吧
> > > > > 可以参考以下例子[1],看下有木有写错。
> > > > > [1]
> > > > >
> > > >
> > >
> >
> https://github.com/apache/flink/blob/59ae84069313ede60cf7ad3a9d2fe1bc07c4e460/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/IntervalJoinITCase.scala#L183
> > > > >
> > > > >
> > > > > Best,
> > > > > Hailong
> > > > > 在 2020-12-07 13:10:02,"macia kk" <pre...@gmail.com> 写道:
> > > > > >Hi, 各位大佬
> > > > > >
> > > > > >  我的上游是一个 Kafka Topic, 里边把一个 MySQL DB 所有的 Binlog 打进去了。我的
> > > > > >Flink任务的在处理的时候,消费一次,然后 filter out 出来 表A 和 表B,表A是 order事件 ,表B 是
> order
> > > > item
> > > > > >信息,所以 我用:
> > > > > >
> > > > > > SELECT *
> > > > > > FROM A
> > > > > > LEFT OUT JOIN B
> > > > > > ON order_id
> > > > > > Where A.event_time > B.event_time + 30 s
> > > > > >     A.event_time > B.event_time - 30 s
> > > > > >
> > > > > >我测了下,A 和 BI 单独都可以消费输出,但是如果加上 Left Join 之后就没有输出数据了,可以确认的是我用 Spark
> > > > > Structural
> > > > > >Streaming 实现同样的逻辑是有输出的。 因为我的理解既然是 Left Join,
> > > > > >所以无论如何,左边是一定会输出的,不知道Flink Interval Join 在具体实现的逻辑是什么,我在处理上哪里有问题?
> > > > >
> > > >
> > >
> >
>
>
> --
>
> Best,
> Benchao Li
>

回复