This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 65592ad  cache StorageClient (#3078)
65592ad is described below

commit 65592ad2ae9b3f460ef7af95c78f02dbd0f1c5e1
Author: Jia Zhai <jiaz...@users.noreply.github.com>
AuthorDate: Wed Nov 28 22:45:59 2018 +0800

    cache StorageClient (#3078)
---
 .../functions/worker/rest/api/FunctionsImpl.java   | 62 ++++++++++++----------
 1 file changed, 34 insertions(+), 28 deletions(-)

diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 474032e..7f3ed44 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -47,6 +47,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 
 import javax.ws.rs.WebApplicationException;
@@ -121,6 +122,8 @@ public class FunctionsImpl {
     public static final String SOURCE = "Source";
     public static final String SINK = "Sink";
 
+    private final AtomicReference<StorageClient> storageClient = new 
AtomicReference<>();
+
     private final Supplier<WorkerService> workerServiceSupplier;
 
     public FunctionsImpl(Supplier<WorkerService> workerServiceSupplier) {
@@ -1074,39 +1077,42 @@ public class FunctionsImpl {
 
         String stateStorageServiceUrl = 
worker().getWorkerConfig().getStateStorageServiceUrl();
 
-        try (StorageClient client = StorageClientBuilder.newBuilder()
-            .withSettings(StorageClientSettings.newBuilder()
-                .serviceUri(stateStorageServiceUrl)
-                .clientName("functions-admin")
-                .build())
-            .withNamespace(tableNs)
-            .build()) {
-            try (Table<ByteBuf, ByteBuf> table = 
result(client.openTable(tableName))) {
-                try (KeyValue<ByteBuf, ByteBuf> kv = 
result(table.getKv(Unpooled.wrappedBuffer(key.getBytes(UTF_8))))) {
-                    if (null == kv) {
-                        return Response.status(Status.NOT_FOUND)
-                            .entity(new String("key '" + key + "' doesn't 
exist."))
-                            .build();
+        if (storageClient.get() == null) {
+            storageClient.compareAndSet(null, StorageClientBuilder.newBuilder()
+                .withSettings(StorageClientSettings.newBuilder()
+                    .serviceUri(stateStorageServiceUrl)
+                    .clientName("functions-admin")
+                    .build())
+                .withNamespace(tableNs)
+                .build());
+        }
+
+        try (Table<ByteBuf, ByteBuf> table = 
result(storageClient.get().openTable(tableName))) {
+            try (KeyValue<ByteBuf, ByteBuf> kv = 
result(table.getKv(Unpooled.wrappedBuffer(key.getBytes(UTF_8))))) {
+                if (null == kv) {
+                    return Response.status(Status.NOT_FOUND)
+                        .entity(new String("key '" + key + "' doesn't exist."))
+                        .build();
+                } else {
+                    String value;
+                    if (kv.isNumber()) {
+                        value = "value : " + kv.numberValue() + ", version : " 
+ kv.version();
                     } else {
-                        String value;
-                        if (kv.isNumber()) {
-                            value = "value : " + kv.numberValue() + ", version 
: " + kv.version();
-                        } else {
-                            value = "value : " + new 
String(ByteBufUtil.getBytes(kv.value()), UTF_8)
-                                + ", version : " + kv.version();
-                        }
-                        return Response.status(Status.OK)
-                            .entity(new String(value))
-                            .build();
+                        value = "value : " + new 
String(ByteBufUtil.getBytes(kv.value()), UTF_8)
+                            + ", version : " + kv.version();
                     }
+                    return Response.status(Status.OK)
+                        .entity(new String(value))
+                        .build();
                 }
-            } catch (Exception e) {
-                log.error("Error while getFunctionState request @ 
/{}/{}/{}/{}",
-                    tenant, namespace, functionName, key, e);
-                return 
Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(e.getMessage())).build();
             }
+        } catch (Exception e) {
+            log.error("Error while getFunctionState request @ /{}/{}/{}/{}",
+                tenant, namespace, functionName, key, e);
+            return 
Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
+                .entity(new ErrorData(e.getMessage())).build();
         }
+
     }
 
     public Response uploadFunction(final InputStream uploadedInputStream, 
final String path) {

Reply via email to