Hi,
This is actually the effect of the CSV connector's optional quote_character parameter. Its default value is the double quote ("), which is why every field gets wrapped in an extra layer of quotes. If you set this parameter to \0, you will get the output you want:
    st_env.connect(
            Kafka()
                .version("0.11")
                .topic("logSink")
                .start_from_earliest()
                .property("zookeeper.connect", "localhost:2181")
                .property("bootstrap.servers", "localhost:9092")) \
        .with_format(  # declare a format for this system
            Csv()
                .schema(DataTypes.ROW([DataTypes.FIELD("log", DataTypes.STRING())]))
                .quote_character("\0")) \
        .with_schema(  # declare the schema of the table
            Schema()
                .field("log", DataTypes.STRING())) \
        .in_append_mode() \
        .register_table_sink("sink")
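
For reference, the doubling of the inner quotes is just standard CSV quoting: a field that contains the quote character is wrapped in quotes, and every embedded quote is doubled. A minimal sketch with Python's standard csv module (illustration only, not what the Flink CSV format uses internally):

    import csv
    import io

    # A field containing the quote character gets wrapped in quotes,
    # with every embedded quote doubled; this reproduces the sink output.
    buf = io.StringIO()
    writer = csv.writer(buf, quotechar='"', quoting=csv.QUOTE_MINIMAL)
    writer.writerow(['{"topic": "logSource", "message": "x=1,y=1,z=1"}'])
    print(buf.getvalue())
    # "{""topic"": ""logSource"", ""message"": ""x=1,y=1,z=1""}"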

Best,
Xingbo

jack <wslyk...@163.com> wrote on Mon, Jun 1, 2020 at 5:31 PM:

> *A question for everyone: I'm using pyflink to consume JSON string data from kafka and send it back to kafka, treating each kafka message as a single field.*
>
> *Input data:*
> {"topic": "logSource", "message": "x=1,y=1,z=1"}
>
> The data that ends up in kafka looks like this:
> "{""topic"": ""logSource"", ""message"": ""x=1,y=1,z=1""}"
>
> *It has been wrapped in another layer of double quotes. My udf uses the json module for the processing. Could anyone take a look and help me turn this back into a normal JSON string?*
>
> @udf(input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()],
>      result_type=DataTypes.STRING())
> def kv(log, pair_sep=',', kv_sep='='):
>     import json
>     import re
>     # parse the JSON record, split its "message" field into key=value
>     # pairs, and merge those pairs back into the record
>     log = json.loads(log)
>     ret = {}
>     items = re.split(pair_sep, log.get("message"))
>     for item in items:
>         ret[re.split(kv_sep, item)[0]] = re.split(kv_sep, item)[1]
>     log.update(ret)
>     log = json.dumps(log)
>     return log
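>
> (The parsing logic itself checks out when run as a plain function outside
> Flink; kv_plain below is just a hypothetical local copy of the same logic
> for a quick sanity check:)
>
>     import json
>     import re
>
>     # hypothetical standalone copy of the udf body, for testing only
>     def kv_plain(log, pair_sep=',', kv_sep='='):
>         log = json.loads(log)
>         ret = {}
>         for item in re.split(pair_sep, log.get("message")):
>             key, value = re.split(kv_sep, item)
>             ret[key] = value
>         log.update(ret)
>         return json.dumps(log)
>
>     print(kv_plain('{"topic": "logSource", "message": "x=1,y=1,z=1"}'))
>     # {"topic": "logSource", "message": "x=1,y=1,z=1", "x": "1", "y": "1", "z": "1"}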
>
>
> def register_source(st_env):
>     st_env.connect(  # declare the external system to connect to
>             Kafka()
>                 .version("0.10")
>                 .topic("logSource")
>                 .start_from_latest()
>                 .property("zookeeper.connect", "localhost:2181")
>                 .property("bootstrap.servers", "localhost:9092")) \
>         .with_format(  # declare a format for this system
>             Csv()
>                 .schema(DataTypes.ROW([DataTypes.FIELD("log", DataTypes.STRING())]))
>                 .field_delimiter("\n")) \
>         .with_schema(  # declare the schema of the table
>             Schema()
>                 .field("log", DataTypes.STRING())) \
>         .in_append_mode() \
>         .register_table_source("source")
>
> def register_sink(st_env):
>     st_env.connect(
>             Kafka()
>                 .version("0.10")
>                 .topic("logSink")
>                 .start_from_earliest()
>                 .property("zookeeper.connect", "localhost:2181")
>                 .property("bootstrap.servers", "localhost:9092")) \
>         .with_format(  # declare a format for this system
>             Csv()
>                 .schema(DataTypes.ROW([DataTypes.FIELD("log", DataTypes.STRING())]))) \
>         .with_schema(  # declare the schema of the table
>             Schema()
>                 .field("log", DataTypes.STRING())) \
>         .in_append_mode() \
>         .register_table_sink("sink")
>
> if __name__ == '__main__':
>
>     s_env = StreamExecutionEnvironment.get_execution_environment()
>     s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
>     s_env.set_parallelism(1)
>     st_env = StreamTableEnvironment \
>         .create(s_env, environment_settings=EnvironmentSettings
>                 .new_instance()
>                 .in_streaming_mode()
>                 .use_blink_planner().build())
>     st_env.register_function('kv', kv)
>     register_source(st_env)
>     register_sink(st_env)
>     st_env \
>         .from_path("source") \
>         .select("kv(log, ',', '=') as log") \
>         .insert_into("sink")
>     st_env.execute("test")
>