我把 sink.partition-commit.trigger 设置成 process-time 之后可以查到数据;
但是后来我让 source 产出了 watermark,还是不行,代码如下:
val dataStream = streamEnv.addSource(new MySource)
.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps[UserInfo]()
.withTimestampAssigner(new SerializableTimestampAssigner[UserInfo] {
override def extractTimestamp(element: UserInfo, recordTimestamp:
Long): Long = element.getTs.getTime
}))
生成的 UserInfo 的 ts 字段是时间戳,所以 watermark 是根据它提取出的时间生成的。
[email protected]
发件人: Jingsong Li
发送时间: 2020-10-28 16:29
收件人: user-zh
主题: Re: flink hive Streaming查询不到数据的问题
Hi,
你的Source看起来并没有产出watermark,所以:
你可以考虑让 Source 产出正确的 watermark,或者使用 'sink.partition-commit.trigger' 的默认值 process-time。
Best,
Jingsong
On Wed, Oct 28, 2020 at 4:13 PM [email protected] <
[email protected]> wrote:
> 你好:
> 我现在在使用flink 1.11.2版本 hive1.1.0 版本;
> 当我在使用 flink hive streaming 的时候,发现按照示例写数据到 hive,
> 可以看到指定目录下已经生成了文件,但是使用 hive 查询没有数据;
> 好像是分区信息没有提交到hive meta store;但是官网已经说实现了这个功能;我操作却不行
> 下面是我的代码
> object StreamMain {
> def main(args: Array[String]): Unit = {
> val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
> streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> streamEnv.setParallelism(3)
>
> val tableEnvSettings = EnvironmentSettings.newInstance()
> .useBlinkPlanner()
> .inStreamingMode()
> .build()
>
> val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)
>
>
> tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
> CheckpointingMode.EXACTLY_ONCE)
>
>
> tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
> Duration.ofSeconds(20))
>
> val dataStream = streamEnv.addSource(new MySource)
>
> val catalogName = "my_catalog"
> val catalog = new HiveCatalog(
> catalogName, // catalog name
> "yutest", // default database
>
> "D:\\IdeaProjects\\dc\\dc_hive\\src\\main\\resources\\", // Hive
> config (hive-site.xml) directory
> "1.1.0" // Hive version
> )
> tableEnv.registerCatalog(catalogName, catalog)
> tableEnv.useCatalog(catalogName)
>
> tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
> tableEnv.useDatabase("yutest")
>
>
> tableEnv.createTemporaryView("users", dataStream)
> tableEnv.executeSql("DROP TABLE IF EXISTS fs_table ")
> // 如果hive中已经存在了相应的表,则这段代码省略
> val hiveSql = """CREATE external TABLE fs_table (
> user_id STRING,
> order_amount DOUBLE
> )
> partitioned by(
> dt string,
> h string,
> m string) stored as parquet
> TBLPROPERTIES (
>
> 'partition.time-extractor.timestamp-pattern'='$dt
> $h:$m:00',
> 'sink.partition-commit.delay'='0s',
> 'sink.partition-commit.trigger'='partition-time',
>
>
> 'sink.partition-commit.policy.kind'='metastore,success-file'
> )""".stripMargin
> tableEnv.executeSql(hiveSql)
>
>
> val insertSql = "insert into fs_table SELECT userId, amount, " + "
> DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm')
> FROM users"
> tableEnv.executeSql(insertSql)
> }
> }
> public class MySource implements SourceFunction<UserInfo> {
> private volatile boolean run = true;
> String userids[] = {
>
> "4760858d-2bec-483c-a535-291de04b2247",
> "67088699-d4f4-43f2-913c-481bff8a2dc5",
>
> "72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb",
> "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
>
> "aabbaa50-72f4-495c-b3a1-70383ee9d6a4",
> "3218bbb9-5874-4d37-a82d-3e35e52d1702",
>
> "3ebfb9602ac07779||3ebfe9612a007979",
> "aec20d52-c2eb-4436-b121-c29ad4097f6c",
>
> "e7e896cd939685d7||e7e8e6c1930689d7",
> "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"
> };
>
> @Override
>
> public void run(SourceFunction.SourceContext<UserInfo> sourceContext)
> throws Exception {
>
> while (run) {
>
> String userid = userids[(int) (Math.random() * (userids.length -
> 1))];
> UserInfo userInfo = new UserInfo();
> userInfo.setUserId(userid);
> userInfo.setAmount(Math.random() * 100);
> userInfo.setTs(new Timestamp(System.currentTimeMillis()));
> sourceContext.collect(userInfo);
> Thread.sleep(100);
> }
> }
>
> @Override
> public void cancel() {
> run = false;
> }
> }
> public class UserInfo implements Serializable {
> private String userId;
> private Double amount;
> private Timestamp ts;
>
> public String getUserId() {
> return userId;
> }
>
> public void setUserId(String userId) {
> this.userId = userId;
> }
>
> public Double getAmount() {
> return amount;
> }
>
> public void setAmount(Double amount) {
> this.amount = amount;
> }
>
> public Timestamp getTs() {
> return ts;
> }
>
> public void setTs(Timestamp ts) {
> this.ts = ts;
> }
> }
>
> hive (yutest)>
> >
> > show partitions fs_table;
> OK
> partition
> Time taken: 20.214 seconds
>
> ------------------------------
> [email protected]
>
--
Best, Jingsong Lee