We are building a Flink SQL framework. Reading data from Kafka, transforming it, and writing it into Hive keeps failing. The SQL script is as follows:
CREATE TABLE hive_table_from_kafka (
   collect_time STRING,
   content1 STRING, 
   content2 STRING
) PARTITIONED BY (
  dt STRING, hr STRING
) TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='0S',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);
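
For context, Flink's Hive dialect DDL is only accepted against a HiveCatalog; a table created in the default in-memory catalog is treated as a generic table and must carry a 'connector' option. Below is a minimal registration sketch, assuming the flink-connector-hive dependency is on the classpath; the catalog name, database, and hive-site.xml directory are placeholders, not taken from the post:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveCatalogSetup {
  public static void main(String[] args) {
    TableEnvironment tableEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inStreamingMode().build());

    // Placeholder names and paths, for illustration only.
    HiveCatalog hiveCatalog = new HiveCatalog(
        "myhive",           // catalog name
        "default",          // default database in the metastore
        "/opt/hive/conf");  // directory containing hive-site.xml

    // Make the HiveCatalog current before running Hive-dialect DDL.
    tableEnv.registerCatalog("myhive", hiveCatalog);
    tableEnv.useCatalog("myhive");
  }
}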

The code then handles the CREATE TABLE SQL as follows:
private void callCreateTable(SqlCommandParser.SqlCommandCall cmdCall) {
  String ddl = cmdCall.operands[0];
  // DDL that mentions "hive_table" is executed under the Hive dialect;
  // everything else uses the default (Flink) dialect.
  if (ddl.contains("hive_table")) {
    tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
  } else {
    tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
  }
  try {
    tableEnv.executeSql(ddl);
  } catch (SqlParserException e) {
    throw new RuntimeException("SQL execute failed:\n" + ddl + "\n", e);
  }
}
}

When the SQL above is executed, it always fails complaining that no connector has been set:

Caused by: org.apache.flink.table.api.ValidationException: Table options do not contain an option key 'connector' for discovering a connector
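
For completeness, the post does not show the Kafka side of the pipeline; with the default dialect the source table would normally be declared with an explicit 'connector' option. A sketch reusing the tableEnv from the snippet above; the table name, topic, broker address, and format are assumptions:

// Assumed Kafka source declaration; topic, brokers, and format are illustrative.
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tableEnv.executeSql(
    "CREATE TABLE kafka_source (\n"
    + "  collect_time STRING,\n"
    + "  content1 STRING,\n"
    + "  content2 STRING\n"
    + ") WITH (\n"
    + "  'connector' = 'kafka',\n"
    + "  'topic' = 'input_topic',\n"
    + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
    + "  'scan.startup.mode' = 'earliest-offset',\n"
    + "  'format' = 'json'\n"
    + ")");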



yinghua...@163.com
