建表如下: CREATE TABLE test ( `id` INT, `name` VARCHAR(255), `time` TIMESTAMP(3), `status` INT, PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector'='kafka', 'topic'='test', 'properties.group.id'='c_mysql_binlog_es', 'properties.bootstrap.servers'='localhost:9092', 'scan.startup.mode'='latest-offset', 'format'='canal-json', 'canal-json.ignore-parse-errors'='true' );
# 输出表至es CREATE TABLE test_mirror_es ( `id` INT, `name` VARCHAR(255), `time` TIMESTAMP(3), `status` INT, PRIMARY KEY(id) NOT ENFORCED ) WITH ( 'connector' = 'elasticsearch-7', 'hosts' = 'http://localhost:9200', 'index' = 'test_mirror' ); INSERT into test_mirror_es SELECT * from test where test.id >=0; 日志:Caused by: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.test_mirror_es'. 完整日志如下: 2020-08-12 13:07:20,815 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.address, localhost 2020-08-12 13:07:20,820 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.rpc.port, 6123 2020-08-12 13:07:20,820 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.memory.process.size, 1600m 2020-08-12 13:07:20,820 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 1728m 2020-08-12 13:07:20,820 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.numberOfTaskSlots, 10 2020-08-12 13:07:20,820 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: parallelism.default, 5 2020-08-12 13:07:20,821 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: state.savepoints.dir, hdfs://localhost:9000/flink-1.11.0/flink-savepoints 2020-08-12 13:07:20,821 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.execution.failover-strategy, region 2020-08-12 13:07:21,198 INFO org.apache.flink.table.client.config.entries.ExecutionEntry [] - Property 'execution.restart-strategy.type' not specified. Using default value: fallback 2020-08-12 13:07:22,099 INFO org.apache.flink.table.client.gateway.local.ExecutionContext [] - Executor config: {taskmanager.memory.process.size=1728m, jobmanager.execution.failover-strategy=region, jobmanager.rpc.address=localhost, execution.target=remote, jobmanager.memory.process.size=1600m, state.savepoints.dir=hdfs://localhost:9000/flink-1.11.0/flink-savepoints, jobmanager.rpc.port=6123, execution.savepoint.ignore-unclaimed-state=false, execution.attached=true, execution.shutdown-on-attached-exit=false, pipeline.jars=[file:/data1/home/xxx/flink-demo/flink-1.11.0/opt/flink-sql-client_2.11-1.11.0.jar], parallelism.default=5, taskmanager.numberOfTaskSlots=10, pipeline.classpaths=[]} 2020-08-12 13:07:22,286 INFO org.apache.flink.table.client.cli.CliClient [] - Command history file path: /root/.flink-sql-history 2020-08-12 13:07:46,637 INFO org.apache.flink.table.client.gateway.local.ProgramDeployer [] - Submitting job org.apache.flink.streaming.api.graph.StreamGraph@41a16eb3 for query default: INSERT into test_mirror_es SELECT * from test where id >0` 2020-08-12 13:07:46,709 INFO org.apache.flink.configuration.Configuration [] - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address' 2020-08-12 13:10:17,512 INFO org.apache.flink.table.client.gateway.local.ProgramDeployer [] - Submitting job org.apache.flink.streaming.api.graph.StreamGraph@3ff8a3ad for query default: INSERT into test_mirror_es SELECT * from test where id >0` 2020-08-12 13:10:17,516 INFO org.apache.flink.configuration.Configuration [] - Config uses fallback configuration key 'jobmanager.rpc.address' instead of key 'rest.address' 2020-08-12 13:10:38,360 WARN org.apache.flink.table.client.cli.CliClient [] - Could not execute SQL statement. org.apache.flink.table.client.gateway.SqlExecutionException: Invalidate SQL statement. at org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:99) ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.client.cli.SqlCommandParser.parse(SqlCommandParser.java:90) ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.client.cli.CliClient.parseCommand(CliClient.java:257) [flink-sql-client_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211) [flink-sql-client_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142) [flink-sql-client_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.client.SqlClient.start(SqlClient.java:114) [flink-sql-client_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201) [flink-sql-client_2.11-1.11.0.jar:1.11.0] Caused by: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.test_mirror_es'. Table options are: 'connector'='elasticsearch-7' 'hosts'='http://localhost:9200' 'index'='test_mirror' at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.client.gateway.local.LocalExecutor$1.lambda$parse$0(LocalExecutor.java:430) ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.client.gateway.local.LocalExecutor$1$$Lambda$460/590884726.get(Unknown Source) ~[?:?] at org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:255) ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.client.gateway.local.LocalExecutor$1.parse(LocalExecutor.java:430) ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:97) ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] ... 6 more Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='elasticsearch-7''. at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.client.gateway.local.LocalExecutor$1.lambda$parse$0(LocalExecutor.java:430) ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.client.gateway.local.LocalExecutor$1$$Lambda$460/590884726.get(Unknown Source) ~[?:?] at org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:255) ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.client.gateway.local.LocalExecutor$1.parse(LocalExecutor.java:430) ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:97) ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] ... 6 more Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'elasticsearch-7' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath. Available factory identifiers are: datagen jdbc kafka at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568) ~[flink-table_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78) ~[flink-table-blink_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.client.gateway.local.LocalExecutor$1.lambda$parse$0(LocalExecutor.java:430) ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.client.gateway.local.LocalExecutor$1$$Lambda$460/590884726.get(Unknown Source) ~[?:?] at org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:255) ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.client.gateway.local.LocalExecutor$1.parse(LocalExecutor.java:430) ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] at org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:97) ~[flink-sql-client_2.11-1.11.0.jar:1.11.0] ... 6 more -- Sent from: http://apache-flink.147419.n8.nabble.com/