非常感谢解惑,刚开始使用pyflink,不是很熟悉,还请多多指教
















在 2020-06-01 20:50:53,"Xingbo Huang" <hxbks...@gmail.com> 写道:

Hi, 
其实这个是CSV 
connector的一个可选的quote_character参数的效果,默认值是",所以你每个字段都会这样外面包一层,你可以配置一下这个参数为\0,就可以得到你要的效果了。
    st_env.connect(
        Kafka()
            .version("0.11")
            .topic("logSink")
            .start_from_earliest()
            .property("zookeeper.connect", "localhost:2181")
            .property("bootstrap.servers", "localhost:9092")) \
        .with_format(  # declare a format for this system
        Csv()
            .schema(DataTypes.ROW([DataTypes.FIELD("log", DataTypes.STRING())]))
            .quote_character("\0")
    ) \
        .with_schema(  # declare the schema of the table
        Schema()
            .field("log", DataTypes.STRING())) \
        .in_append_mode() \
        .register_table_sink("sink")



Best,
Xingbo


Reply via email to