hi
需要开启checkpoint

| |
JasonLee
|
|
邮箱:[email protected]
|

Signature is customized by Netease Mail Master

在2020年07月16日 18:03,李佳宸 写道:
想请教下大家 hive streaming write需要有哪些配置,不知道为什么我的作业能够跑起来,但是没有数据写入hive。
批量的hive写入,流环境的读取是正常的。

附代码,很简短:

public class KafkaToHiveStreaming {
   public static void main(String[] arg) throws Exception{
       StreamExecutionEnvironment bsEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
       EnvironmentSettings bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
       StreamTableEnvironment bsTableEnv =
StreamTableEnvironment.create(bsEnv, bsSettings);
       String name            = "myhive";
       String defaultDatabase = "default";
       String hiveConfDir     =
"/Users/uzi/Downloads/Hadoop/apache-hive-3.1.2-bin/conf/"; // a local
path
       String version         = "3.1.2";

       HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
hiveConfDir, version);
       bsTableEnv.registerCatalog("myhive", hive);
       bsTableEnv.useCatalog("myhive");
       bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
       bsTableEnv.executeSql("CREATE TABLE topic_products (" +
               "  id BIGINT ," +
               "  order_id STRING," +
               "  amount DECIMAL(10, 2)," +
               "  create_time TIMESTAMP " +
               ") WITH (" +
               " 'connector' = 'kafka'," +
               " 'topic' = 'order.test'," +
               " 'properties.bootstrap.servers' = 'localhost:9092'," +
               " 'properties.group.id' = 'testGroup'," +
               " 'scan.startup.mode' = 'earliest-offset', " +
               " 'format' = 'json'  " +
               ")");
       bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

       bsTableEnv.executeSql("CREATE TABLE hive_sink_table_streaming (" +
               "  id BIGINT ," +
               "  order_id STRING," +
               "  amount DECIMAL(10, 2)" +
               "  )");
       bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
       bsTableEnv.executeSql("CREATE TABLE print_table WITH
('connector' = 'print')" +
               "LIKE INSERT INTO hive_sink_table_streaming (EXCLUDING ALL)");

       bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
       bsTableEnv.executeSql("INSERT INTO hive_sink_table_streaming SELECT " +
               "id, " +
               "order_id, " +
               "amount " +
               "FROM topic_products");

       Table table1 = bsTableEnv.from("hive_sink_table_streaming");
       table1.executeInsert("print_table");
   }
}

回复