Hi,
I think the problem is that you are using BatchTableEnvironment, which is deprecated and does not support newer features such as FLIP-95 sources/sinks. I am sorry this is not more prominent in the documentation.
I am not too familiar with the Python API, and I am not sure whether a
unified TableEnvironment is available there. In Java/Scala I'd recommend
using the unified TableEnvironment. If it is not available in the Python
API, you can use the StreamTableEnvironment, which actually extends the
unified one.
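
A minimal sketch of what switching to StreamTableEnvironment might look like
in PyFlink 1.11, assuming the Blink planner in streaming mode. The source DDL
is taken from the question below; the print sink schema is an assumption
(changed so that it matches the two selected columns), and run_job is a
hypothetical helper name. I have not run this against your setup.

```python
# Source DDL copied from the original question.
SOURCE_DDL = """
CREATE TABLE nba_player4 (
    first_name STRING,
    last_name STRING,
    email STRING,
    id INT
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/inventory',
    'username' = 'root',
    'password' = 'debezium',
    'table-name' = 'customers'
)
"""

# Assumption: a print sink whose columns match the SELECT below.
SINK_DDL = """
CREATE TABLE print_table (
    first_name STRING,
    last_name STRING
) WITH (
    'connector' = 'print'
)
"""


def run_job():
    """Hypothetical helper: run the query on a StreamTableEnvironment."""
    # Imported lazily so the DDL strings can be inspected without PyFlink.
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import EnvironmentSettings, StreamTableEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    # Blink planner, streaming mode (an assumption, not the only option).
    settings = (EnvironmentSettings.new_instance()
                .use_blink_planner()
                .in_streaming_mode()
                .build())
    t_env = StreamTableEnvironment.create(env, environment_settings=settings)

    # Register the source and sink tables, then submit the INSERT job.
    t_env.execute_sql(SOURCE_DDL)
    t_env.execute_sql(SINK_DDL)
    return t_env.execute_sql(
        "INSERT INTO print_table "
        "SELECT first_name, last_name FROM nba_player4")
```

Calling run_job() from the script replaces the sql_update / execute calls
made on the BatchTableEnvironment.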
Moreover, please make sure that all components use the same versions;
otherwise you may run into problems that are hard to track down. You are
mixing components built for different Scala versions: your cluster uses
Scala 2.12, but the additional dependencies you added are built for Scala 2.11.
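
As a rough illustration of such a check, here is a small standard-library
sketch that flags jars whose Scala suffix differs from the cluster's. The
helper names (scala_version_of, mismatched_jars) are hypothetical, and the
sketch assumes the usual `_<scala version>-<artifact version>` pattern in the
jar file name.

```python
import re

# Matches the Scala binary version embedded in an artifact name,
# e.g. "flink-connector-jdbc_2.11-1.11.2.jar" -> "2.11".
_SCALA_SUFFIX = re.compile(r"_(\d+\.\d+)-\d")


def scala_version_of(jar_name):
    """Return the Scala suffix in a jar file name, or None if it has none."""
    match = _SCALA_SUFFIX.search(jar_name)
    return match.group(1) if match else None


def mismatched_jars(jar_names, cluster_scala):
    """List jars whose Scala suffix differs from the cluster's Scala version."""
    return [
        name for name in jar_names
        if scala_version_of(name) not in (None, cluster_scala)
    ]


# Jar names from the original question; the cluster image is scala_2.12.
jars = [
    "flink-connector-jdbc_2.11-1.11.2.jar",
    "mysql-connector-java-8.0.21.jar",
    "postgresql-42.2.17.jar",
]
print(mismatched_jars(jars, "2.12"))
```

Jars without a Scala suffix, such as the plain JDBC drivers, are not flagged.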
Best,
Dawid
On 14/10/2020 03:13, xi sizhe wrote:
>
> I am using the latest Flink (1.11.2) to work with a sample MySQL database;
> the database itself is working fine.
>
> Additionally, I have added flink-connector-jdbc_2.11-1.11.2,
> mysql-connector-java-8.0.21.jar and postgresql-42.2.17.jar to {FLINK}/lib
>
> Here is my code
>
>     T_CONFIG = TableConfig()
>     B_EXEC_ENV = ExecutionEnvironment.get_execution_environment()
>     B_EXEC_ENV.set_parallelism(1)
>     BT_ENV = BatchTableEnvironment.create(B_EXEC_ENV, T_CONFIG)
>
>     ddl = """
>         CREATE TABLE nba_player4 (
>             first_name STRING,
>             last_name STRING,
>             email STRING,
>             id INT
>         ) WITH (
>             'connector' = 'jdbc',
>             'url' = 'jdbc:mysql://localhost:3306/inventory',
>             'username' = 'root',
>             'password' = 'debezium',
>             'table-name' = 'customers'
>         )
>     """;
>     BT_ENV.sql_update(ddl)
>
>     sinkddl = """
>         CREATE TABLE print_table (
>             f0 INT,
>             f1 INT,
>             f2 STRING,
>             f3 DOUBLE
>         ) WITH (
>             'connector' = 'print'
>         )
>     """;
>     BT_ENV.sql_update(sinkddl)
>
>     sqlquery("SELECT first_name, last_name FROM nba_player4 ");
>     BT_ENV.execute("table_job")
>
> However, when running the code, it fails with the following error:
>
>     py4j.protocol.Py4JJavaError: An error occurred while calling o23.sqlQuery.
>     : org.apache.flink.table.api.ValidationException: SQL validation failed.
>     findAndCreateTableSource failed.
>     Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
>     Could not find a suitable table factory for
>     'org.apache.flink.table.factories.TableSourceFactory' in the classpath.
>
>     Reason: Required context properties mismatch.
>
>     The following properties are requested:
>     connector=jdbc
>     password=debezium
>     schema.0.data-type=VARCHAR(2147483647)
>     schema.0.name=first_name
>     schema.1.data-type=VARCHAR(2147483647)
>     schema.1.name=last_name
>     schema.2.data-type=VARCHAR(2147483647)
>     schema.2.name=email
>     schema.3.data-type=INT
>     schema.3.name=id
>     table-name=customers
>     url=jdbc:mysql://localhost:3306/inventory
>     username=root
>
>     The following factories have been considered:
>     org.apache.flink.connector.jdbc.table.JdbcTableSourceSinkFactory
>     org.apache.flink.table.sources.CsvBatchTableSourceFactory
>     org.apache.flink.table.sources.CsvAppendTableSourceFactory
>     org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
>     org.apache.flink.table.filesystem.FileSystemTableFactory
>
> latest:
>
> this is my docker yml file.
>
>     version: '2.1'
>     services:
>       jobmanager:
>         build: .
>         image: flink:latest
>         hostname: "jobmanager"
>         expose:
>           - "6123"
>         ports:
>           - "8081:8081"
>         command: jobmanager
>         environment:
>           - JOB_MANAGER_RPC_ADDRESS=jobmanager
>       taskmanager:
>         image: flink:latest
>         expose:
>           - "6121"
>           - "6122"
>         depends_on:
>           - jobmanager
>         command: taskmanager
>         links:
>           - jobmanager:jobmanager
>         environment:
>           - JOB_MANAGER_RPC_ADDRESS=jobmanager
>       mysql:
>         image: debezium/example-mysql
>         ports:
>           - "3306:3306"
>         environment:
>           - MYSQL_ROOT_PASSWORD=debezium
>           - MYSQL_USER=mysqluser
>           - MYSQL_PASSWORD=mysqlpw
>
> The docker ps command shows:
>
>     CONTAINER ID  IMAGE                   COMMAND                 CREATED        STATUS        PORTS                              NAMES
>     cf84c84f7821  flink                   "/docker-entrypoint.…"  2 minutes ago  Up 2 minutes  6121-6123/tcp, 8081/tcp            _taskmanager_1
>     09b19142d70a  flink                   "/docker-entrypoint.…"  9 minutes ago  Up 9 minutes  6123/tcp, 0.0.0.0:8081->8081/tcp   _jobmanager_1
>     4ac01eb11bf7  debezium/example-mysql  "docker-entrypoint.s…"  3 days ago     Up 9 minutes  0.0.0.0:3306->3306/tcp, 33060/tcp  keras-flask-dep
>
> more info:
>
> *my current flink environment* in docker is flink:scala_2.12-java8
>
>     docker pull flink:scala_2.12-java8
>
> *pyflink jdbc connector* is flink-connector-jdbc_2.11-1.11.2.jar from
> flink 1.11 version.
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
>
> In order to use the JDBC library, I tried two ways:
>
> 1. Save the flink-connector-jdbc_2.11-1.11.2.jar into
>    /usr/local/lib/python3.7/site-packages/pyflink/lib
>
> 2. Configure the classpath in the Python app:
>
>     base_dir = "/Users/huhu/Documents/projects/webapp/libs/"
>     flink_jdbc_jar = f"file://{base_dir}flink-connector-jdbc_2.11-1.11.2.jar"
>     BT_ENV.get_config().get_configuration().set_string("pipeline.jars",jars)
>
> but I am still getting the same error.
>
