简单示例:

public class TableA implements LookupTableSource {


    @Nullable
    private final LookupCache cache;

    public TableA(@Nullable LookupCache cache) {
        this.cache = cache;
    }

    @Override
    public LookupRuntimeProvider
getLookupRuntimeProvider(LookupContext context) {

        FunctionA lookupFunction = new FunctionA(false);

        if (cache != null) {
            return PartialCachingAsyncLookupProvider.of(lookupFunction, cache);
        } else {
            return AsyncLookupFunctionProvider.of(lookupFunction);
        }
    }

    @Override
    public DynamicTableSource copy() {
        return new TableA(cache);
    }

    @Override
    public String asSummaryString() {
        return "Async Table";
    }
}

public class LookupFunctionA extends AsyncLookupFunction {


    @Override
    public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
        CompletableFuture<Collection<RowData>> future = new
CompletableFuture<>();
        future.completeExceptionally(new IOException("request failed"));
        return future;
    }

}

会出现:
java.lang.StackOverflowError
at
org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:66)
at
org.apache.flink.util.SerializedThrowable.<init>(SerializedThrowable.java:62)
at
org.apache.flink.util.SerializedThrowable.addAllSuppressed(SerializedThrowable.java:150)。

Flink版本1.16~1.18都测试过,一样的报错。

从SerializedThrowable源码看,addAllSuppressed方法没有传递alreadySeen

回复