Deployment: Flink standalone cluster.
Jars in the `lib` directory:
flink-connector-hive_2.11-1.11.0.jar
flink-json-1.11.0.jar
flink-sql-connector-kafka_2.12-1.11.0.jar
log4j-api-2.12.1.jar
flink-csv-1.11.0.jar
flink-parquet_2.11-1.11.0.jar
flink-table_2.11-1.11.0.jar
log4j-core-2.12.1.jar
flink-dist_2.11-1.11.0.jar
flink-shaded-hadoop-2-uber-2.7.2.11-9.0.jar
flink-table-blink_2.11-1.11.0.jar
log4j-slf4j-impl-2.12.1.jar
flink-hadoop-compatibility_2.11-1.11.0.jar
flink-shaded-zookeeper-3.4.14.jar
log4j-1.2-api-2.12.1.jar
Job code (developed in IntelliJ IDEA):
// Streaming job: read JSON records from Kafka and write them, partitioned by
// day, to a filesystem table; then query that table and print the result.
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
// Checkpoint every 1000 ms with exactly-once semantics. Per the Flink 1.11
// filesystem connector docs, the streaming filesystem sink commits its files
// on checkpoints, so checkpointing must be enabled for fs_table to receive data.
env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
// Allow at most one checkpoint to be in progress at a time.
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// NOTE(review): `path` is not defined in this snippet; presumably the
// checkpoint storage URI (e.g. an hdfs:// path). Confirm before running.
env.setStateBackend(new FsStateBackend(path));

// Kafka source table with JSON payloads.
// NOTE(review): this DDL uses the `connector.type`-style option keys, while
// fs_table below uses the newer `connector` key; consider unifying the style.
// NOTE(review): the lib listing contains flink-sql-connector-kafka_2.12 while
// the other Flink jars are _2.11. Scala suffixes should match; confirm.
tableEnv.executeSql("CREATE TABLE source_table (\n" +
    "\thost STRING,\n" +
    "\turl STRING,\n" +
    "\tpublic_date STRING\n" +
    ") WITH (\n" +
    "\t'connector.type' = 'kafka',\n" +
    "\t'connector.version' = 'universal',\n" +
    "\t'connector.startup-mode' = 'latest-offset',\n" +
    "\t'connector.topic' = 'test_flink_1.11',\n" +
    "\t'connector.properties.group.id' = 'domain_testGroup',\n" +
    "\t'connector.properties.zookeeper.connect' = '127.0.0.1:2181',\n" +
    "\t'connector.properties.bootstrap.servers' = '127.0.0.1:9092',\n" +
    "\t'update-mode' = 'append',\n" +
    "\t'format.type' = 'json',\n" +
    "\t'format.derive-schema' = 'true'\n" +
    ")");

// Filesystem sink table, JSON-encoded and partitioned by public_date. Each
// partition gets a success file as soon as it is committed (delay 0s).
// NOTE(review): 'path'='path' looks like a placeholder; replace with the real
// target directory.
tableEnv.executeSql("CREATE TABLE fs_table (\n" +
    " host STRING,\n" +
    " url STRING,\n" +
    " public_date STRING\n" +
    ") PARTITIONED BY (public_date) WITH (\n" +
    " 'connector'='filesystem',\n" +
    " 'path'='path',\n" +
    " 'format'='json',\n" +
    " 'sink.partition-commit.delay'='0s',\n" +
    " 'sink.partition-commit.policy.kind'='success-file'\n" +
    ")");

// Columns are matched by position, so the DATE_FORMAT(...) result becomes the
// public_date partition value. The original literal was split across two
// physical lines, which does not compile; it is concatenated here instead.
// NOTE(review): in Flink 1.11 executeSql() on an INSERT submits the job
// without waiting for it, so the SELECT below may run before any partition
// has been committed. Confirm this is intended.
tableEnv.executeSql("INSERT INTO fs_table SELECT host, url, " +
    "DATE_FORMAT(public_date, 'yyyy-MM-dd') FROM source_table");

TableResult result = tableEnv.executeSql("SELECT * FROM fs_table ");
result.print();
Error:
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:291)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:453)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.commons.collections.map.LinkedMap to field
[... remainder of the message was truncated in the original report ...]
Is this a bug?
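Resolution: the job sinks to HDFS in Parquet format. The Parquet jar was already in the cluster's `lib` directory, but the project's pom did not mark that dependency as `provided`, so the same classes were also bundled in the user jar. With Flink's default child-first class loading, those duplicate classes are loaded by two different classloaders, which produces the error above. After marking the dependency as `provided` in the pom, the job runs fine.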