Hi sunfulin:
我这么实现是可以的。
public void eval(CompletableFuture<Collection<Row>> result, String key) {
executorService.submit(() -> {
try {
Row row = fetchdata(key);
if (row != null) {
result.complete(Collections.singletonList(row));
} else {
result.complete(Collections.singletonList(new Row(this.fieldNames.length)));
}
} catch (Exception e) {
result.complete(Collections.singletonList(new Row(this.fieldNames.length)));
}
});
}
Best forideal.
在 2020-07-02 15:56:46,"sunfulin" <[email protected]> 写道:
>hi,
>我在使用flink 1.10.1 blink
>planner,通过扩展tablesourcesinkfactory和asynctablefunction扩展了一个维表,维表会初始化并调用rpc服务。
>遇到一个比较诡异的问题,作业在执行如下join的sql时,没有任何输出,等一段时间后,抛出了异常:Caused by :
>java.lang.Exception: Could not complete the stream element:
>org.apache.flink.table.dataformat.BinaryRow.... caused by :
>java.util.concurrent.TimeoutException: Async function call has timed out.
>
>
>我开了debug日志,debug时在我的lookupfunction.eval里,可以正常调用rpc接口服务并future.complete,但是并不输出任何结果。不确定可能是啥原因。望指导。谢谢。