请教各位:我使用 PyFlink 消费 Kafka 中的 JSON 字符串数据,按其中一个字段(message)处理后,再把结果发送到 Kafka。
数据输入:
{"topic": "logSource", "message": "x=1,y=1,z=1"}
发送到kafka里面的数据结果如下:
"{""topic"": ""logSource"", ""message"": ""x=1,y=1,z=1""}"
结果又被双引号包了一层(内部的双引号也被转义成了两个双引号)。我的 UDF 中使用 json 模块进行处理,请各位帮我看一下怎样才能输出正常的 JSON 串。
@udf(input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()],
     result_type=DataTypes.STRING())
def kv(log, pair_sep=',', kv_sep='='):
    """Expand the key/value pairs in a JSON log's ``message`` field.

    The input is parsed as a JSON object. Its ``"message"`` value is split
    into items with ``pair_sep``; each item is split into a key and a value
    with ``kv_sep``. The resulting pairs are merged into the parsed object
    (overwriting keys that already exist) and the object is serialized back
    to a JSON string.

    Args:
        log: JSON-encoded object, e.g.
            ``{"topic": "logSource", "message": "x=1,y=1,z=1"}``.
        pair_sep: Regular expression separating the pairs (passed to
            ``re.split``).
        kv_sep: Regular expression separating a key from its value (passed
            to ``re.split``).

    Returns:
        The JSON string of the input object with the parsed pairs added.
        Items that do not contain ``kv_sep`` are skipped, and a missing or
        empty ``message`` leaves the object unchanged.
    """
    # Both modules are imported inside the function, as the original did for
    # json, so the body does not depend on module-level imports.
    import json
    import re

    record = json.loads(log)
    message = record.get("message")
    if message:
        for item in re.split(pair_sep, message):
            # Split once only, so a value that itself contains the separator
            # (e.g. "a=b=c") is kept whole instead of being truncated.
            parts = re.split(kv_sep, item, maxsplit=1)
            if len(parts) < 2:
                # No separator in this item: nothing to extract.
                continue
            record[parts[0]] = parts[1]
    return json.dumps(record)
def register_source(st_env):
    """Register the Kafka topic ``logSource`` as the table source "source".

    Each record is exposed as a single STRING column named ``log``.

    Args:
        st_env: Table environment on which ``connect`` is called.
    """
    st_env \
        .connect(  # declare the external system to connect to
            Kafka()
            .version("0.10")
            .topic("logSource")
            .start_from_latest()
            .property("zookeeper.connect", "localhost:2181")
            .property("bootstrap.servers", "localhost:9092")) \
        .with_format(  # declare a format for this system
            Csv()
            .schema(DataTypes.ROW([DataTypes.FIELD("log", DataTypes.STRING())]))
            # NOTE(review): a newline delimiter presumably keeps the commas
            # inside the JSON text from splitting the record into several
            # fields -- confirm against the Csv format documentation.
            .field_delimiter("\n")) \
        .with_schema(  # declare the schema of the table
            Schema()
            .field("log", DataTypes.STRING())) \
        .in_append_mode() \
        .register_table_source("source")
def register_sink(st_env):
    """Register the Kafka topic ``logSink`` as the table sink "sink".

    The sink accepts a single STRING column named ``log`` and writes it
    using the Csv format.

    Args:
        st_env: Table environment on which ``connect`` is called.
    """
    # NOTE(review): the Csv format is the likely reason the emitted JSON is
    # wrapped in an extra pair of double quotes with the inner quotes doubled:
    # that is standard CSV quoting for a field containing quote characters.
    # Writing the raw string probably needs quoting disabled or a different
    # sink format -- confirm which options this PyFlink version offers.
    st_env \
        .connect(
            Kafka()
            .version("0.10")
            .topic("logSink")
            .start_from_earliest()
            .property("zookeeper.connect", "localhost:2181")
            .property("bootstrap.servers", "localhost:9092")) \
        .with_format(  # declare a format for this system
            Csv()
            .schema(DataTypes.ROW([DataTypes.FIELD("log",
                                                   DataTypes.STRING())]))) \
        .with_schema(  # declare the schema of the table
            Schema()
            .field("log", DataTypes.STRING())) \
        .in_append_mode() \
        .register_table_sink("sink")
if __name__ == '__main__':
    # Build a streaming table environment on the blink planner, with
    # event-time characteristics and a parallelism of 1.
    s_env = StreamExecutionEnvironment.get_execution_environment()
    s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    s_env.set_parallelism(1)
    settings = EnvironmentSettings \
        .new_instance() \
        .in_streaming_mode() \
        .use_blink_planner() \
        .build()
    st_env = StreamTableEnvironment.create(s_env, environment_settings=settings)

    # Register the UDF under the name the query below calls. The original
    # registered 'e_kv' / e_kv, which is not defined anywhere in this script.
    st_env.register_function('kv', kv)
    register_source(st_env)
    register_sink(st_env)

    # Read each log line, expand its key/value pairs and write it to the sink.
    st_env \
        .from_path("source") \
        .select("kv(log,',', '=') as log") \
        .insert_into("sink")
    st_env.execute("test")