Thanks Fabian for replying. But I am using KafkaSource only.
The code is something like below.
class MetricSource {
final Set<String> metricSdms = new HashSet();
...
env.addSource(MetricKafkaSourceFactory.createConsumer(jobParams))
.name(MetricSource.class.getSimpleName())
.uid(MetricSource.class.getSimpleName())
.filter(sdm -> metricSdms.contains(sdm.getType()));
}
class MetricKafkaSourceFactory {
public static FlinkKafkaConsumer<SelfDescribingMessageDO>
createConsumer(final Configuration jobParams) {
...
return new FlinkKafkaConsumer<>(topic, new DeserializationSchema(),
props);
}
}
On Wed, Jun 23, 2021 at 7:31 PM Fabian Paul <[email protected]>
wrote:
> Hi Debraj,
>
> By Source Legacy Thread we refer to all sources which do not implement the
> new interface yet [1]. Currently only the Hive, Kafka and FileSource
> are already migrated. In general, there is no sever downside of using the
> older source but in the future we plan only to implement ones based on
> the new operator model.
>
> Best,
> Fabian
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>
>