I kept only the single KafkaRetractTableSourceSinkFactory. KafkaRetractTableSinkBase implements the
RetractStreamTableSink interface, and in consumeDataStream only records flagged True are sent downstream. It works now.
@Override
public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
    // keep only accumulate records (f0 == true); drop retraction records
    DataStream<Row> dtNeed = dataStream
            .filter(x -> x.f0 == Boolean.TRUE)
            .map(x -> x.f1);
INSERT INTO table1 SELECT field, count(*) FROM table2 GROUP BY field
This query produces a retract stream, so the results carry True/False flags, and filtering on the flag works.
INSERT INTO table1 SELECT field, 1 FROM table2
As I understand it, this is not a retract stream, so I expected the dataStream.filter(x -> x.f0 == Boolean.TRUE) code above to fail, but in practice it did not.
I don't fully understand this yet; I will look into it further.
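One possible explanation, sketched here without Flink (plain collections stand in for DataStream, and the claim that append-only queries deliver every record with the flag set to true is my understanding of the planner's behavior, not something confirmed in this thread): in the retract encoding every record is a (flag, row) pair, and an append-only query simply emits all flags as true, so the filter passes everything through instead of failing.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.List;
import java.util.Map.Entry;
import java.util.stream.Collectors;

public class RetractFilterSketch {

    // Same idea as dataStream.filter(x -> x.f0 == Boolean.TRUE).map(x -> x.f1)
    static List<String> keep(List<Entry<Boolean, String>> stream) {
        return stream.stream()
                .filter(e -> e.getKey())
                .map(Entry::getValue)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Aggregation query: count per key first retracts (false) the old
        // value, then accumulates (true) the new one.
        List<Entry<Boolean, String>> retractStream = List.of(
                new SimpleEntry<>(true,  "a,1"),
                new SimpleEntry<>(false, "a,1"),   // retract old count
                new SimpleEntry<>(true,  "a,2"));  // accumulate new count

        // Append-only query: every record arrives with flag = true,
        // so the filter is a no-op rather than an error.
        List<Entry<Boolean, String>> appendStream = List.of(
                new SimpleEntry<>(true, "a,1"),
                new SimpleEntry<>(true, "b,1"));

        System.out.println(keep(retractStream)); // [a,1, a,2]
        System.out.println(keep(appendStream));  // [a,1, b,1]
    }
}
```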
Thanks,
Wang Lei
[email protected]
Sender: Benchao Li
Send Time: 2020-03-31 12:02
Receiver: user-zh
Subject: Re: Re: Implementing KafkaUpsertTableSink
I think you can give KafkaRetractTableSourceSinkFactory parameters that differ from KafkaTableSourceSinkFactory's,
and use those parameters to tell the two factories apart. For example, add a parameter indicating whether the sink is retract or append?
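A minimal, Flink-free sketch of that idea. The property name connector.sink-semantic below is made up for illustration, not an existing Flink option, and the matching function is only a crude stand-in for TableFactoryService's context matching:

```java
import java.util.HashMap;
import java.util.Map;

public class FactoryContextSketch {

    // The stock Kafka factory's required context (simplified).
    static Map<String, String> kafkaContext() {
        Map<String, String> ctx = new HashMap<>();
        ctx.put("connector.type", "kafka");
        return ctx;
    }

    // Retract variant: one extra distinguishing key, so this factory
    // never matches a plain Kafka table definition.
    static Map<String, String> kafkaRetractContext() {
        Map<String, String> ctx = kafkaContext();
        ctx.put("connector.sink-semantic", "retract"); // hypothetical property name
        return ctx;
    }

    // Crude stand-in for context matching: a factory matches only if every
    // required key/value appears in the table's properties. (As I understand
    // it, the real TableFactoryService additionally drops factories whose
    // supportedProperties do not cover all remaining table properties.)
    static boolean matches(Map<String, String> required, Map<String, String> props) {
        return required.entrySet().stream()
                .allMatch(e -> e.getValue().equals(props.get(e.getKey())));
    }

    public static void main(String[] args) {
        Map<String, String> ddlAppend = new HashMap<>();
        ddlAppend.put("connector.type", "kafka");

        Map<String, String> ddlRetract = new HashMap<>(ddlAppend);
        ddlRetract.put("connector.sink-semantic", "retract");

        System.out.println(matches(kafkaRetractContext(), ddlAppend)); // false
        System.out.println(matches(kafkaRetractContext(), ddlRetract)); // true
    }
}
```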
[email protected] <[email protected]> 于2020年3月31日周二 上午11:17写道:
> It seems two tableFactories are now discovered. I copied the whole KafkaTableSourceSinkFactory set of classes
> in parallel into a KafkaRetractTableSourceSinkFactory set.
> How should I change this properly?
>
> 137    private static <T extends TableFactory> T findSingleInternal(
> 138            Class<T> factoryClass,
> 139            Map<String, String> properties,
> 140            Optional<ClassLoader> classLoader) {
> 141
> 142        List<TableFactory> tableFactories = discoverFactories(classLoader);
> 143        List<T> filtered = filter(tableFactories, factoryClass, properties);
> 144
> 145        if (filtered.size() > 1) {
> 146            throw new AmbiguousTableFactoryException(
> 147                    filtered,
> 148                    factoryClass,
> 149                    tableFactories,
> 150                    properties);
> 151        } else {
> 152            return filtered.get(0);
> 153        }
> 154    }
>
>
> Thanks,
> Wang Lei
>
>
> [email protected]
>
>
> Sender: [email protected]
> Send Time: 2020-03-31 10:50
> Receiver: user-zh
> Subject: Re: RE: Implementing KafkaUpsertTableSink
>
> I implemented a KafkaRetractTableSink in the same way, built it into a jar, put it under the lib directory, and started sql-client:
>
> org.apache.flink.table.planner.delegation.BlinkExecutorFactory
> 	at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:146)
> 	at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
> 	at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:377)
> 	at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:559)
> 	at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
> 	at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:557)
> 	at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:494)
> 	at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:159)
> 	at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:118)
> 	at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:742)
> ... 3 more
>
> How should this be solved?
>
> Thanks,
> Wang Lei
>
>
>
> [email protected]
>
> Sender: [email protected]
> Send Time: 2020-03-29 10:32
> Receiver: [email protected]
> Subject: RE: Implementing KafkaUpsertTableSink
> Benchao, thanks a lot. I didn't know the Factory had to be added to that file; after adding it, everything runs fine.
> -----Original Message-----
> From: [email protected]
> <[email protected]> On Behalf Of
> Benchao Li
> Sent: Saturday, March 28, 2020 6:28 PM
> To: user-zh <[email protected]>
> Subject: Re: Implementing KafkaUpsertTableSink
> Hi,
> You need to add your new Factory to the following file under resources:
>
> META-INF/services/org.apache.flink.table.factories.TableFactory
>
> Have you done that step yet?
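> As a sketch, the registration file can be created like this; the class name com.example.KafkaUpsertTableSourceSinkFactory is a placeholder for the factory's actual fully qualified name:

```shell
# Register a custom TableFactory via Java SPI. The file must end up on the
# jar's classpath under META-INF/services/. The class name is a placeholder.
mkdir -p META-INF/services
echo "com.example.KafkaUpsertTableSourceSinkFactory" \
  > META-INF/services/org.apache.flink.table.factories.TableFactory
cat META-INF/services/org.apache.flink.table.factories.TableFactory
```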
> <[email protected]> 于2020年3月28日周六 下午5:38写道:
> > Hi all,
> >
> > Since the current KafkaTableSink does not support SQL containing GROUP BY, I implemented a
> > KafkaUpsertTableSink following the logic of KafkaTableSink and
> > HbaseUpsertTableSink:
> >
> > KafkaUpsertTableSink
> >
> > KafkaUpsertTableSinkBase
> >
> > KafkaUpsertTableSourceSinkFactory
> >
> > KafkaUpsertTableSourceSinkFactoryBase
> >
> > MyKafkaValidator
> >
> > But the TableFactoryService.discoverFactories method cannot load my
> > KafkaUpsertTableSourceSinkFactory. Does it need to be registered
> > somewhere?
> >
> >
> >
> >
> > /**
> >  * Searches for factories using Java service providers.
> >  *
> >  * @return all factories in the classpath
> >  */
> > private static List<TableFactory> discoverFactories(Optional<ClassLoader> classLoader) {
> >     try {
> >         List<TableFactory> result = new LinkedList<>();
> >         ClassLoader cl = classLoader.orElse(Thread.currentThread().getContextClassLoader());
> >         ServiceLoader
> >             .load(TableFactory.class, cl)
> >             .iterator()
> >             .forEachRemaining(result::add);
> >         // TODO: added manually as a workaround
> >         result.add(new KafkaUpsertTableSourceSinkFactory());
> >         return result;
> >     } catch (ServiceConfigurationError e) {
> >         LOG.error("Could not load service provider for table factories.", e);
> >         throw new TableException("Could not load service provider for table factories.", e);
> >     }
> > }
> >
> >
> >
> >
> >
> > Adding the KafkaUpsertTableSourceSinkFactory directly to the returned
> > result does run successfully.
> >
> > Many thanks
> >
> >
> >
> >
> >
> > ------------------
> >
> > Thanks
> >
> > venn
> >
> >
> >
> >
> --
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: [email protected]; [email protected]
>