Hi, 你用了HiveCatalog了吗?Hive表或Hive方言必须要结合HiveCatalog
不然就只能用Filesystem connector,如果你使用filesystem也报错,那就贴下报错信息 Best, Jingsong On Mon, Jul 13, 2020 at 2:58 PM Zhou Zach <[email protected]> wrote: > flink 1.11 sink hive table的connector设置为什么啊,尝试设置 > WITH('connector'='filesystem','path'='...','format'='parquet','sink.partition-commit.delay'='1 > h','sink.partition-commit.policy.kind'='success-file'); > 也报错误 > query: > streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE) > streamTableEnv.executeSql( > """ > | > | > |CREATE TABLE hive_table ( > | user_id STRING, > | age INT > |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet > TBLPROPERTIES ( > | 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', > | 'sink.partition-commit.trigger'='partition-time', > | 'sink.partition-commit.delay'='1 h', > | 'sink.partition-commit.policy.kind'='metastore,success-file' > |) > | > |""".stripMargin) > > streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT) > streamTableEnv.executeSql( > """ > | > |CREATE TABLE kafka_table ( > | uid VARCHAR, > | -- uid BIGINT, > | sex VARCHAR, > | age INT, > | created_time TIMESTAMP(3), > | WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND > |) WITH ( > | 'connector.type' = 'kafka', > | 'connector.version' = 'universal', > | 'connector.topic' = 'user', > | -- 'connector.topic' = 'user_long', > | 'connector.startup-mode' = 'latest-offset', > | 'connector.properties.zookeeper.connect' = > 'cdh1:2181,cdh2:2181,cdh3:2181', > | 'connector.properties.bootstrap.servers' = > 'cdh1:9092,cdh2:9092,cdh3:9092', > | 'connector.properties.group.id' = 'user_flink', > | 'format.type' = 'json', > | 'format.derive-schema' = 'true' > |) > |""".stripMargin) > > > > streamTableEnv.executeSql( > """ > | > |INSERT INTO hive_table > |SELECT uid, age, DATE_FORMAT(created_time, 'yyyy-MM-dd'), > DATE_FORMAT(created_time, 'HH') > |FROM kafka_table > | > |""".stripMargin) > > streamTableEnv.executeSql( > """ > | > |SELECT * FROM hive_table WHERE dt='2020-07-13' and hr='13' > | > |""".stripMargin) > .print() > 错误栈: > Exception in thread "main" org.apache.flink.table.api.ValidationException: > Unable to create a sink for writing table > 'default_catalog.default_database.hive_table'. > > Table options are: > > 'hive.storage.file-format'='parquet' > 'is_generic'='false' > 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00' > 'sink.partition-commit.delay'='1 h' > 'sink.partition-commit.policy.kind'='metastore,success-file' > 'sink.partition-commit.trigger'='partition-time' > at > org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164) > at > org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344) > at > org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) > at > org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) > at scala.collection.Iterator$class.foreach(Iterator.scala:891) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) > at > scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684) > at > org.rabbit.sql.FromKafkaSinkHive$.main(FromKafkaSinkHive.scala:65) > at org.rabbit.sql.FromKafkaSinkHive.main(FromKafkaSinkHive.scala) > Caused by: org.apache.flink.table.api.ValidationException: Table options > do not contain an option key 'connector' for discovering a connector. > at > org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:321) > at > org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157) > ... 19 more > > -- Best, Jingsong Lee
