standalone lib jar?????? flink-connector-hive_2.11-1.11.0.jar flink-json-1.11.0.jar flink-sql-connector-kafka_2.12-1.11.0.jar log4j-api-2.12.1.jar flink-csv-1.11.0.jar flink-parquet_2.11-1.11.0.jar flink-table_2.11-1.11.0.jar log4j-core-2.12.1.jar flink-dist_2.11-1.11.0.jar flink-shaded-hadoop-2-uber-2.7.2.11-9.0.jar flink-table-blink_2.11-1.11.0.jar log4j-slf4j-impl-2.12.1.jar flink-hadoop-compatibility_2.11-1.11.0.jar flink-shaded-zookeeper-3.4.14.jar log4j-1.2-api-2.12.1.jar
??????????idea???????? StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); env.setParallelism(1); env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE); // ???????????????????????????? env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);; env.setStateBackend(new FsStateBackend(path)); tableEnv.executeSql("CREATE TABLE source_table (\n" + "\thost STRING,\n" + "\turl STRING,\n" + "\tpublic_date STRING\n" + ") WITH (\n" + "\t'connector.type' = 'kafka',\n" + "\t'connector.version' = 'universal',\n" + "\t'connector.startup-mode' = 'latest-offset',\n" + "\t'connector.topic' = 'test_flink_1.11',\n" + "\t'connector.properties.group.id' = 'domain_testGroup',\n" + "\t'connector.properties.zookeeper.connect' = '127.0.0.1:2181',\n" + "\t'connector.properties.bootstrap.servers' = '127.0.0.1:9092',\n" + "\t'update-mode' = 'append',\n" + "\t'format.type' = 'json',\n" + "\t'format.derive-schema' = 'true'\n" + ")"); tableEnv.executeSql("CREATE TABLE fs_table (\n" + " host STRING,\n" + " url STRING,\n" + " public_date STRING\n" + ") PARTITIONED BY (public_date) WITH (\n" + " 'connector'='filesystem',\n" + " 'path'='path',\n" + " 'format'='json',\n" + " 'sink.partition-commit.delay'='0s',\n" + " 'sink.partition-commit.policy.kind'='success-file'\n" + ")"); tableEnv.executeSql("INSERT INTO fs_table SELECT host, url, DATE_FORMAT(public_date, 'yyyy-MM-dd') FROM source_table"); TableResult result = tableEnv.executeSql("SELECT * FROM fs_table "); result.print(); ???????? org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function. at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:291) at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:453) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.commons.collections.map.LinkedMap to field ??????bug sink??hdfs??????????parquet??????lib??????parquet????pom??????provided????????????????error????????pom????????provided????????OK