Flink team好, 我有一个很一般的问题,关于如何释放 Flink Job 中某个对象持有的资源。
我是 Flink 的新用户。我搜索了很多,但没有找到相关文件。但我确信有一个标准的方法来解决它。 我的Flink 应用程序中需要访问 Elasticsearch 服务器。我们使用从 org.elasticsearch.client.RestHighLevelClient 扩展而来的类 EsClient 来完成查询工作, 一旦不再使用它就需要调用它的`close`方法来释放资源。 所以我需要找到合适的地方来确保资源总是可以被释放,即使在调用的某个地方发生了一些异常 我现在能想到的是使用 `ThreadLocal` 将生成的 EsClient 对象保留在main class的 main 方法的开头,并且 在 main 方法结束时释放资源。 类似这样的伪代码: ```java 公共类 EsClientHolder { private static final ThreadLocal<EsClient> local = new InheritableThreadLocal<>(); public static final void createAndSetEsClient(EsClient esClient){ local.set(esClient); } private static final createAndSetEsClientBy(EsClientConfig esClientConfig){ EsClient instance = new EsClient(esClientConfig); createAndSetEsClient(instance) ; } private static final EsClient get() { EsClient c = local.get(); if(c == null){ throw new RuntimeException("确保在使用前创建并设置 EsClient 实例"); } return c; } private static final close()抛出 IOException { EsClient o = local.get(); if(o!= null){ o.close(); } } // 在 Fink 应用程序代码中的用法 public class main class { public static void main(String[] args) throws IOException { try { property prop = null; EsClientConfig configuration = getEsClientConfig(prop); EsClientHolder.createAndSetEsClientBy(config); // … SomeClass.method1(); other classes.method2(); // ... } at last { EsClientHolder.close(); } } } class SomeClass{ public void. method 1(){ // 1. Use EsClient in any calling method of any other class: EsClient esClient = EsClientHolder.get(); // … } } class other class { public void method 2() { // 2. Use EsClient in any calling method of any forked child thread new thread ( () -> { EsClient client = EsClientHolder.get(); // … }) . start(); // … } } ``` 我知道 TaskManager 是一个 Java JVM 进程,而 Task 是由一个 java Thread 执行的。 但我不知道 Flink 如何创建作业图,以及 Flink 最终如何将任务分配给线程以及这些线程之间的关系。 比如 Flink 把 SomeClass 的 method1 和 OtherClass 的 method2 分配到一个和运行 MainClass 的线程不一样的线程, 那么运行method1和mehod2的线程就没有办法拿到EsClient了。 这里我假设 MainClass 中的 main 方法将在一个线程中执行。如果不是,比如将 set() 和 close() 拆分为在不同的线程中运行,则就 没有办法释放资源。 谢谢!