StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.disableOperatorChaining();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.getConfig().addConfiguration(
new Configuration()
.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
Duration.ofSeconds(30)));
tEnv.executeSql("CREATE TEMPORARY FUNCTION TestFunca AS
'org.example.flink.TestFunca' LANGUAGE JAVA");
tEnv.executeSql("CREATE TABLE datagen (\n" +
" name STRING,\n" +
" pass STRING,\n" +
" type1 INT,\n" +
" t1 STRING,\n" +
" t2 STRING,\n" +
" ts AS localtimestamp,\n" +
" WATERMARK FOR ts AS ts\n" +
") WITH (\n" +
" 'connector' = 'datagen',\n" +
" 'rows-per-second'='1',\n" +
" 'fields.type1.min'='1',\n" +
" 'fields.type1.max'='10'\n" +
")");
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tEnv.executeSql("CREATE TABLE hive_table (\n" +
" user_id STRING,\n" +
" order_amount STRING\n" +
") PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet
TBLPROPERTIES (\n" +
" 'sink.partition-commit.trigger'='partition-time',\n" +
" 'sink.partition-commit.delay'='1 h',\n" +
" 'sink.partition-commit.policy.kind'='metastore,success-file'\n" +
")");
tEnv.executeSql("insert into hive_table select
t1,t2,TestFunca(type1),TestFunca(type1) from datagen");
Caused by: org.apache.flink.table.api.ValidationException: Table options do not
contain an option key 'connector' for discovering a connector.
at
org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:321)
at
org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
... 18 more
发送自 Windows 10 版邮件应用