Standalone mode — jars under the lib directory:
flink-connector-hive_2.11-1.11.0.jar        
flink-json-1.11.0.jar                
        flink-sql-connector-kafka_2.12-1.11.0.jar  
log4j-api-2.12.1.jar
flink-csv-1.11.0.jar                
        flink-parquet_2.11-1.11.0.jar      
          flink-table_2.11-1.11.0.jar    
            log4j-core-2.12.1.jar
flink-dist_2.11-1.11.0.jar              
    flink-shaded-hadoop-2-uber-2.7.2.11-9.0.jar  
flink-table-blink_2.11-1.11.0.jar          
log4j-slf4j-impl-2.12.1.jar
flink-hadoop-compatibility_2.11-1.11.0.jar  
flink-shaded-zookeeper-3.4.14.jar            
log4j-1.2-api-2.12.1.jar





The code is as follows (it runs fine locally in IDEA):
// Set up the streaming execution environment and its Table API wrapper.
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
// Checkpoint every second with exactly-once semantics; required for the
// filesystem sink's partition commit to fire.
env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
// Allow at most one checkpoint in flight at a time.
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // fixed stray ';;'
// NOTE(review): 'path' must be declared elsewhere; not shown in this snippet.
env.setStateBackend(new FsStateBackend(path));

// Kafka source table.
// NOTE(review): this uses the legacy 'connector.*' descriptor properties,
// while the sink below uses the new Flink 1.11 'connector' option style —
// consider migrating both to the new style for consistency.
tableEnv.executeSql("CREATE TABLE source_table (\n" +
        "\thost STRING,\n" +
        "\turl STRING,\n" +
        "\tpublic_date STRING\n" +
        ") WITH (\n" +
        "\t'connector.type' = 'kafka',\n" +
        "\t'connector.version' = 'universal',\n" +
        "\t'connector.startup-mode' = 'latest-offset',\n" +
        "\t'connector.topic' = 'test_flink_1.11',\n" +
        "\t'connector.properties.group.id' = 'domain_testGroup',\n" +
        "\t'connector.properties.zookeeper.connect' = '127.0.0.1:2181',\n" +
        "\t'connector.properties.bootstrap.servers' = '127.0.0.1:9092',\n" +
        "\t'update-mode' = 'append',\n" +
        "\t'format.type' = 'json',\n" +
        "\t'format.derive-schema' = 'true'\n" +
        ")");

// Filesystem sink table, partitioned by public_date; a _SUCCESS file is
// written when a partition is committed.
tableEnv.executeSql("CREATE TABLE fs_table (\n" +
        "  host STRING,\n" +
        "  url STRING,\n" +
        "  public_date STRING\n" +
        ") PARTITIONED BY (public_date) WITH (\n" +
        "  'connector'='filesystem',\n" +
        "  'path'='path',\n" +
        "  'format'='json',\n" +
        "  'sink.partition-commit.delay'='0s',\n" +
        "  'sink.partition-commit.policy.kind'='success-file'\n" +
        ")");

// Stream rows from Kafka into the filesystem sink.
// (Rejoined the string literal that was broken across a line wrap.)
tableEnv.executeSql("INSERT INTO  fs_table SELECT host, url, " +
        "DATE_FORMAT(public_date, 'yyyy-MM-dd') FROM source_table");
TableResult result = tableEnv.executeSql("SELECT * FROM fs_table ");
result.print();
The error is as follows:
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:291)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:453)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: cannot assign instance of
org.apache.commons.collections.map.LinkedMap to field


This is not a bug. When sinking to HDFS with the parquet format, the parquet connector jar is already present in the cluster's lib directory; if the same parquet dependency in your pom is not marked as `provided`, the duplicate copy on the classpath causes this ClassCastException. Changing the dependency scope in the pom to `provided` resolves the error.

Reply