Hi:
I am using Flink 1.11.2 with Hive 1.1.0.
When I use Flink hive streaming, following the example to write data into Hive, I can see that files have been produced under the target directory, but querying the table from Hive returns no data.
It looks like the partition information is never committed to the Hive metastore, even though the official docs say this feature is implemented; I just cannot get it to work.
My code is below:
import java.time.Duration

import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, SqlDialect}
import org.apache.flink.table.catalog.hive.HiveCatalog

object StreamMain {
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    streamEnv.setParallelism(3)

    val tableEnvSettings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)
    tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
      CheckpointingMode.EXACTLY_ONCE)
    tableEnv.getConfig.getConfiguration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
      Duration.ofSeconds(20))

    val dataStream = streamEnv.addSource(new MySource)

    val catalogName = "my_catalog"
    val catalog = new HiveCatalog(
      catalogName,                                             // catalog name
      "yutest",                                                // default database
      "D:\\IdeaProjects\\dc\\dc_hive\\src\\main\\resources\\", // Hive config (hive-site.xml) directory
      "1.1.0"                                                  // Hive version
    )
    tableEnv.registerCatalog(catalogName, catalog)
    tableEnv.useCatalog(catalogName)
    tableEnv.getConfig.setSqlDialect(SqlDialect.HIVE)
    tableEnv.useDatabase("yutest")

    tableEnv.createTemporaryView("users", dataStream)

    tableEnv.executeSql("DROP TABLE IF EXISTS fs_table")
    // If the table already exists in Hive, this step can be skipped
    val hiveSql =
      """CREATE EXTERNAL TABLE fs_table (
        |  user_id STRING,
        |  order_amount DOUBLE
        |) PARTITIONED BY (
        |  dt STRING,
        |  h STRING,
        |  m STRING
        |) STORED AS parquet
        |TBLPROPERTIES (
        |  'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00',
        |  'sink.partition-commit.delay'='0s',
        |  'sink.partition-commit.trigger'='partition-time',
        |  'sink.partition-commit.policy.kind'='metastore,success-file'
        |)""".stripMargin
    tableEnv.executeSql(hiveSql)

    val insertSql =
      """INSERT INTO fs_table
        |SELECT userId, amount,
        |       DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm')
        |FROM users""".stripMargin
    tableEnv.executeSql(insertSql)
  }
}
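One thing I am unsure about: the table sets 'sink.partition-commit.trigger'='partition-time', which as I understand it commits partitions based on the watermark, yet the job never assigns timestamps or watermarks on dataStream. In case that is the problem, a watermark assignment would look roughly like this (the 5-second out-of-orderness bound is just an assumption for illustration):

import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}

// Sketch: derive event time from UserInfo.ts so the watermark can advance
// and the 'partition-time' commit trigger has something to fire on.
val dataStream = streamEnv
  .addSource(new MySource)
  .assignTimestampsAndWatermarks(
    WatermarkStrategy
      .forBoundedOutOfOrderness[UserInfo](Duration.ofSeconds(5))
      .withTimestampAssigner(new SerializableTimestampAssigner[UserInfo] {
        override def extractTimestamp(u: UserInfo, recordTs: Long): Long =
          u.getTs.getTime
      })
  )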
import java.sql.Timestamp;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class MySource implements SourceFunction<UserInfo> {

    private volatile boolean run = true;

    private final String[] userids = {
        "4760858d-2bec-483c-a535-291de04b2247",
        "67088699-d4f4-43f2-913c-481bff8a2dc5",
        "72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb",
        "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
        "aabbaa50-72f4-495c-b3a1-70383ee9d6a4",
        "3218bbb9-5874-4d37-a82d-3e35e52d1702",
        "3ebfb9602ac07779||3ebfe9612a007979",
        "aec20d52-c2eb-4436-b121-c29ad4097f6c",
        "e7e896cd939685d7||e7e8e6c1930689d7",
        "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"
    };

    @Override
    public void run(SourceFunction.SourceContext<UserInfo> sourceContext) throws Exception {
        while (run) {
            // Emit a record with a random user id roughly every 100 ms
            String userid = userids[(int) (Math.random() * userids.length)];
            UserInfo userInfo = new UserInfo();
            userInfo.setUserId(userid);
            userInfo.setAmount(Math.random() * 100);
            userInfo.setTs(new Timestamp(System.currentTimeMillis()));
            sourceContext.collect(userInfo);
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        run = false;
    }
}
import java.io.Serializable;
import java.sql.Timestamp;

public class UserInfo implements Serializable {

    private String userId;
    private Double amount;
    private Timestamp ts;

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public Double getAmount() {
        return amount;
    }

    public void setAmount(Double amount) {
        this.amount = amount;
    }

    public Timestamp getTs() {
        return ts;
    }

    public void setTs(Timestamp ts) {
        this.ts = ts;
    }
}
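Also, as far as I know the streaming file sink only finalizes files and runs the partition commit when a checkpoint completes, so checkpointing has to be active. I set it through the table config above; an equivalent way, as I understand it, would be directly on the environment:

// Alternative to the table-config route: enable exactly-once checkpointing
// every 20 seconds directly on the StreamExecutionEnvironment.
streamEnv.enableCheckpointing(20 * 1000L, CheckpointingMode.EXACTLY_ONCE)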
In the hive CLI, show partitions returns nothing:

hive (yutest)>
             >
             > show partitions fs_table;
OK
partition
Time taken: 20.214 seconds
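A check that might narrow this down (the partition values below are only an example): registering one partition by hand and re-querying. If the data shows up then, it would confirm that only the metastore commit step is failing, not the write itself:

hive (yutest)> ALTER TABLE fs_table ADD IF NOT EXISTS PARTITION (dt='2020-11-19', h='10', m='30');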
[email protected]