我以相同的方式 实现了一个  KafkaRetractTableSink, 然后打成 jar 包放在 lib 目录下启动 sql-client:

org.apache.flink.table.planner.delegation.BlinkExecutorFactory
        at 
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:146)
        at 
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
        at 
org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:377)
        at 
org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:559)
        at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
        at 
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:557)
        at 
org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:494)
        at 
org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:159)
        at 
org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:118)
        at 
org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:742)
        ... 3 more

这个改怎样解决呢?

谢谢,
王磊



[email protected] 

 
Sender: [email protected]
Send Time: 2020-03-29 10:32
Receiver: [email protected]
Subject: RE: 实现 KafkaUpsertTableSink
Benchao,非常感谢大佬,之前不知道需要添加 Factory 到文件中,添加之后,可以正常运行了。
 
 
-----Original Message-----
From: [email protected] 
<[email protected]> On Behalf Of Benchao 
Li
Sent: Saturday, March 28, 2020 6:28 PM
To: user-zh <[email protected]>
Subject: Re: 实现 KafkaUpsertTableSink
 
Hi,
 
你需要把你新增的Factory添加到 resources下的
META-INF/services/org.apache.flink.table.factories.TableFactory文件中,不知道这一步你是否已经做过了呢?
 
<[email protected]> 于2020年3月28日周六 下午5:38写道:
 
> 各位大佬:
>
>                 由于现在的 KafkaTableSink 不支持 sql 中有group ,参照
> KafkaTableSink 和 HbaseUpsertTableSink  的一套逻辑实现了一套
> KafkaUpsertTableSink:
>
>                 KafkaUpsertTableSink
>
> KafkaUpsertTableSinkBase
>
> KafkaUpsertTableSourceSinkFactory
>
> KafkaUpsertTableSourceSinkFactoryBase
>
> MyKafkaValidator
>
> 但是在 TableFactoryService. discoverFactories 的方法中不能加载我定义的
> KafkaUpsertTableSourceSinkFactory ,请问各位大佬,是否是需要在什么地方注册
> 呢?
>
>
>
>
> /**
> * Searches for factories using Java service providers.
> *
> * @return all factories in the classpath */ private static 
> List<TableFactory> discoverFactories(Optional<ClassLoader>
> classLoader) {
>    try {
>       List<TableFactory> result = new LinkedList<>();
>       ClassLoader cl =
> classLoader.orElse(Thread.currentThread().getContextClassLoader());
>       ServiceLoader
>          .load(TableFactory.class, cl)
>          .iterator()
>          .forEachRemaining(result::add);
>       //todo add
>       result.add(new KafkaUpsertTableSourceSinkFactory());
>       return result;
>    } catch (ServiceConfigurationError e) {
>       LOG.error("Could not load service provider for table factories.", e);
>       throw new TableException("Could not load service provider for 
> table factories.", e);
>    }
>
> }
>
>
>
>
>
> 直接在返回的 result 里面添加对应的  KafkaUpsertTableSourceSinkFactory  是可
> 以成功运行的。
>
> 非常感谢
>
>
>
>
>
> ------------------
>
> Thanks
>
> venn
>
>
>
>
 
-- 
 
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [email protected]; [email protected]

回复