hi:
我在使用pyflink1.11过程中,使用flinksql维表时声明了主键primary key
但是还是会报错说我没有用声明主键,另外,当我使用inner join代替left join就不会有这个问题,请问这是什么问题
下面我附录了报错信息和代码。谢谢!
报错附录
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py",
line 147, in deco
return f(*a, **kw)
File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py", line 328, in
get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o5.execute.
: org.apache.flink.table.api.TableException: UpsertStreamTableSink requires
that Table has a full primary keys if it is updated.
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:93)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
at
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
at
org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1240)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "mysql_join.py", line 90, in <module>
from_kafka_to_kafka_demo()
File "mysql_join.py", line 22, in from_kafka_to_kafka_demo
st_env.execute("2-from_kafka_to_kafka")
File
"/usr/local/lib/python3.7/site-packages/pyflink/table/table_environment.py",
line 1057, in execute
return JobExecutionResult(self._j_tenv.execute(job_name))
File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py", line
1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py",
line 154, in deco
raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
pyflink.util.exceptions.TableException: 'UpsertStreamTableSink requires that
Table has a full primary keys if it is updated.'
代码附录
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes,
EnvironmentSettings,DataTypes, CsvTableSource, CsvTableSink
from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime
from pyflink.table.window import Tumble
def from_kafka_to_kafka_demo():
# use blink table planner
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
st_env =
StreamTableEnvironment.create(stream_execution_environment=env,environment_settings=env_settings)
# register source and sink
register_rides_source(st_env)
register_rides_sink(st_env)
register_mysql_source(st_env)
st_env.sql_update("insert into flink_result select cast(t1.id as int) as
id,cast(t2.type as varchar),cast( t1.time1 as bigint) as rowtime from source1
t1 left join dim_mysql t2 on t1.type=cast(t2.id as varchar) ")
st_env.execute("2-from_kafka_to_kafka")
def register_rides_source(st_env):
source_ddl = \
"""
create table source1(
id int,
time1 varchar ,
type string
) with (
'connector.type' = 'kafka',
'connector.topic' = 'tp1',
'connector.startup-mode' = 'latest-offset',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'connector.properties.zookeeper.connect' = 'localhost:2181',
'format.type' = 'json',
'connector.version' = 'universal',
'update-mode' = 'append'
)
"""
st_env.sql_update(source_ddl)
def register_mysql_source(st_env):
source_ddl = \
"""
CREATE TABLE dim_mysql (
id int, --
type varchar --
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://localhost:3390/test',
'connector.table' = 'flink_test',
'connector.driver' = 'com.mysql.cj.jdbc.Driver',
'connector.username' = '***',
'connector.password' = '***',
'connector.lookup.cache.max-rows' = '5000',
'connector.lookup.cache.ttl' = '1min',
'connector.lookup.max-retries' = '3'
)
"""
st_env.sql_update(source_ddl)
def register_rides_sink(st_env):
sink_ddl = \
"""
CREATE TABLE flink_result (
id int,
type varchar,
rtime bigint,
primary key(id) NOT ENFORCED
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://localhost:3390/test',
'connector.table' = 'flink_result',
'connector.driver' = 'com.mysql.cj.jdbc.Driver',
'connector.username' = '***',
'connector.password' = '***',
'connector.write.flush.max-rows' = '5000',
'connector.write.flush.interval' = '2s',
'connector.write.max-retries' = '3'
)
"""
st_env.sql_update(sink_ddl)
if __name__ == '__main__':
from_kafka_to_kafka_demo()
初学者
琴师