Benchao,非常感谢大佬,之前不知道需要添加 Factory 到文件中,添加之后,可以正常运行了。


-----Original Message-----
From: [email protected] 
<[email protected]> On Behalf Of Benchao 
Li
Sent: Saturday, March 28, 2020 6:28 PM
To: user-zh <[email protected]>
Subject: Re: 实现 KafkaUpsertTableSink

Hi,

你需要把你新增的Factory添加到 resources下的
META-INF/services/org.apache.flink.table.factories.TableFactory文件中,不知道这一步你是否已经做过了呢?

<[email protected]> 于2020年3月28日周六 下午5:38写道:

> 各位大佬:
>
>                 由于现在的 KafkaTableSink 不支持 sql 中有group ,参照
> KafkaTableSink 和 HbaseUpsertTableSink  的一套逻辑实现了一套
> KafkaUpsertTableSink:
>
>                 KafkaUpsertTableSink
>
> KafkaUpsertTableSinkBase
>
> KafkaUpsertTableSourceSinkFactory
>
> KafkaUpsertTableSourceSinkFactoryBase
>
> MyKafkaValidator
>
> 但是在 TableFactoryService. discoverFactories 的方法中不能加载我定义的
> KafkaUpsertTableSourceSinkFactory ,请问各位大佬,是否是需要在什么地方注册
> 呢?
>
>
>
>
> /**
> * Searches for factories using Java service providers.
> *
> * @return all factories in the classpath */ private static 
> List<TableFactory> discoverFactories(Optional<ClassLoader>
> classLoader) {
>    try {
>       List<TableFactory> result = new LinkedList<>();
>       ClassLoader cl =
> classLoader.orElse(Thread.currentThread().getContextClassLoader());
>       ServiceLoader
>          .load(TableFactory.class, cl)
>          .iterator()
>          .forEachRemaining(result::add);
>       //todo add
>       result.add(new KafkaUpsertTableSourceSinkFactory());
>       return result;
>    } catch (ServiceConfigurationError e) {
>       LOG.error("Could not load service provider for table factories.", e);
>       throw new TableException("Could not load service provider for 
> table factories.", e);
>    }
>
> }
>
>
>
>
>
> 直接在返回的 result 里面添加对应的  KafkaUpsertTableSourceSinkFactory  是可
> 以成功运行的。
>
> 非常感谢
>
>
>
>
>
> ------------------
>
> Thanks
>
> venn
>
>
>
>

-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [email protected]; [email protected]

回复