Hi all, I'm using PyFlink to consume JSON string data from Kafka, process each message based on one of its fields, and send the result back to Kafka.

Input data:
{"topic": "logSource", "message": "x=1,y=1,z=1"}


The data written to Kafka comes out like this:
"{""topic"": ""logSource"", ""message"": ""x=1,y=1,z=1""}"


It gets wrapped in an extra layer of double quotes. My UDF uses the json module for the processing. Could anyone help me figure out how to get a normal JSON string out?
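I suspect the extra quotes come from the Csv sink format rather than from the UDF itself: the JSON string contains commas and double quotes, so the CSV serializer wraps the whole field in quotes and doubles the embedded ones (standard RFC 4180 quoting). Python's own csv module follows the same rules and reproduces the same quoting:

# Standalone reproduction of the quoting with Python's csv module:
# a field containing the delimiter or quote character gets wrapped
# in quotes, and embedded quotes are doubled.
import csv
import io

buf = io.StringIO()
csv.writer(buf).writerow(['{"topic": "logSource", "message": "x=1,y=1,z=1"}'])
print(buf.getvalue())
# "{""topic"": ""logSource"", ""message"": ""x=1,y=1,z=1""}"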


import json
import re

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
from pyflink.table.descriptors import Kafka, Csv, Schema
from pyflink.table.udf import udf


@udf(input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()],
     result_type=DataTypes.STRING())
def kv(log, pair_sep=',', kv_sep='='):
    # Parse the JSON record, split the "message" field into key/value
    # pairs, and merge those pairs back into the record.
    log = json.loads(log)
    ret = {}
    items = re.split(pair_sep, log.get("message"))
    for item in items:
        parts = re.split(kv_sep, item)
        ret[parts[0]] = parts[1]
    log.update(ret)
    return json.dumps(log)
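For reference, the transformation itself behaves as intended when I run the same logic as plain Python outside Flink, so the problem only shows up on the sink side:

# Plain-Python check of the kv logic on the sample record:
import json
import re

sample = '{"topic": "logSource", "message": "x=1,y=1,z=1"}'
record = json.loads(sample)
record.update(dict(re.split('=', item) for item in re.split(',', record["message"])))
print(json.dumps(record))
# {"topic": "logSource", "message": "x=1,y=1,z=1", "x": "1", "y": "1", "z": "1"}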


def register_source(st_env):
    st_env \
        .connect(  # declare the external system to connect to
            Kafka()
                .version("0.10")
                .topic("logSource")
                .start_from_latest()
                .property("zookeeper.connect", "localhost:2181")
                .property("bootstrap.servers", "localhost:9092")) \
        .with_format(  # declare a format for this system
            Csv()
                .schema(DataTypes.ROW([DataTypes.FIELD("log", DataTypes.STRING())]))
                .field_delimiter("\n")) \
        .with_schema(  # declare the schema of the table
            Schema()
                .field("log", DataTypes.STRING())) \
        .in_append_mode() \
        .register_table_source("source")


def register_sink(st_env):
    st_env.connect(
            Kafka()
                .version("0.10")
                .topic("logSink")
                .start_from_earliest()
                .property("zookeeper.connect", "localhost:2181")
                .property("bootstrap.servers", "localhost:9092")) \
        .with_format(  # declare a format for this system
            Csv()
                .schema(DataTypes.ROW([DataTypes.FIELD("log", DataTypes.STRING())]))) \
        .with_schema(  # declare the schema of the table
            Schema()
                .field("log", DataTypes.STRING())) \
        .in_append_mode() \
        .register_table_sink("sink")
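One workaround I'm considering, though I haven't verified that the Python binding exposes it: the Java Csv descriptor has a disableQuoteCharacter() option, and if the PyFlink Csv descriptor mirrors it as disable_quote_character(), a sink variant (the hypothetical register_sink_unquoted below) should write the JSON string verbatim:

def register_sink_unquoted(st_env):
    st_env.connect(
            Kafka()
                .version("0.10")
                .topic("logSink")
                .start_from_earliest()
                .property("zookeeper.connect", "localhost:2181")
                .property("bootstrap.servers", "localhost:9092")) \
        .with_format(
            Csv()
                .schema(DataTypes.ROW([DataTypes.FIELD("log", DataTypes.STRING())]))
                # assumption: disable_quote_character() exists in this PyFlink version
                .disable_quote_character()) \
        .with_schema(
            Schema()
                .field("log", DataTypes.STRING())) \
        .in_append_mode() \
        .register_table_sink("sink")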


if __name__ == '__main__':
    s_env = StreamExecutionEnvironment.get_execution_environment()
    s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    s_env.set_parallelism(1)
    st_env = StreamTableEnvironment \
        .create(s_env, environment_settings=EnvironmentSettings
                .new_instance()
                .in_streaming_mode()
                .use_blink_planner().build())

    st_env.register_function('kv', kv)
    register_source(st_env)
    register_sink(st_env)

    st_env \
        .from_path("source") \
        .select("kv(log, ',', '=') as log") \
        .insert_into("sink")
    st_env.execute("test")


