想请教一下大家：Hive streaming write 需要哪些配置？我的作业能够正常跑起来，但是一直没有数据写入 Hive，不知道是什么原因。
批量模式下写入 Hive 是正常的，流模式下读取 Hive 也是正常的。
附上代码（很简短）：
public class KafkaToHiveStreaming {
public static void main(String[] arg) throws Exception{
StreamExecutionEnvironment bsEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv =
StreamTableEnvironment.create(bsEnv, bsSettings);
String name = "myhive";
String defaultDatabase = "default";
String hiveConfDir =
"/Users/uzi/Downloads/Hadoop/apache-hive-3.1.2-bin/conf/"; // a local
path
String version = "3.1.2";
HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
hiveConfDir, version);
bsTableEnv.registerCatalog("myhive", hive);
bsTableEnv.useCatalog("myhive");
bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
bsTableEnv.executeSql("CREATE TABLE topic_products (" +
" id BIGINT ," +
" order_id STRING," +
" amount DECIMAL(10, 2)," +
" create_time TIMESTAMP " +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'order.test'," +
" 'properties.bootstrap.servers' = 'localhost:9092'," +
" 'properties.group.id' = 'testGroup'," +
" 'scan.startup.mode' = 'earliest-offset', " +
" 'format' = 'json' " +
")");
bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
bsTableEnv.executeSql("CREATE TABLE hive_sink_table_streaming (" +
" id BIGINT ," +
" order_id STRING," +
" amount DECIMAL(10, 2)" +
" )");
bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
bsTableEnv.executeSql("CREATE TABLE print_table WITH
('connector' = 'print')" +
"LIKE INSERT INTO hive_sink_table_streaming (EXCLUDING ALL)");
bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
bsTableEnv.executeSql("INSERT INTO hive_sink_table_streaming SELECT " +
"id, " +
"order_id, " +
"amount " +
"FROM topic_products");
Table table1 = bsTableEnv.from("hive_sink_table_streaming");
table1.executeInsert("print_table");
}
}