Hi??
??????????????https://mp.weixin.qq.com/s/GyFTjQl6ch8jc733mpCP7Q
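I am running a pyflink job that reads through the jdbc connector. I have added the jdbc connector jar and the MySQL driver jar, but the jdbc connector still cannot be found. The flink version is 1.13.1.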
????????????????
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
# 1. create the streaming TableEnvironment (blink planner)
env = StreamExecutionEnvironment.get_execution_environment()
settings = EnvironmentSettings.new_instance().use_blink_planner().build()
table_env = StreamTableEnvironment.create(env, environment_settings=settings)

# Make the JDBC connector and the MySQL driver visible to the job.
# The value is a ';'-separated list of URLs.
table_env.get_config().get_configuration().set_string(
    "pipeline.classpaths",
    "file:///home/flink/lib/flink-connector-jdbc_2.12-1.13.1.jar;"
    "file:///home/flink/lib/mysql-connector-java-8.0.25.jar",
)

# 2. create source Table
table_env.execute_sql("""
CREATE TABLE table_source (
e string
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://********:3306/test',
'driver' = 'com.mysql.cj.jdbc.Driver',
'table-name' = 'enum_test',
'username' = 'pms_etl',
'password' = 'pms_etl_q'
)
""")

# 3. create sink Table
# The sink must be registered under the same name the INSERT below writes to.
# Registering it under a different name (e.g. `print`) makes the INSERT fail
# with "Sink `default_catalog`.`default_database`.`table_sink` does not exists".
table_env.execute_sql("""
CREATE TABLE table_sink (
e string
) WITH (
'connector' = 'print'
)
""")

# 4. run the job and block until it finishes.
# The statement is kept on one line: a plain "..." literal cannot span lines.
table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_source").wait()
Running the script directly with python fails with the following error:
Traceback (most recent call last):
File "demo.py", line 41, in <module>
table_env.execute_sql("INSERT INTO table_sink SELECT * FROM
table_source").wait()
File
"/usr/local/lib/python3.7/site-packages/pyflink/table/table_environment.py",
line 804, in execute_sql
return TableResult(self._j_tenv.executeSql(stmt))
File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py", line
1286, in __call__
answer, self.gateway_client, self.target_id, self.name)
File
"/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 146,
in deco
return f(*a, **kw)
File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py", line
328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o4.executeSql.
: org.apache.flink.table.api.ValidationException: Unable to create a source for
reading table 'default_catalog.default_database.table_source'.
Table options are:
'connector'='jdbc'
'driver'='com.mysql.cj.jdbc.Driver'
'password'='pms_etl_q'
'table-name'='enum_test'
'url'='jdbc:mysql://*******:3306/test'
'username'='pms_etl'
at
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:137)
at
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(CatalogSourceTable.java:116)
at
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:82)
at
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:170)
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:162)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:967)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:936)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:275)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:595)
at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:268)
at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:724)
at
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a
connector using option: 'connector'='jdbc'
at
org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:467)
at
org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:441)
at
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:133)
... 31 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any
factory for identifier 'jdbc' that implements
'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Available factory identifiers are:
blackhole
datagen
filesystem
print
at
org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:319)
at
org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:463)
... 33 more
Running it with flink run -py demo.py fails with the following error:
File "./demo.py", line 41, in <module>
table_env.execute_sql("INSERT INTO table_sink SELECT * FROM
table_source").wait()
File
"/home/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line
804, in execute_sql
File
"/home/flink/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286,
in __call__
File "/home/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py",
line 158, in deco
pyflink.util.exceptions.TableException:
org.apache.flink.table.api.TableException: Sink
`default_catalog`.`default_database`.`table_sink` does not exists
at
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:233)
at
org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:162)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at
scala.collection.Iterator.foreach(Iterator.scala:937)
at
scala.collection.Iterator.foreach$(Iterator.scala:937)
at
scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at
scala.collection.IterableLike.foreach(IterableLike.scala:70)
at
scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at
scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at
scala.collection.TraversableLike.map(TraversableLike.scala:233)
at
scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at
scala.collection.AbstractTraversable.map(Traversable.scala:104)
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:856)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
at
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at
org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at
org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at
org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at
org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at
org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
org.apache.flink.client.program.ProgramAbortException:
java.lang.RuntimeException: Python process exits with code: 1
at
org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
at
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
at
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
at
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
at
org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
... 13 more
??????????????????