软件版本
flink:1.13.6
hive:1.1.1
hadoop:2.6.0-cdh5.16.2

通过 createRemoteEnvironment 方式将 SQL 提交到远程 Flink 集群执行,连接 Hive 时已通过 Kerberos 认证。代码如下:

运行后不报错,flink集群能看到job正在运行,kafka和hive表都创建成功了,kafka中一直在产生新的消息,而hive中却没有新数据进表中。

测试过将输出改为mysql或者csv文件都能持续产生新记录,hive表中的数据也能读取出来,或者从hive的一张表将数据插入刚刚创建的表中也是成功的。就是不知道为什么不能将kafka的动态数据动态写入新建的hive表中。

// --- Kerberos login: required before any Hive metastore / HDFS access ---
String KRB5_CONF_PATH = "/home/tetris/conf/krb5.ini";
String keytab = "/home/tetris/conf/company.keytab";
String principal = "company";
System.setProperty("java.security.krb5.conf", KRB5_CONF_PATH);

Configuration configuration = new Configuration();
configuration.set("hadoop.security.authentication", "kerberos");
configuration.set("keytab.file", keytab);
configuration.setBoolean("hadoop.security.authorization", true);
configuration.set("kerberos.principal", principal);
UserGroupInformation.setConfiguration(configuration);
UserGroupInformation.loginUserFromKeytab(principal, keytab);

EnvironmentSettings bsSettings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createRemoteEnvironment("node1", 8081);

// FIX: the Hive streaming (filesystem) sink only commits written files when a
// checkpoint completes. Without checkpointing the job runs and keeps writing
// in-progress files, but nothing ever becomes visible in the Hive table --
// exactly the symptom described: Kafka keeps producing, Hive stays empty,
// while bounded writes (MySQL/CSV, one-shot Hive-to-Hive INSERT) succeed.
env.enableCheckpointing(60_000L); // data becomes queryable after each checkpoint (every 60s)

StreamTableEnvironment flinkTableEnv =
        StreamTableEnvironment.create(env, bsSettings);

HiveCatalog hiveCatalog = new HiveCatalog("myhive", "tetris",
        "/home/tetris/conf", "1.1.1");
flinkTableEnv.registerCatalog("myhive", hiveCatalog);
flinkTableEnv.useCatalog("myhive");

// Kafka source table (created in the default Flink SQL dialect).
flinkTableEnv.executeSql("DROP TABLE IF EXISTS data_2431_4928").print();
flinkTableEnv.executeSql("CREATE TABLE data_2431_4928(id STRING,user_id STRING,status STRING) WITH (\n" +
    "'connector' = 'kafka',\n" +
    "'topic' = 'person',\n" +
    "'properties.bootstrap.servers' = '121.4.89.228:9092',\n" +
    "'properties.group.id' = 'testGroup',\n" +
    "'scan.startup.mode' = 'latest-offset',\n" +
    "'format' = 'json',\n" +
    "'json.fail-on-missing-field'='false','json.ignore-parse-errors'='true'\n" +
    ")").print();

// Hive sink table must be created in the HIVE dialect so it is a real Hive table.
// NOTE(review): for partitioned tables you would additionally need
// 'sink.partition-commit.policy.kind'='metastore,success-file'; this table is
// unpartitioned, so the checkpoint-driven commit above is the essential part.
flinkTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
flinkTableEnv.executeSql("DROP TABLE IF EXISTS output_2431_4930").print();
flinkTableEnv.executeSql("CREATE TABLE output_2431_4930(id STRING,user_id STRING,status STRING)").print();
flinkTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

// Submit the continuous INSERT; executeSql is async for INSERT statements.
TableResult result = flinkTableEnv.executeSql(
    "INSERT INTO output_2431_4930 SELECT id, user_id ,`status` FROM data_2431_4928");
// getJobClient() is Optional -- fail loudly instead of an unchecked get().
System.out.println(result.getJobClient()
    .orElseThrow(() -> new IllegalStateException("job was not submitted from this client"))
    .getJobID());



谌祥,杭州 - java后端开发 - 大数据方向
799590...@qq.com

回复