你在hive的catalog中定义表的时候就可以定义好event time,以及watermark呀。
ZhaoShuKang <chuckzha...@163.com> 于2023年5月25日周四 08:53写道: > > 各位老师好,我最近在做Flink查询Hive的功能,需要用到窗口处理数据,在编写代码过程中无法设置水印,我看官网看到Table API & SQL > 设置事件时间有三种方式: > 1、在 DDL 中定义 > 2、在 DataStream 到 Table 转换时定义 > 3、使用 TableSource 定义 > 而我使用的是HiveCatalog查询hive,貌似用不上以上三种方式。所以我想问问各位老师,有没有一种办法可以直接在Table上设置某个字段为事件事件,并且设置水印? > 另外说明,我的第一版代码是将Table转换为DataSteam,然后再设置水印和窗口,但是执行转换过程非常耗时,并且在源码中 > toDataSteam()方法的注释上也说“表生态系统的类型系统比DataStream > API的类型系统更丰富”,因此开始考虑使用Table或SQL解决问题。 > 以下是我的第一版代码 > // flink 集成 hive > System.out.println("初始化Flink环境"); > String hiveVersion = "3.1.2"; > String catalogName = "myhive"; > String defaultDatabase = "dwd_1580_egd_finishing_mill_lv1_202302"; > String hiveConfDir = "/usr/hdp/3.1.4.0-315/apache-hive-3.1.2-bin/conf"; > EnvironmentSettings settings = EnvironmentSettings > .newInstance() > .inStreamingMode() > .build(); > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setRuntimeMode(RuntimeExecutionMode.STREAMING); > StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, > settings); > > System.out.println("定义hive环境"); > // 定义 hive catalog 参数:catalog名称、数据库名称、对象名称 > HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir, > hiveVersion); > tableEnv.registerCatalog(catalogName, hive); > > // 将 HiveCatalog 设置为 session 的当前 catalog > tableEnv.useCatalog(catalogName); > tableEnv.useDatabase(defaultDatabase); > // 设置 hive 并行度 > Configuration configuration = tableEnv.getConfig().getConfiguration(); > configuration.setInteger("table.exec.hive.infer-source-parallelism.max", > sourceParallelism); // Default 1000 > > // 使用 HiveTableSource > System.out.println("定义查询条件"); > // 定义查询条件 > Table table = tableEnv > .from(catalogName + "." + databaseName + "." + tableName) > .select(DATETIME + "," + fields + "," + YEAR + "," + MONTH + "," + > DAY + "," + HOUR) > .filter($(YEAR).isEqual(year)) > .filter($(MONTH).isEqual(startMonth)) > .filter($(DAY).isGreaterOrEqual(startDay)) > .filter($(HOUR).isGreaterOrEqual(startHour)) > .filter($(DAY).isLessOrEqual(endDay)) > .filter($(HOUR).isLessOrEqual(endHour)); > tableEnv.createTemporaryView("myTable", table); > > // Table 转 Stream,非常耗时 > System.out.println("Table to Stream"); > DataStream<Row> resultStream = tableEnv.toDataStream(table); > // 水印及窗口设置 > System.out.println("水印及窗口"); > resultStream > .assignTimestampsAndWatermarks(WatermarkStrategy.<Row>forBoundedOutOfOrderness(Duration.ofSeconds(3)) > .withTimestampAssigner((SerializableTimestampAssigner<Row>) > (element, recordTimestamp) -> { > long datetime = 0; > try { > datetime = new SimpleDateFormat(DATEFORMAT) > > .parse(element.getFieldAs(DATETIME).toString()) > .getTime(); > } catch (ParseException e) { > e.printStackTrace(); > } > return datetime; > })) > .windowAll(TumblingEventTimeWindows.of(Time.seconds(windowTime))) > | | > ZhaoShuKang > | > | > chuckzha...@163.com > |