Checkpointing is enabled:
val streamExecutionEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamExecutionEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
streamExecutionEnv.enableCheckpointing(5 * 1000, CheckpointingMode.EXACTLY_ONCE)
streamExecutionEnv.getCheckpointConfig.setCheckpointTimeout(10 * 1000)

The interval is 5 s and the timeout is 10 s. However, after waiting more than two minutes, a dozen or so files have been written to HDFS, but querying Hive still returns no data.
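
For reference, my current understanding of the commit timing (based on the explanation quoted below): with 'sink.partition-commit.trigger'='partition-time', a partition is committed to the metastore only after a checkpoint completes and the watermark has passed the partition's end time plus 'sink.partition-commit.delay'. Since my DDL sets the delay to '1 h', the partition should not become visible in Hive for at least an hour. A minimal sketch of the same table with the delay shortened just for testing (the table name hive_table_test and the '0 s' value are illustrative, not something I have verified):

// Sketch only: same DDL as in the quoted message below, but with
// 'sink.partition-commit.delay' shortened so the partition is committed
// as soon as the watermark passes the partition's end time after a checkpoint.
streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
streamTableEnv.executeSql(
  """
    |CREATE TABLE hive_table_test (
    |  user_id STRING,
    |  age INT
    |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
    |  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
    |  'sink.partition-commit.trigger'='partition-time',
    |  'sink.partition-commit.delay'='0 s',
    |  'sink.partition-commit.policy.kind'='metastore,success-file'
    |)
    |""".stripMargin)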

On 2020-07-13 16:52:16, "Jingsong Li" <jingsongl...@gmail.com> wrote:
>Did you enable checkpointing? What is the delay set to?
>
>The add-partition happens after the checkpoint completes + the delay time
>
>Best,
>Jingsong
>
>On Mon, Jul 13, 2020 at 4:50 PM Zhou Zach <wander...@163.com> wrote:
>
>> Hi,
>> Following your hint, I added the HiveCatalog and the data is now written to HDFS successfully. But why does querying the Hive table
>> directly through Hue show no data? Do I have to manually add the partition to the Hive table? I have currently set the option
>> 'sink.partition-commit.policy.kind'='metastore'
>>
>> At 2020-07-13 15:01:28, "Jingsong Li" <jingsongl...@gmail.com> wrote:
>> >Hi,
>> >
>> >Did you use a HiveCatalog? Hive tables and the Hive dialect must be combined with a HiveCatalog
>> >
>> >Otherwise you can only use the filesystem connector; if the filesystem connector also throws an error, please paste the error message
>> >
>> >Best,
>> >Jingsong
>> >
>> >On Mon, Jul 13, 2020 at 2:58 PM Zhou Zach <wander...@163.com> wrote:
>> >
>> >> What should the connector be set to for a Flink 1.11 sink into a Hive table? I tried setting
>> >>
>> >> WITH('connector'='filesystem','path'='...','format'='parquet','sink.partition-commit.delay'='1 h','sink.partition-commit.policy.kind'='success-file');
>> >> but that also fails with an error.
>> >> query:
>> >> streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
>> >> streamTableEnv.executeSql(
>> >> """
>> >>     |
>> >>     |
>> >>     |CREATE TABLE hive_table (
>> >>     |  user_id STRING,
>> >>     |  age INT
>> >>     |) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet
>> >> TBLPROPERTIES (
>> >>     |  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
>> >>     |  'sink.partition-commit.trigger'='partition-time',
>> >>     |  'sink.partition-commit.delay'='1 h',
>> >>     |  'sink.partition-commit.policy.kind'='metastore,success-file'
>> >>     |)
>> >>     |
>> >>     |""".stripMargin)
>> >>
>> >> streamTableEnv.getConfig.setSqlDialect(SqlDialect.DEFAULT)
>> >> streamTableEnv.executeSql(
>> >> """
>> >>     |
>> >>     |CREATE TABLE kafka_table (
>> >>     |    uid VARCHAR,
>> >>     |    -- uid BIGINT,
>> >>     |    sex VARCHAR,
>> >>     |    age INT,
>> >>     |    created_time TIMESTAMP(3),
>> >>     |    WATERMARK FOR created_time as created_time - INTERVAL '3' SECOND
>> >>     |) WITH (
>> >>     |    'connector.type' = 'kafka',
>> >>     |    'connector.version' = 'universal',
>> >>     |     'connector.topic' = 'user',
>> >>     |    -- 'connector.topic' = 'user_long',
>> >>     |    'connector.startup-mode' = 'latest-offset',
>> >>     |    'connector.properties.zookeeper.connect' =
>> >> 'cdh1:2181,cdh2:2181,cdh3:2181',
>> >>     |    'connector.properties.bootstrap.servers' =
>> >> 'cdh1:9092,cdh2:9092,cdh3:9092',
>> >>     |    'connector.properties.group.id' = 'user_flink',
>> >>     |    'format.type' = 'json',
>> >>     |    'format.derive-schema' = 'true'
>> >>     |)
>> >>     |""".stripMargin)
>> >>
>> >>
>> >>
>> >> streamTableEnv.executeSql(
>> >> """
>> >>     |
>> >>     |INSERT INTO hive_table
>> >>     |SELECT uid, age, DATE_FORMAT(created_time, 'yyyy-MM-dd'),
>> >> DATE_FORMAT(created_time, 'HH')
>> >>     |FROM kafka_table
>> >>     |
>> >>     |""".stripMargin)
>> >>
>> >> streamTableEnv.executeSql(
>> >> """
>> >>     |
>> >>     |SELECT * FROM hive_table WHERE dt='2020-07-13' and hr='13'
>> >>     |
>> >>     |""".stripMargin)
>> >> .print()
>> >> Error stack:
>> >> Exception in thread "main"
>> org.apache.flink.table.api.ValidationException:
>> >> Unable to create a sink for writing table
>> >> 'default_catalog.default_database.hive_table'.
>> >>
>> >> Table options are:
>> >>
>> >> 'hive.storage.file-format'='parquet'
>> >> 'is_generic'='false'
>> >> 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00'
>> >> 'sink.partition-commit.delay'='1 h'
>> >> 'sink.partition-commit.policy.kind'='metastore,success-file'
>> >> 'sink.partition-commit.trigger'='partition-time'
>> >>         at
>> >>
>> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
>> >>         at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
>> >>         at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
>> >>         at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>> >>         at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>> >>         at
>> >>
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> >>         at
>> >>
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> >>         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>> >>         at
>> scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>> >>         at
>> >> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> >>         at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> >>         at
>> >> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>> >>         at
>> scala.collection.AbstractTraversable.map(Traversable.scala:104)
>> >>         at
>> >>
>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
>> >>         at
>> >>
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
>> >>         at
>> >>
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:694)
>> >>         at
>> >>
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781)
>> >>         at
>> >>
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
>> >>         at
>> >> org.rabbit.sql.FromKafkaSinkHive$.main(FromKafkaSinkHive.scala:65)
>> >>         at
>> org.rabbit.sql.FromKafkaSinkHive.main(FromKafkaSinkHive.scala)
>> >> Caused by: org.apache.flink.table.api.ValidationException: Table options
>> >> do not contain an option key 'connector' for discovering a connector.
>> >>         at
>> >>
>> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:321)
>> >>         at
>> >>
>> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
>> >>         ... 19 more
>> >>
>> >>
>> >
>> >--
>> >Best, Jingsong Lee
>>
>
>
>-- 
>Best, Jingsong Lee
