好的,谢谢~~~

JasonLee <[email protected]> 于2020年7月16日周四 下午8:22写道:

> hi
> 需要开启checkpoint
>
>
> | |
> JasonLee
> |
> |
> 邮箱:[email protected]
> |
>
> Signature is customized by Netease Mail Master
>
> 在2020年07月16日 18:03,李佳宸 写道:
> 想请教下大家 hive streaming write需要有哪些配置,不知道为什么我的作业能够跑起来,但是没有数据写入hive。
> 批量的hive写入,流环境的读取是正常的。
>
> 附代码,很简短:
>
> public class KafkaToHiveStreaming {
>    public static void main(String[] arg) throws Exception{
>        StreamExecutionEnvironment bsEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
>        EnvironmentSettings bsSettings =
>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>        StreamTableEnvironment bsTableEnv =
> StreamTableEnvironment.create(bsEnv, bsSettings);
>        String name            = "myhive";
>        String defaultDatabase = "default";
>        String hiveConfDir     =
> "/Users/uzi/Downloads/Hadoop/apache-hive-3.1.2-bin/conf/"; // a local
> path
>        String version         = "3.1.2";
>
>        HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
> hiveConfDir, version);
>        bsTableEnv.registerCatalog("myhive", hive);
>        bsTableEnv.useCatalog("myhive");
>        bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
>        bsTableEnv.executeSql("CREATE TABLE topic_products (" +
>                "  id BIGINT ," +
>                "  order_id STRING," +
>                "  amount DECIMAL(10, 2)," +
>                "  create_time TIMESTAMP " +
>                ") WITH (" +
>                " 'connector' = 'kafka'," +
>                " 'topic' = 'order.test'," +
>                " 'properties.bootstrap.servers' = 'localhost:9092'," +
>                " 'properties.group.id' = 'testGroup'," +
>                " 'scan.startup.mode' = 'earliest-offset', " +
>                " 'format' = 'json'  " +
>                ")");
>        bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
>
>        bsTableEnv.executeSql("CREATE TABLE hive_sink_table_streaming (" +
>                "  id BIGINT ," +
>                "  order_id STRING," +
>                "  amount DECIMAL(10, 2)" +
>                "  )");
>        bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
>        bsTableEnv.executeSql("CREATE TABLE print_table WITH
> ('connector' = 'print')" +
>                "LIKE INSERT INTO hive_sink_table_streaming (EXCLUDING
> ALL)");
>
>        bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
>        bsTableEnv.executeSql("INSERT INTO hive_sink_table_streaming SELECT
> " +
>                "id, " +
>                "order_id, " +
>                "amount " +
>                "FROM topic_products");
>
>        Table table1 = bsTableEnv.from("hive_sink_table_streaming");
>        table1.executeInsert("print_table");
>    }
> }
>

回复