flink 1.11 sink hive table的connector设置为什么啊,尝试设置
WITH('connector'='filesystem','path'='...','format'='parquet','sink.partition-commit.delay'='1
 h','sink.partition-commit.policy.kind'='success-file');
也报错误
query:
streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
streamTableEnv.executeSql(
"""
    |
    |
    |CREATE TABLE hive_table (
    |  user_id STRING,
    |  age INT
    |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
    |  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
    |  'sink.partition-commit.trigger'='partition-time',
    |  'sink.partition-commit.delay'='1 h',
    |  'sink.partition-commit.policy.kind'='metastore,success-file'
    |)
    |
    |""".stripMargin)

streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
streamTableEnv.executeSql(
"""
    |
    |CREATE TABLE kafka_table (
    |    uid VARCHAR,
    |    -- uid BIGINT,
    |    sex VARCHAR,
    |    age INT,
    |    created_time TIMESTAMP(3),
    |    WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND
    |) WITH (
    |    'connector.type' = 'kafka',
    |    'connector.version' = 'universal',
    |     'connector.topic' = 'user',
    |    -- 'connector.topic' = 'user_long',
    |    'connector.startup-mode' = 'latest-offset',
    |    'connector.properties.zookeeper.connect' = 
'cdh1:2181,cdh2:2181,cdh3:2181',
    |    'connector.properties.bootstrap.servers' = 
'cdh1:9092,cdh2:9092,cdh3:9092',
    |    'connector.properties.group.id' = 'user_flink',
    |    'format.type' = 'json',
    |    'format.derive-schema' = 'true'
    |)
    |""".stripMargin)



streamTableEnv.executeSql(
"""
    |
    |INSERT INTO hive_table
    |SELECT uid, age, DATE_FORMAT(created_time, 'yyyy-MM-dd'), 
DATE_FORMAT(created_time, 'HH')
    |FROM kafka_table
    |
    |""".stripMargin)

streamTableEnv.executeSql(
"""
    |
    |SELECT * FROM hive_table WHERE dt='2020-07-13' and hr='13'
    |
    |""".stripMargin)
.print()
错误栈:
Exception in thread "main" org.apache.flink.table.api.ValidationException: 
Unable to create a sink for writing table 
'default_catalog.default_database.hive_table'.

Table options are:

'hive.storage.file-format'='parquet'
'is_generic'='false'
'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
'sink.partition-commit.delay'='1 h'
'sink.partition-commit.policy.kind'='metastore,success-file'
'sink.partition-commit.trigger'='partition-time'
        at 
org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
        at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
        at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
        at org.rabbit.sql.FromKafkaSinkHive$.main(FromKafkaSinkHive.scala:65)
        at org.rabbit.sql.FromKafkaSinkHive.main(FromKafkaSinkHive.scala)
Caused by: org.apache.flink.table.api.ValidationException: Table options do not 
contain an option key 'connector' for discovering a connector.
        at 
org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:321)
        at 
org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
        ... 19 more

回复