各位老师好,我最近在做Flink查询Hive的功能,需要用到窗口处理数据,在编写代码过程中无法设置水印,我看官网看到Table API & SQL 
设置事件时间有三种方式:
1、在 DDL 中定义
2、在 DataStream 到 Table 转换时定义
3、使用 TableSource 定义
而我使用的是HiveCatalog查询hive,貌似用不上以上三种方式。所以我想问问各位老师,有没有一种办法可以直接在Table上设置某个字段为事件事件,并且设置水印?
另外说明,我的第一版代码是将Table转换为DataSteam,然后再设置水印和窗口,但是执行转换过程非常耗时,并且在源码中 
toDataSteam()方法的注释上也说“表生态系统的类型系统比DataStream API的类型系统更丰富”,因此开始考虑使用Table或SQL解决问题。
以下是我的第一版代码
// flink 集成 hive
System.out.println("初始化Flink环境");
String hiveVersion = "3.1.2";
String catalogName = "myhive";
String defaultDatabase = "dwd_1580_egd_finishing_mill_lv1_202302";
String hiveConfDir = "/usr/hdp/3.1.4.0-315/apache-hive-3.1.2-bin/conf";
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
        .inStreamingMode()
        .build();
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

System.out.println("定义hive环境");
// 定义 hive catalog 参数:catalog名称、数据库名称、对象名称
HiveCatalog hive = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir, 
hiveVersion);
tableEnv.registerCatalog(catalogName, hive);

// 将 HiveCatalog 设置为 session 的当前 catalog
tableEnv.useCatalog(catalogName);
tableEnv.useDatabase(defaultDatabase);
// 设置 hive 并行度
Configuration configuration = tableEnv.getConfig().getConfiguration();
configuration.setInteger("table.exec.hive.infer-source-parallelism.max", 
sourceParallelism); // Default 1000

// 使用 HiveTableSource
System.out.println("定义查询条件");
// 定义查询条件
Table table = tableEnv
.from(catalogName + "." + databaseName + "." + tableName)
        .select(DATETIME + "," + fields + "," + YEAR + "," + MONTH + "," + DAY 
+ "," + HOUR)
        .filter($(YEAR).isEqual(year))
        .filter($(MONTH).isEqual(startMonth))
        .filter($(DAY).isGreaterOrEqual(startDay))
        .filter($(HOUR).isGreaterOrEqual(startHour))
        .filter($(DAY).isLessOrEqual(endDay))
        .filter($(HOUR).isLessOrEqual(endHour));
tableEnv.createTemporaryView("myTable", table);

// Table 转 Stream,非常耗时
System.out.println("Table to Stream");
DataStream<Row> resultStream = tableEnv.toDataStream(table);
// 水印及窗口设置
System.out.println("水印及窗口");
resultStream
.assignTimestampsAndWatermarks(WatermarkStrategy.<Row>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                .withTimestampAssigner((SerializableTimestampAssigner<Row>) 
(element, recordTimestamp) -> {
long datetime = 0;
try {
                        datetime = new SimpleDateFormat(DATEFORMAT)
                                .parse(element.getFieldAs(DATETIME).toString())
                                .getTime();
                    } catch (ParseException e) {
                        e.printStackTrace();
                    }
return datetime;
                }))
        .windowAll(TumblingEventTimeWindows.of(Time.seconds(windowTime)))
| |
ZhaoShuKang
|
|
chuckzha...@163.com
|

回复