dear all :
目前有个小需求,由于binlog数据不同,不方便直接使用 format="json",想自定义format进行处理。
但是根据 “implements DeserializationFormatFactory,
SerializationFormatFactory”
这样自定义format之后,只能处理binlog的原始数据,我还想拿kafka的offset、partition等信息。我看kafka原生的DynamicKafkaDeserializationSchema
有方法
deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData>
collector) 。
包装了offset 的对象:ConsumerRecord
,我也想使用这个东西,或者用自定义的format替换他这部分,能做到吗?还是要connector 进行从定义?
- 如何使用kafka的KafkaDynamicTableFactory ,替换他的json format Michael Ran
