HI??
????????????????????????????????????????????
------------------ ???????? ------------------
??????:
"????"
<[email protected]>;
????????: 2020??7??23??(??????) ????9:35
??????: "user-zh"<[email protected]>;
????: "user-zh"<[email protected]>;
????: ?????? flinksql1.11????????????????
??????????????????????????????????
------------------ ???????? ------------------
??????:
"user-zh"
<[email protected]>;
????????: 2020??7??23??(??????) ????9:30
??????: "????"<[email protected]>;
????: "user-zh"<[email protected]>;
????: Re: flinksql1.11????????????????
Hi,
??????query????????????????join???? FOR SYSTEM_TIME AS OF ??????????????regular
join??mysql????bounded??????????????????????????????????????????
????join??????????????????????look up ????????????????????????????temporal
table(??????)????????????[1] ???? temporal table join
????
Leonard Xu
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins
<https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#joins>
> ?? 2020??7??23????09:06?????? <[email protected]> ??????
>
>
> HI??
>
????????????????????????????????????????????????????????????????????????????????????????????????????
>>> ????
>>> id type
>>> 2 err
>>> 1 err
>>> ????
>>>
> 1 err 20200723085754
> 2 err 20200723085755
> 3 err 20200723085756
> 4 err 20200723085757
>
>
????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????2s??
>>> ????
>>> id type
>>> 2 acc
>>> 1 acc
>>> ????
>
> 94 err 20200723084455
> 95 err 20200723084456
> 96 err 20200723084457
> 97 err 20200723084458
> 98 err 20200723084459
> 99 err 20200723084500
> 100 err 20200723084501
>
>
????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????
>
> ????????????????????????????????????????????????????
>
>
> ??????
>
>
>>>
>>>
>
>
> ------------------ ???????? ------------------
> ??????: "Leonard Xu" <[email protected]>;
> ????????: 2020??7??22??(??????) ????9:39
> ??????: "????"<[email protected]>;
> ????: Re: flinksql1.11????????????????
>
> <[email protected]>
>
> ????????????????????????????????????????????????????????????
>
> ????
>
>> ?? 2020??7??22????16:39??Leonard Xu <[email protected]
<mailto:[email protected]>> ??????
>>
>> HI,
>> ??????????????????????????????????????????????????????????????????
>>
>>
>>> ?? 2020??7??22????16:27?????? <[email protected]
<mailto:[email protected]>> ??????
>>>
>>>
>>> HI
>>>
????????????????????????????????????????????????????????????????????????????????HELP????
>>> ????
>>>
>>> ------------------ ???????? ------------------
>>> ??????: "????" <[email protected]
<mailto:[email protected]>>;
>>> ????????: 2020??7??22??(??????) ????3:17
>>> ??????: "user-zh"<[email protected]
<mailto:[email protected]>>;
>>> ????: ????: Re: flinksql1.11????????????????
>>>
>>> ??????
>>>
????????????????????????????1.11.0??????????TIDB??????????demo????????????????????
>>>
>>> ??????????????????????????????????kafka
>>> topic = 'tp1'
>>> for i in range(1,10000) :
>>>
stime=datetime.datetime.now().strftime('%Y%m%d%H%M%S')
>>> msg = {}
>>> msg['id']= i
>>> msg['time1']= stime
>>> msg['type']=1
>>> print(msg)
>>> send_msg(topic, msg)
>>> time.sleep(1)
>>>
>>> {'id': 1, 'time1': '20200722140624', 'type': 1}
>>> {'id': 2, 'time1': '20200722140625', 'type': 1}
>>> {'id': 3, 'time1': '20200722140626', 'type': 1}
>>> {'id': 4, 'time1': '20200722140627', 'type': 1}
>>> {'id': 5, 'time1': '20200722140628', 'type': 1}
>>> {'id': 6, 'time1': '20200722140629', 'type': 1}
>>> {'id': 7, 'time1': '20200722140631', 'type': 1}
>>> {'id': 8, 'time1': '20200722140632', 'type': 1}
>>>
>>> ????????????
>>> id type
>>> 2 err
>>> 1 err
>>>
>>>
??????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????
>>>
>>>
>>> from pyflink.datastream import StreamExecutionEnvironment,
TimeCharacteristic
>>> from pyflink.table import StreamTableEnvironment, DataTypes,
EnvironmentSettings,DataTypes, CsvTableSource, CsvTableSink
>>> from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime
>>> from pyflink.table.window import Tumble
>>>
>>>
>>> def from_kafka_to_kafka_demo():
>>>
>>> # use blink table planner
>>> env =
StreamExecutionEnvironment.get_execution_environment()
>>>
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>>> env_settings =
EnvironmentSettings.Builder().use_blink_planner().build()
>>> st_env =
StreamTableEnvironment.create(stream_execution_environment=env,environment_settings=env_settings)
>>>
>>> # register source and sink
>>> register_rides_source(st_env)
>>> register_rides_sink(st_env)
>>> register_mysql_source(st_env)
>>>
>>>
>>> st_env.sql_update("insert into flink_result
select cast(t1.id <http://t1.id/> as int) as id,cast(t2.type as
varchar),cast( t1.time1 as bigint) as rowtime from source1 t1 left join
dim_mysql t2 on t1.type=cast(t2.id <http://t2.id/> as varchar) ")
>>> st_env.execute("2-from_kafka_to_kafka")
>>>
>>>
>>>
>>> def register_rides_source(st_env):
>>> source_ddl = \
>>> """
>>> create table source1(
>>> id int,
>>> time1 varchar ,
>>> type string
>>> ) with (
>>> 'connector' = 'kafka',
>>> 'topic' = 'tp1',
>>> 'scan.startup.mode' = 'latest-offset',
>>> 'properties.bootstrap.servers' =
'localhost:9092',
>>> 'format' = 'json'
>>> )
>>> """
>>> st_env.sql_update(source_ddl)
>>>
>>> def register_mysql_source(st_env):
>>> source_ddl = \
>>> """
>>> CREATE TABLE dim_mysql (
>>> id int, --
>>> type varchar --
>>> ) WITH (
>>> 'connector' = 'jdbc',
>>> 'url' = 'jdbc:mysql://localhost:3390/test'
<>,
>>> 'table-name' = 'flink_test',
>>> 'driver' = 'com.mysql.cj.jdbc.Driver',
>>> 'username' = '***',
>>> 'password' = '***',
>>> 'lookup.cache.max-rows' = '5000',
>>> 'lookup.cache.ttl' = '1s',
>>> 'lookup.max-retries' = '3'
>>> )
>>> """
>>> st_env.sql_update(source_ddl)
>>>
>>> def register_rides_sink(st_env):
>>> sink_ddl = \
>>> """
>>> CREATE TABLE flink_result (
>>> id int,
>>> type varchar,
>>> rtime bigint,
>>> primary key(id) NOT ENFORCED
>>> ) WITH (
>>> 'connector' = 'jdbc',
>>> 'url' = 'jdbc:mysql://localhost:3390/test'
<>,
>>> 'table-name' = 'flink_result',
>>> 'driver' = 'com.mysql.cj.jdbc.Driver',
>>> 'username' = '***',
>>> 'password' = '***',
>>> 'sink.buffer-flush.max-rows' = '5000',
>>> 'sink.buffer-flush.interval' = '2s',
>>> 'sink.max-retries' = '3'
>>> )
>>> """
>>> st_env.sql_update(sink_ddl)
>>>
>>>
>>> if __name__ == '__main__':
>>> from_kafka_to_kafka_demo()
>>>
>>> ??????
>>> PyFlink??????
>>> ????
>>>
>>>
>>> ???????? Leonard Xu <mailto:[email protected]>
>>> ?????????? 2020-07-22 15:05
>>> ???????? user-zh <mailto:[email protected]>
>>> ?????? Re: flinksql1.11????????????????
>>> Hi,
>>>
>>> ????????????????????????????????????????????????
>>>
>>> ????
>>> > ?? 2020??7??22????14:50???????????????? <[email protected]
<mailto:[email protected]>> ??????
>>> >
>>> > ??????
>>> >
>>> >
>>> >
????????????????????????????????????????????????????????????????????????????????????????????????????????????????????????
>>> > ??????????????????????????????????????????????????
>>> >
>>> >
>>> > ????
>>> >
>>> >
>>> >
------------------&nbsp;????????&nbsp;------------------
>>> >
??????:
"user-zh"
<[email protected] <mailto:[email protected]>&gt;;
>>> > ????????:&nbsp;2020??7??22??(??????) ????2:42
>>> > ??????:&nbsp;"user-zh"<[email protected]
<mailto:[email protected]>&gt;;
>>> >
>>> > ????:&nbsp;Re: flinksql1.11????????????????
>>> >
>>> >
>>> >
>>> > Hello
>>> >
??????????????????????????????????????????????????????????????????????????????????????????????????????????join??????????????????????????retract????????????????????????????????????????????????
>>> > ??look
up??????????????????????????????retract??????????????????
>>> >
>>> > ????
>>> > Leonard Xu
>>> >
>>> >
>>> > &gt; ?? 2020??7??22????14:[email protected]
<http://qq.com/> ??????
>>> > &gt;
>>> > &gt; ??????????????????????
>>
>