Could you paste the complete program again?

Best,
Jingsong
On Mon, Jul 13, 2020 at 5:46 PM Zhou Zach <[email protected]> wrote:

> Hi,
>
> I have now changed it to:
>
>     'sink.partition-commit.delay'='0s'
>
> Checkpointing has completed more than 20 times and more than 20 files have
> been produced on HDFS, but the Hive table still returns no data.

At 2020-07-13 17:23:34, "夏帅" <[email protected]> wrote:

> Hi,
> You set SINK_PARTITION_COMMIT_DELAY to one hour.

------------------------------------------------------------------
From: Zhou Zach <[email protected]>
Sent: Monday, July 13, 2020 17:09
To: user-zh <[email protected]>
Subject: Re: Re: Re: Table options do not contain an option key 'connector' for discovering a connector.

> Checkpointing is enabled:
>
>     val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
>     streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>     streamExecutionEnv.enableCheckpointing(5 * 1000, CheckpointingMode.EXACTLY_ONCE)
>     streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(10 * 1000)
>
> The interval is 5 s and the timeout is 10 s. But after waiting more than
> two minutes, with a dozen or so files written to HDFS, querying Hive still
> returned no data.

At 2020-07-13 16:52:16, "Jingsong Li" <[email protected]> wrote:

> Did you enable checkpointing? What is the delay set to?
> Partitions are added after checkpoint completion + the configured delay.

On Mon, Jul 13, 2020 at 4:50 PM Zhou Zach <[email protected]> wrote:

> Hi,
> Following your hint, I added a HiveCatalog and data is now written to HDFS
> successfully. But why does querying the Hive table directly through Hue
> return no data? Do I have to add partitions to the Hive table manually?
> I currently set the option
>
>     'sink.partition-commit.policy.kind'='metastore'

At 2020-07-13 15:01:28, "Jingsong Li" <[email protected]> wrote:

> Hi,
>
> Did you use a HiveCatalog? Hive tables and the Hive dialect must be used
> together with a HiveCatalog.
>
> Otherwise you can only use the filesystem connector. If the filesystem
> connector also reports an error, please paste the error message.

On Mon, Jul 13, 2020 at 2:58 PM Zhou Zach <[email protected]> wrote:

> What should the connector be set to for a Flink 1.11 sink into a Hive
> table? I tried
>
>     WITH (
>       'connector'='filesystem',
>       'path'='...',
>       'format'='parquet',
>       'sink.partition-commit.delay'='1 h',
>       'sink.partition-commit.policy.kind'='success-file'
>     )
>
> and that also reports an error.
>
> Query:
>
>     streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
>     streamTableEnv.executeSql(
>       """
>         |CREATE TABLE hive_table (
>         |  user_id STRING,
>         |  age INT
>         |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
>         |  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
>         |  'sink.partition-commit.trigger'='partition-time',
>         |  'sink.partition-commit.delay'='1 h',
>         |  'sink.partition-commit.policy.kind'='metastore,success-file'
>         |)
>         |""".stripMargin)
>
>     streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
>     streamTableEnv.executeSql(
>       """
>         |CREATE TABLE kafka_table (
>         |  uid VARCHAR,
>         |  -- uid BIGINT,
>         |  sex VARCHAR,
>         |  age INT,
>         |  created_time TIMESTAMP(3),
>         |  WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND
>         |) WITH (
>         |  'connector.type' = 'kafka',
>         |  'connector.version' = 'universal',
>         |  'connector.topic' = 'user',
>         |  -- 'connector.topic' = 'user_long',
>         |  'connector.startup-mode' = 'latest-offset',
>         |  'connector.properties.zookeeper.connect' = 'cdh1:2181,cdh2:2181,cdh3:2181',
>         |  'connector.properties.bootstrap.servers' = 'cdh1:9092,cdh2:9092,cdh3:9092',
>         |  'connector.properties.group.id' = 'user_flink',
>         |  'format.type' = 'json',
>         |  'format.derive-schema' = 'true'
>         |)
>         |""".stripMargin)
>
>     streamTableEnv.executeSql(
>       """
>         |INSERT INTO hive_table
>         |SELECT uid, age, DATE_FORMAT(created_time, 'yyyy-MM-dd'),
>         |       DATE_FORMAT(created_time, 'HH')
>         |FROM kafka_table
>         |""".stripMargin)
>
>     streamTableEnv.executeSql(
>       """
>         |SELECT * FROM hive_table WHERE dt='2020-07-13' and hr='13'
>         |""".stripMargin)
>       .print()
>
> Error stack:
>
>     Exception in thread "main" org.apache.flink.table.api.ValidationException:
>     Unable to create a sink for writing table
>     'default_catalog.default_database.hive_table'.
>
>     Table options are:
>
>     'hive.storage.file-format'='parquet'
>     'is_generic'='false'
>     'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
>     'sink.partition-commit.delay'='1 h'
>     'sink.partition-commit.policy.kind'='metastore,success-file'
>     'sink.partition-commit.trigger'='partition-time'
>
>       at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
>       at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
>       at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
>       at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>       at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>       at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>       at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
>       at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
>       at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694)
>       at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781)
>       at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
>       at org.rabbit.sql.FromKafkaSinkHive$.main(FromKafkaSinkHive.scala:65)
>       at org.rabbit.sql.FromKafkaSinkHive.main(FromKafkaSinkHive.scala)
>     Caused by: org.apache.flink.table.api.ValidationException: Table options
>     do not contain an option key 'connector' for discovering a connector.
>       at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:321)
>       at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
>       ... 19 more
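
The fix suggested upthread, registering a HiveCatalog so the Hive table is resolved by the Hive factory instead of requiring a 'connector' option, can be sketched roughly as follows. This is a minimal sketch, assuming Flink 1.11's CREATE CATALOG DDL; the catalog name and hive-conf-dir are placeholder values:

```sql
-- Hypothetical example: 'myhive' and the conf dir are placeholders.
CREATE CATALOG myhive WITH (
  'type' = 'hive',
  'hive-conf-dir' = '/etc/hive/conf'
);

USE CATALOG myhive;
-- CREATE TABLE hive_table ... executed now (with the Hive dialect) is stored
-- in the Hive Metastore rather than the in-memory default catalog, so the
-- sink no longer needs a 'connector' option.
```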

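The commit timing discussed upthread (a partition is added after checkpoint completion plus the delay; with the 'partition-time' trigger, once the watermark passes the partition time plus the delay) can be sketched as a tiny standalone check. The object and method names here are illustrative, not Flink API:

```scala
// Minimal illustration of the 'partition-time' commit trigger semantics:
// a partition becomes committable once the watermark passes
// partition time + sink.partition-commit.delay.
object PartitionCommitDemo {
  // All times are milliseconds since the epoch; names are hypothetical.
  def isCommittable(watermark: Long, partitionTime: Long, delay: Long): Boolean =
    watermark >= partitionTime + delay

  def main(args: Array[String]): Unit = {
    val oneHour = 60L * 60 * 1000
    // 2020-07-13 13:00:00 GMT+8, i.e. the dt='2020-07-13', hr='13' partition.
    val partition13 = 1594616400000L

    // With delay = '1 h', a watermark at 13:10 is not enough to commit:
    println(isCommittable(partition13 + 10 * 60 * 1000, partition13, oneHour)) // false
    // With delay = '0s', the partition commits as soon as the watermark passes it:
    println(isCommittable(partition13 + 1000, partition13, 0L)) // true
  }
}
```

This mirrors the thread: with the original '1 h' delay nothing is visible for an hour after each partition closes, while with '0s' commits should follow the next completed checkpoint.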