I am using flink latest (1.11.2) to work with a sample mysql database, which the database is working fine.
Additionally, i have added the flink-connector-jdbc_2.11-1.11.2, mysql-connector-java-8.0.21.jar, postgresql-42.2.17.jar to the {FLINK}/lib Here is my code T_CONFIG = TableConfig() B_EXEC_ENV = ExecutionEnvironment.get_execution_environment() B_EXEC_ENV.set_parallelism(1) BT_ENV = BatchTableEnvironment.create(B_EXEC_ENV, T_CONFIG) ddl = """ CREATE TABLE nba_player4 ( first_name STRING , last_name STRING, email STRING, id INT ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/inventory', 'username' = 'root', 'password' = 'debezium', 'table-name' = 'customers' ) """; BT_ENV.sql_update(ddl) sinkddl = """ CREATE TABLE print_table ( f0 INT, f1 INT, f2 STRING, f3 DOUBLE ) WITH ( 'connector' = 'print' ) """; BT_ENV.sql_update(sinkddl) sqlquery("SELECT first_name, last_name FROM nba_player4 "); BT_ENV.execute("table_job") However when running the code, it come up with error saying py4j.protocol.Py4JJavaError: An error occurred while calling o23.sqlQuery. : org.apache.flink.table.api.ValidationException: SQL validation failed. findAndCreateTableSource failed. Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath. Reason: Required context properties mismatch. The following properties are requested: connector=jdbc password=debezium schema.0.data-type=VARCHAR(2147483647)schema.0.name=first_name schema.1.data-type=VARCHAR(2147483647)schema.1.name=last_name schema.2.data-type=VARCHAR(2147483647)schema.2.name=email schema.3.data-type=INTschema.3.name=id table-name=customers url=jdbc:mysql://localhost:3306/inventory username=root The following factories have been considered: org.apache.flink.connector.jdbc.table.JdbcTableSourceSinkFactory org.apache.flink.table.sources.CsvBatchTableSourceFactory org.apache.flink.table.sources.CsvAppendTableSourceFactory org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory org.apache.flink.table.filesystem.FileSystemTableFactory latest: this is my docker yml file. version: '2.1' services: jobmanager: build: . image: flink:latest hostname: "jobmanager" expose: - "6123" ports: - "8081:8081" command: jobmanager environment: - JOB_MANAGER_RPC_ADDRESS=jobmanager taskmanager: image: flink:latest expose: - "6121" - "6122" depends_on: - jobmanager command: taskmanager links: - jobmanager:jobmanager environment: - JOB_MANAGER_RPC_ADDRESS=jobmanager mysql: image: debezium/example-mysql ports: - "3306:3306" environment: - MYSQL_ROOT_PASSWORD=debezium - MYSQL_USER=mysqluser - MYSQL_PASSWORD=mysqlpw docker ps commands show out CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES cf84c84f7821 flink "/docker-entrypoint.…" 2 minutes ago Up 2 minutes 6121-6123/tcp, 8081/tcp _taskmanager_1 09b19142d70a flink "/docker-entrypoint.…" 9 minutes ago Up 9 minutes 6123/tcp, 0.0.0.0:8081->8081/tcp _jobmanager_1 4ac01eb11bf7 debezium/example-mysql "docker-entrypoint.s…" 3 days ago Up 9 minutes 0.0.0.0:3306->3306/tcp, 33060/tcp keras-flask-dep more info: *my current flink environment* in docker is flink:scala_2.12-java8 docker pull flink:scala_2.12-java8 *pyflink jdbc connector* is flink-connector-jdbc_2.11-1.11.2.jar from flink 1.11 version. https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html in order to use the jdbc library, I tried two ways 1. save the flink-connector-jdbc_2.11-1.11.2.jar into /usr/local/lib/python3.7/site-packages/pyflink/lib 2. configure the classpath in the python app base_dir = "/Users/huhu/Documents/projects/webapp/libs/" flink_jdbc_jar = f"file://{base_dir}flink-connector-jdbc_2.11-1.11.2.jar" BT_ENV.get_config().get_configuration().set_string("pipeline.jars",jars) but still getting the same error