This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 65592ad cache StorageClient (#3078) 65592ad is described below commit 65592ad2ae9b3f460ef7af95c78f02dbd0f1c5e1 Author: Jia Zhai <jiaz...@users.noreply.github.com> AuthorDate: Wed Nov 28 22:45:59 2018 +0800 cache StorageClient (#3078) --- .../functions/worker/rest/api/FunctionsImpl.java | 62 ++++++++++++---------- 1 file changed, 34 insertions(+), 28 deletions(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 474032e..7f3ed44 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -47,6 +47,7 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import javax.ws.rs.WebApplicationException; @@ -121,6 +122,8 @@ public class FunctionsImpl { public static final String SOURCE = "Source"; public static final String SINK = "Sink"; + private final AtomicReference<StorageClient> storageClient = new AtomicReference<>(); + private final Supplier<WorkerService> workerServiceSupplier; public FunctionsImpl(Supplier<WorkerService> workerServiceSupplier) { @@ -1074,39 +1077,42 @@ public class FunctionsImpl { String stateStorageServiceUrl = worker().getWorkerConfig().getStateStorageServiceUrl(); - try (StorageClient client = StorageClientBuilder.newBuilder() - .withSettings(StorageClientSettings.newBuilder() - .serviceUri(stateStorageServiceUrl) - .clientName("functions-admin") - .build()) - .withNamespace(tableNs) - .build()) { - try (Table<ByteBuf, ByteBuf> table = result(client.openTable(tableName))) { - try (KeyValue<ByteBuf, ByteBuf> kv = result(table.getKv(Unpooled.wrappedBuffer(key.getBytes(UTF_8))))) { - if (null == kv) { - return Response.status(Status.NOT_FOUND) - .entity(new String("key '" + key + "' doesn't exist.")) - .build(); + if (storageClient.get() == null) { + storageClient.compareAndSet(null, StorageClientBuilder.newBuilder() + .withSettings(StorageClientSettings.newBuilder() + .serviceUri(stateStorageServiceUrl) + .clientName("functions-admin") + .build()) + .withNamespace(tableNs) + .build()); + } + + try (Table<ByteBuf, ByteBuf> table = result(storageClient.get().openTable(tableName))) { + try (KeyValue<ByteBuf, ByteBuf> kv = result(table.getKv(Unpooled.wrappedBuffer(key.getBytes(UTF_8))))) { + if (null == kv) { + return Response.status(Status.NOT_FOUND) + .entity(new String("key '" + key + "' doesn't exist.")) + .build(); + } else { + String value; + if (kv.isNumber()) { + value = "value : " + kv.numberValue() + ", version : " + kv.version(); } else { - String value; - if (kv.isNumber()) { - value = "value : " + kv.numberValue() + ", version : " + kv.version(); - } else { - value = "value : " + new String(ByteBufUtil.getBytes(kv.value()), UTF_8) - + ", version : " + kv.version(); - } - return Response.status(Status.OK) - .entity(new String(value)) - .build(); + value = "value : " + new String(ByteBufUtil.getBytes(kv.value()), UTF_8) + + ", version : " + kv.version(); } + return Response.status(Status.OK) + .entity(new String(value)) + .build(); } - } catch (Exception e) { - log.error("Error while getFunctionState request @ /{}/{}/{}/{}", - tenant, namespace, functionName, key, e); - return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON) - .entity(new ErrorData(e.getMessage())).build(); } + } catch (Exception e) { + log.error("Error while getFunctionState request @ /{}/{}/{}/{}", + tenant, namespace, functionName, key, e); + return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(e.getMessage())).build(); } + } public Response uploadFunction(final InputStream uploadedInputStream, final String path) {