建表如下:

CREATE TABLE test (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
PRIMARY KEY(id) NOT ENFORCED 
) WITH (
 'connector'='kafka',
 'topic'='test',
 'properties.group.id'='c_mysql_binlog_es',
 'properties.bootstrap.servers'='localhost:9092',
 'scan.startup.mode'='latest-offset',
 'format'='canal-json',
 'canal-json.ignore-parse-errors'='true'
);


# 输出表至es
CREATE TABLE test_mirror_es (
`id` INT,
`name` VARCHAR(255),
`time` TIMESTAMP(3),
`status` INT,
PRIMARY KEY(id) NOT ENFORCED 
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://localhost:9200',
  'index' = 'test_mirror'
);

INSERT into test_mirror_es SELECT * from test where test.id >=0;

日志:Caused by: org.apache.flink.table.api.ValidationException: Unable to
create a source for reading table
'default_catalog.default_database.test_mirror_es'.

完整日志如下:


2020-08-12 13:07:20,815 INFO 
org.apache.flink.configuration.GlobalConfiguration           [] - Loading
configuration property: jobmanager.rpc.address, localhost
2020-08-12 13:07:20,820 INFO 
org.apache.flink.configuration.GlobalConfiguration           [] - Loading
configuration property: jobmanager.rpc.port, 6123
2020-08-12 13:07:20,820 INFO 
org.apache.flink.configuration.GlobalConfiguration           [] - Loading
configuration property: jobmanager.memory.process.size, 1600m
2020-08-12 13:07:20,820 INFO 
org.apache.flink.configuration.GlobalConfiguration           [] - Loading
configuration property: taskmanager.memory.process.size, 1728m
2020-08-12 13:07:20,820 INFO 
org.apache.flink.configuration.GlobalConfiguration           [] - Loading
configuration property: taskmanager.numberOfTaskSlots, 10
2020-08-12 13:07:20,820 INFO 
org.apache.flink.configuration.GlobalConfiguration           [] - Loading
configuration property: parallelism.default, 5
2020-08-12 13:07:20,821 INFO 
org.apache.flink.configuration.GlobalConfiguration           [] - Loading
configuration property: state.savepoints.dir,
hdfs://localhost:9000/flink-1.11.0/flink-savepoints
2020-08-12 13:07:20,821 INFO 
org.apache.flink.configuration.GlobalConfiguration           [] - Loading
configuration property: jobmanager.execution.failover-strategy, region
2020-08-12 13:07:21,198 INFO 
org.apache.flink.table.client.config.entries.ExecutionEntry  [] - Property
'execution.restart-strategy.type' not specified. Using default value:
fallback
2020-08-12 13:07:22,099 INFO 
org.apache.flink.table.client.gateway.local.ExecutionContext [] - Executor
config: {taskmanager.memory.process.size=1728m,
jobmanager.execution.failover-strategy=region,
jobmanager.rpc.address=localhost, execution.target=remote,
jobmanager.memory.process.size=1600m,
state.savepoints.dir=hdfs://localhost:9000/flink-1.11.0/flink-savepoints,
jobmanager.rpc.port=6123, execution.savepoint.ignore-unclaimed-state=false,
execution.attached=true, execution.shutdown-on-attached-exit=false,
pipeline.jars=[file:/data1/home/xxx/flink-demo/flink-1.11.0/opt/flink-sql-client_2.11-1.11.0.jar],
parallelism.default=5, taskmanager.numberOfTaskSlots=10,
pipeline.classpaths=[]}
2020-08-12 13:07:22,286 INFO  org.apache.flink.table.client.cli.CliClient       
          
[] - Command history file path: /root/.flink-sql-history
2020-08-12 13:07:46,637 INFO 
org.apache.flink.table.client.gateway.local.ProgramDeployer  [] - Submitting
job org.apache.flink.streaming.api.graph.StreamGraph@41a16eb3 for query
default: INSERT into test_mirror_es SELECT * from test where id >0`
2020-08-12 13:07:46,709 INFO  org.apache.flink.configuration.Configuration      
          
[] - Config uses fallback configuration key 'jobmanager.rpc.address' instead
of key 'rest.address'
2020-08-12 13:10:17,512 INFO 
org.apache.flink.table.client.gateway.local.ProgramDeployer  [] - Submitting
job org.apache.flink.streaming.api.graph.StreamGraph@3ff8a3ad for query
default: INSERT into test_mirror_es SELECT * from test where id >0`
2020-08-12 13:10:17,516 INFO  org.apache.flink.configuration.Configuration      
          
[] - Config uses fallback configuration key 'jobmanager.rpc.address' instead
of key 'rest.address'
2020-08-12 13:10:38,360 WARN  org.apache.flink.table.client.cli.CliClient       
          
[] - Could not execute SQL statement.
org.apache.flink.table.client.gateway.SqlExecutionException: Invalidate SQL
statement.
        at
org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:99)
~[flink-sql-client_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.client.cli.SqlCommandParser.parse(SqlCommandParser.java:90)
~[flink-sql-client_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.client.cli.CliClient.parseCommand(CliClient.java:257)
[flink-sql-client_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:211)
[flink-sql-client_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142)
[flink-sql-client_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.client.SqlClient.start(SqlClient.java:114)
[flink-sql-client_2.11-1.11.0.jar:1.11.0]
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
[flink-sql-client_2.11-1.11.0.jar:1.11.0]
Caused by: org.apache.flink.table.api.ValidationException: Unable to create
a source for reading table
'default_catalog.default_database.test_mirror_es'.

Table options are:

'connector'='elasticsearch-7'
'hosts'='http://localhost:9200'
'index'='test_mirror'
        at
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.client.gateway.local.LocalExecutor$1.lambda$parse$0(LocalExecutor.java:430)
~[flink-sql-client_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.client.gateway.local.LocalExecutor$1$$Lambda$460/590884726.get(Unknown
Source) ~[?:?]
        at
org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:255)
~[flink-sql-client_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.client.gateway.local.LocalExecutor$1.parse(LocalExecutor.java:430)
~[flink-sql-client_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:97)
~[flink-sql-client_2.11-1.11.0.jar:1.11.0]
        ... 6 more
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a
connector using option ''connector'='elasticsearch-7''.
        at
org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.client.gateway.local.LocalExecutor$1.lambda$parse$0(LocalExecutor.java:430)
~[flink-sql-client_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.client.gateway.local.LocalExecutor$1$$Lambda$460/590884726.get(Unknown
Source) ~[?:?]
        at
org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:255)
~[flink-sql-client_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.client.gateway.local.LocalExecutor$1.parse(LocalExecutor.java:430)
~[flink-sql-client_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:97)
~[flink-sql-client_2.11-1.11.0.jar:1.11.0]
        ... 6 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find
any factory for identifier 'elasticsearch-7' that implements
'org.apache.flink.table.factories.DynamicTableSourceFactory' in the
classpath.

Available factory identifiers are:

datagen
jdbc
kafka
        at
org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
~[flink-table_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
~[flink-table-blink_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.client.gateway.local.LocalExecutor$1.lambda$parse$0(LocalExecutor.java:430)
~[flink-sql-client_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.client.gateway.local.LocalExecutor$1$$Lambda$460/590884726.get(Unknown
Source) ~[?:?]
        at
org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:255)
~[flink-sql-client_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.client.gateway.local.LocalExecutor$1.parse(LocalExecutor.java:430)
~[flink-sql-client_2.11-1.11.0.jar:1.11.0]
        at
org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:97)
~[flink-sql-client_2.11-1.11.0.jar:1.11.0]
        ... 6 more




--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复