你好:
下面是我的代码,我用的版本是1.11.0,数据库是TIDB,我跑的是demo数据,维表只有两行。
我的输入流如下,每秒新增一条写入到kafka
topic = 'tp1'
for i in range(1,10000) :
stime=datetime.datetime.now().strftime('%Y%m%d%H%M%S')
msg = {}
msg['id']= i
msg['time1']= stime
msg['type']=1
print(msg)
send_msg(topic, msg)
time.sleep(1)
{'id': 1, 'time1': '20200722140624', 'type': 1}
{'id': 2, 'time1': '20200722140625', 'type': 1}
{'id': 3, 'time1': '20200722140626', 'type': 1}
{'id': 4, 'time1': '20200722140627', 'type': 1}
{'id': 5, 'time1': '20200722140628', 'type': 1}
{'id': 6, 'time1': '20200722140629', 'type': 1}
{'id': 7, 'time1': '20200722140631', 'type': 1}
{'id': 8, 'time1': '20200722140632', 'type': 1}
维表数据如下
id type
2 err
1 err
我在程序正常期间更新了维表,但是后续输出的结果显示维表还是之前的缓存数据,事实上已经远远大于超时时间了,甚至我停下输入流,直到达到超时时间后再次输入,新的结果还是输出旧的维表数据
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes,
EnvironmentSettings,DataTypes, CsvTableSource, CsvTableSink
from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime
from pyflink.table.window import Tumble
def from_kafka_to_kafka_demo():
# use blink table planner
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
st_env =
StreamTableEnvironment.create(stream_execution_environment=env,environment_settings=env_settings)
# register source and sink
register_rides_source(st_env)
register_rides_sink(st_env)
register_mysql_source(st_env)
st_env.sql_update("insert into flink_result select cast(t1.id as int) as
id,cast(t2.type as varchar),cast( t1.time1 as bigint) as rowtime from source1
t1 left join dim_mysql t2 on t1.type=cast(t2.id as varchar) ")
st_env.execute("2-from_kafka_to_kafka")
def register_rides_source(st_env):
source_ddl = \
"""
create table source1(
id int,
time1 varchar ,
type string
) with (
'connector' = 'kafka',
'topic' = 'tp1',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'json'
)
"""
st_env.sql_update(source_ddl)
def register_mysql_source(st_env):
source_ddl = \
"""
CREATE TABLE dim_mysql (
id int, --
type varchar --
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3390/test',
'table-name' = 'flink_test',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = '***',
'password' = '***',
'lookup.cache.max-rows' = '5000',
'lookup.cache.ttl' = '1s',
'lookup.max-retries' = '3'
)
"""
st_env.sql_update(source_ddl)
def register_rides_sink(st_env):
sink_ddl = \
"""
CREATE TABLE flink_result (
id int,
type varchar,
rtime bigint,
primary key(id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3390/test',
'table-name' = 'flink_result',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = '***',
'password' = '***',
'sink.buffer-flush.max-rows' = '5000',
'sink.buffer-flush.interval' = '2s',
'sink.max-retries' = '3'
)
"""
st_env.sql_update(sink_ddl)
if __name__ == '__main__':
from_kafka_to_kafka_demo()
初学者
PyFlink爱好者
琴师
发件人: Leonard Xu
发送时间: 2020-07-22 15:05
收件人: user-zh
主题: Re: flinksql1.11中主键声明的问题
Hi,
我试了下应该是会更新缓存的,你有能复现的例子吗?
祝好
> 在 2020年7月22日,14:50,奇怪的不朽琴师 <[email protected]> 写道:
>
> 你好:
>
>
> 可能是我描述的不清楚,我了解这个机制,我的意思维表更新后,即便已经达到了超时的时间,新的输出结果还是用维表历史缓存数据,
> 我感觉上是维表没有刷新缓存,但是我不知道这为什么。
>
>
> 谢谢
>
>
> ------------------ 原始邮件 ------------------
> 发件人:
> "user-zh"
>
> <[email protected]>;
> 发送时间: 2020年7月22日(星期三) 下午2:42
> 收件人: "user-zh"<[email protected]>;
>
> 主题: Re: flinksql1.11中主键声明的问题
>
>
>
> Hello
> 你说的输出结果更新,是指之前关联的维表时老数据,过了一段时间,这个数据变,之前输出的历史也希望更新吗?维表join的实现,只有事实表中才会有retract消息才会更新,才会传递到下游,维表的数据是事实表
> 去look up该表时的数据,维表的更新是不会retract之前的历史记录的。
>
> 祝好
> Leonard Xu
>
>
> > 在 2020年7月22日,14:13,[email protected] 写道:
> >
> > 输出结果仍然没有被更新