好的,谢谢~~~ JasonLee <[email protected]> 于2020年7月16日周四 下午8:22写道:
> hi > 需要开启checkpoint > > > | | > JasonLee > | > | > 邮箱:[email protected] > | > > Signature is customized by Netease Mail Master > > 在2020年07月16日 18:03,李佳宸 写道: > 想请教下大家 hive streaming write需要有哪些配置,不知道为什么我的作业能够跑起来,但是没有数据写入hive。 > 批量的hive写入,流环境的读取是正常的。 > > 附代码,很简短: > > public class KafkaToHiveStreaming { > public static void main(String[] arg) throws Exception{ > StreamExecutionEnvironment bsEnv = > StreamExecutionEnvironment.getExecutionEnvironment(); > EnvironmentSettings bsSettings = > > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); > StreamTableEnvironment bsTableEnv = > StreamTableEnvironment.create(bsEnv, bsSettings); > String name = "myhive"; > String defaultDatabase = "default"; > String hiveConfDir = > "/Users/uzi/Downloads/Hadoop/apache-hive-3.1.2-bin/conf/"; // a local > path > String version = "3.1.2"; > > HiveCatalog hive = new HiveCatalog(name, defaultDatabase, > hiveConfDir, version); > bsTableEnv.registerCatalog("myhive", hive); > bsTableEnv.useCatalog("myhive"); > bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); > bsTableEnv.executeSql("CREATE TABLE topic_products (" + > " id BIGINT ," + > " order_id STRING," + > " amount DECIMAL(10, 2)," + > " create_time TIMESTAMP " + > ") WITH (" + > " 'connector' = 'kafka'," + > " 'topic' = 'order.test'," + > " 'properties.bootstrap.servers' = 'localhost:9092'," + > " 'properties.group.id' = 'testGroup'," + > " 'scan.startup.mode' = 'earliest-offset', " + > " 'format' = 'json' " + > ")"); > bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); > > bsTableEnv.executeSql("CREATE TABLE hive_sink_table_streaming (" + > " id BIGINT ," + > " order_id STRING," + > " amount DECIMAL(10, 2)" + > " )"); > bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); > bsTableEnv.executeSql("CREATE TABLE print_table WITH > ('connector' = 'print')" + > " LIKE hive_sink_table_streaming (EXCLUDING > ALL)"); > > bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); > 
bsTableEnv.executeSql("INSERT INTO hive_sink_table_streaming SELECT > " + > "id, " + > "order_id, " + > "amount " + > "FROM topic_products"); > > Table table1 = bsTableEnv.from("hive_sink_table_streaming"); > table1.executeInsert("print_table"); > } > } >
