我们在开发一个Flink SQL 框架,在从kafka读取数据加工写入到Hive时一直不成功,sql脚本如下: CREATE TABLE hive_table_from_kafka ( collect_time STRING, content1 STRING, content2 STRING ) PARTITIONED BY ( dt STRING,hr STRING ) TBLPROPERTIES ( 'partition.time-extractor.timestamp-pattern'='$dt $hr', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.delay'='0S', 'sink.partition-commit.policy.kind'='metastore,success-file' );
然后代码中对于创建表的sql做如下的处理 private void callCreateTable(SqlCommandParser.SqlCommandCall cmdCall) { String ddl = cmdCall.operands[0]; if (ddl.contains("hive_table")) { tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); } else { tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); } try { tableEnv.executeSql(ddl); } catch (SqlParserException e) { throw new RuntimeException("SQL execute failed:\n" + ddl + "\n", e); } }在执行上面的SQL语句时,总是报没有设置connector:Caused by: org.apache.flink.table.api.ValidationException: Table options do not contain an option key 'connector' for discovering a connector yinghua...@163.com