Hi,
其实这是 CSV connector 的一个可选参数 quote_character 的效果：它的默认值是双引号（"），所以你的每个字段外面都会被包上一层双引号。你可以把这个参数配置为 \0，就可以得到你要的效果了。
# Register the Kafka sink table "sink".
# Build each descriptor as a named object first, then wire them together,
# instead of one long fluent chain.

# Kafka 0.11 connector writing to topic "logSink".
kafka_sink = (
    Kafka()
    .version("0.11")
    .topic("logSink")
    .start_from_earliest()
    .property("zookeeper.connect", "localhost:2181")
    .property("bootstrap.servers", "localhost:9092")
)

# CSV format with quote_character set to "\0" (NUL), as suggested above,
# so the output fields are not wrapped in double quotes.
csv_format = (
    Csv()
    .schema(DataTypes.ROW([DataTypes.FIELD("log", DataTypes.STRING())]))
    .quote_character("\0")
)

# Table schema: a single STRING column named "log".
sink_schema = Schema().field("log", DataTypes.STRING())

st_env.connect(kafka_sink) \
    .with_format(csv_format) \
    .with_schema(sink_schema) \
    .in_append_mode() \
    .register_table_sink("sink")
Best,
Xingbo
jack <[email protected]> 于2020年6月1日周一 下午5:31写道:
> *请教各位,我这边使用pyflink 消费kafka json字符串数据发送到kafka, 把kafka消息按照一个字段来处理,*
>
> *数据输入:*
> {"topic": "logSource", "message": "x=1,y=1,z=1"}
>
> 发送到kafka里面的数据结果如下:
> "{""topic"": ""logSource"", ""message"": ""x=1,y=1,z=1""}"
>
> *又被双引号包了一层, 我的udf函数中使用的是 json模块进行处理,请各位帮我看一下怎么样转成正常的json串。*
>
> @udf(input_types=[DataTypes.STRING(), DataTypes.STRING(),
> DataTypes.STRING()], result_type=DataTypes.STRING())
> def kv(log, pair_sep=',', kv_sep='='):
> import json
> log = json.loads(log)
> ret = {}
> items = re.split(pair_sep, log.get("message"))
> for item in items:
> ret[re.split(kv_sep, item)[0]] = re.split(kv_sep, item)[1]
> log.update(ret)
> log = json.dumps(log)
> return log
>
>
> def register_source(st_env):
> st_env \
> .connect( # declare the external system to connect to
> Kafka()
> .version("0.10")
> .topic("logSource")
> .start_from_latest()
> .property("zookeeper.connect", "localhost:2181")
> .property("bootstrap.servers", "localhost:9092")) \
> .with_format( # declare a format for this system
> Csv()
> .schema(DataTypes.ROW([DataTypes.FIELD("log", DataTypes.STRING())]))
> .field_delimiter("\n")) \
> .with_schema( # declare the schema of the table
> Schema()
> .field("log", DataTypes.STRING())) \
> .in_append_mode() \
> .register_table_source("source")
>
> def register_sink(st_env):
> st_env.connect(
> Kafka()
> .version("0.10")
> .topic("logSink")
> .start_from_earliest()
> .property("zookeeper.connect", "localhost:2181")
> .property("bootstrap.servers", "localhost:9092")) \
> .with_format( # declare a format for this system
> Csv()
> .schema(DataTypes.ROW([DataTypes.FIELD("log",
> DataTypes.STRING())]))) \
> .with_schema( # declare the schema of the table
> Schema()
> .field("log", DataTypes.STRING())) \
> .in_append_mode() \
> .register_table_sink("sink")
>
> if __name__ == '__main__':
>
> s_env = StreamExecutionEnvironment.get_execution_environment()
> s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
> s_env.set_parallelism(1)
> st_env = StreamTableEnvironment \
> .create(s_env, environment_settings=EnvironmentSettings
> .new_instance()
> .in_streaming_mode()
> .use_blink_planner().build())
> st_env.register_function('e_kv', e_kv)
> register_source(st_env)
> register_sink(st_env)
> st_env \
> .from_path("source") \
> .select("kv(log,',', '=') as log") \
> .insert_into("sink") \
> st_env.execute("test")
>
>
>