Re: flink-1.11 hive-1.2.1 DDL cannot write data

2020-07-29 Post by kcz
Sorry, log4j was not set up in IDEA, so I saw no logs at first. The problem
was the trigger value: I had written 'process time' instead of 'process-time'.
Once log4j was in place, the log made this clear.
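
For anyone hitting the same thing, a minimal sketch of the corrected sink DDL
(everything except the hyphenated 'process-time' value is copied from the
original post further down the thread):

    tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
    tableEnv.executeSql(
            "CREATE TABLE situation.fs_table (\n" +
            "  host STRING,\n" +
            "  url STRING,\n" +
            "  public_date STRING\n" +
            ") PARTITIONED BY (ts_date STRING, ts_hour STRING, ts_minute STRING)\n" +
            "STORED AS PARQUET\n" +
            "TBLPROPERTIES (\n" +
            // 'process-time' (with a hyphen) is the documented value for this option.
            "  'sink.partition-commit.trigger' = 'process-time',\n" +
            "  'sink.partition-commit.delay' = '1 min',\n" +
            "  'sink.partition-commit.policy.kind' = 'metastore,success-file',\n" +
            "  'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'\n" +
            ")");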




Re: flink-1.11 hive-1.2.1 DDL cannot write data

2020-07-29 Post by Leonard Xu
Hi, kcz

Your connector properties are still in the Flink 1.10 format; try switching to the 1.11 format [1].


> On 2020-07-29 at 15:07, kcz <573693...@qq.com> wrote:
> 
> tableEnv.executeSql("CREATE TABLE situation.source_table (\n" +
>         "\thost STRING,\n" +
>         "\turl STRING,\n" +
>         "\tpublic_date STRING\n" +
>         ") WITH (\n" +
>         "\t'connector.type' = 'kafka',\n" +
>         "\t'connector.version' = 'universal',\n" +
>         "\t'connector.startup-mode' = 'latest-offset',\n" +
>         "\t'connector.topic' = 'sendMessage',\n" +
>         "\t'connector.properties.group.id' = 'domain_testGroup',\n" +
>         "\t'connector.properties.zookeeper.connect' = '127.0.0.1:2181',\n" +
>         "\t'connector.properties.bootstrap.servers' = '127.0.0.1:9092',\n" +
>         "\t'update-mode' = 'append',\n" +
>         "\t'format.type' = 'json',\n" +
>         "\t'format.derive-schema' = 'true'\n" +
>         ")");


Best
Leonard
[1]https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/kafka.html#connector-options
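
For comparison, a sketch of the same source table using the 1.11-style options
from [1] (topic, group id, and broker address copied from the DDL above; in
1.11 the JSON format derives its schema from the table schema, and the
universal connector no longer needs the ZooKeeper address):

    tableEnv.executeSql(
            "CREATE TABLE situation.source_table (\n" +
            "  host STRING,\n" +
            "  url STRING,\n" +
            "  public_date STRING\n" +
            ") WITH (\n" +
            "  'connector' = 'kafka',\n" +
            "  'topic' = 'sendMessage',\n" +
            "  'properties.bootstrap.servers' = '127.0.0.1:9092',\n" +
            "  'properties.group.id' = 'domain_testGroup',\n" +
            "  'scan.startup.mode' = 'latest-offset',\n" +
            "  'format' = 'json'\n" +
            ")");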
 


flink-1.11 hive-1.2.1 DDL cannot write data

2020-07-29 Post by kcz
The code below reads from Kafka and writes to Hive, but no data shows up in the Hive table:

package com.hive;

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

import java.time.Duration;

public class HiveTest {
    private static final String path = "hdfs_path";

    public static void main(String[] args) {
        System.setProperty("HADOOP_USER_NAME", "work");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        // Allow only one checkpoint at a time.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.setStateBackend(new FsStateBackend(path));

        EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, tableEnvSettings);
        tableEnv.getConfig().getConfiguration().set(
                ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
        tableEnv.getConfig().getConfiguration().set(
                ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(20));

        String name = "myhive";
        String defaultDatabase = "situation";
        String hiveConfDir = "/load/data/hive/hive-conf"; // a local path
        String version = "1.2.1";

        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
        tableEnv.registerCatalog("myhive", hive);

        // set the HiveCatalog as the current catalog of the session
        tableEnv.useCatalog("myhive");
        tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS situation");
        tableEnv.executeSql("DROP TABLE IF EXISTS situation.source_table");

        // Kafka source table (1.10-style connector options; see the reply above).
        tableEnv.executeSql("CREATE TABLE situation.source_table (\n" +
                "\thost STRING,\n" +
                "\turl STRING,\n" +
                "\tpublic_date STRING\n" +
                ") WITH (\n" +
                "\t'connector.type' = 'kafka',\n" +
                "\t'connector.version' = 'universal',\n" +
                "\t'connector.startup-mode' = 'latest-offset',\n" +
                "\t'connector.topic' = 'sendMessage',\n" +
                "\t'connector.properties.group.id' = 'domain_testGroup',\n" +
                "\t'connector.properties.zookeeper.connect' = '127.0.0.1:2181',\n" +
                "\t'connector.properties.bootstrap.servers' = '127.0.0.1:9092',\n" +
                "\t'update-mode' = 'append',\n" +
                "\t'format.type' = 'json',\n" +
                "\t'format.derive-schema' = 'true'\n" +
                ")");

        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tableEnv.executeSql("DROP TABLE IF EXISTS situation.fs_table");

        // Hive sink table, partitioned by date/hour/minute.
        // NOTE: 'process time' below is the typo resolved at the top of this
        // thread; the valid value is 'process-time'.
        String hiveSql = "CREATE TABLE situation.fs_table (\n" +
                "  host STRING,\n" +
                "  url STRING,\n" +
                "  public_date STRING\n" +
                ") PARTITIONED BY (\n" +
                "  ts_date STRING,\n" +
                "  ts_hour STRING,\n" +
                "  ts_minute STRING\n" +
                ") STORED AS PARQUET\n" +
                "TBLPROPERTIES (\n" +
                "  'sink.partition-commit.trigger' = 'process time',\n" +
                "  'sink.partition-commit.delay' = '1 min',\n" +
                "  'sink.partition-commit.policy.kind' = 'metastore,success-file',\n" +
                "  'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'\n" +
                ")";
        tableEnv.executeSql(hiveSql);

        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

        tableEnv.executeSql("INSERT INTO situation.fs_table " +
                "SELECT host, url, public_date, " +
                "DATE_FORMAT(public_date, 'yyyy-MM-dd'), " +
                "DATE_FORMAT(public_date, 'HH'), " +
                "DATE_FORMAT(public_date, 'mm') " +
                "FROM situation.source_table");
    }
}

flink-1.11 with hive-1.2.1: DDL throws an error

2020-07-17 Post by kcz
Running from IDEA, connecting to Hive; the pom includes
hive-exec and flink-connector-hive_2.11. The code:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE);
// Allow only one checkpoint at a time.
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.setStateBackend(new FsStateBackend(path));

StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

String name = "myhive";
String defaultDatabase = "situation";
String hiveConfDir = "/load/data/hive/hive-conf"; // a local path
String version = "1.2.1";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
tableEnv.registerCatalog("myhive", hive);

// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive");
tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS stream_tmp");
tableEnv.executeSql("DROP TABLE IF EXISTS stream_tmp.source_table");

The error:

Exception in thread "main" java.lang.IncompatibleClassChangeError: Implementing class
	at java.lang.ClassLoader.defineClass1(Native Method)
	at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
	at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
	at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
	at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at org.apache.flink.table.planner.delegation.PlannerBase.
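
An IncompatibleClassChangeError while the planner loads usually points to
conflicting dependency versions on the classpath. A minimal pom sketch for
this combination (version numbers are assumptions based on the thread; the key
point is that every Flink artifact shares one version):

    <!-- Assumed versions: Flink 1.11.x, Scala 2.11, Hive 1.2.1. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_2.11</artifactId>
        <version>1.11.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>1.11.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>1.2.1</version>
    </dependency>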