hello,kenyore. 我大致了解了你的意思,你可以通过继承AsyncTableFunction的方式实现数据库异步IO。
公共抽象类AsyncTableFunction <T> 扩展了UserDefinedFunction <https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/functions/UserDefinedFunction.html> AsyncTableFunction <https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/functions/AsyncTableFunction.html>可以通过实现自定义评估方法来定义a的行为。评估方法必须公开声明,而不是静态声明,并命名为“ eval”。评估方法也可以通过实现多个名为“ eval”的方法来重载。 对于每个“ eval”,都可以触发一个异步io操作,一旦完成,就可以通过调用来收集结果CompletableFuture.complete(T) <http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CompletableFuture.html?is-external=true#complete-T->。对于每个异步操作,调用“ eval”后,其上下文将立即存储在运算符中,从而避免在内部缓冲区未满的情况下阻塞输入的每个流。 代码示例: public void eval(CompletableFuture<Collection<String>> result, String rowkey) { Get get = new Get(Bytes.toBytes(rowkey)); ListenableFuture<Result> future = hbase.asyncGet(get); Futures.addCallback(future, new FutureCallback<Result>() { public void onSuccess(Result result) { List<String> ret = process(result); result.complete(ret); } public void onFailure(Throwable thrown) { result.completeExceptionally(thrown); } }); } 参考链接: https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/table/functions/AsyncTableFunction.html kenyore <[email protected]> 于2021年1月12日周二 下午3:29写道: > 感谢如此详尽的回复! > 但是我的场景似乎无法直接使用维表join。 > 因为我需要把look up的结果(会是多行数据),拼成一个数组放入数据行。 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/
