This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new e5a7b53 Fix: get function status with auth enable (#2516) e5a7b53 is described below commit e5a7b532ffbf1d8813fc31403162fe3c17e937f5 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Thu Sep 13 11:29:37 2018 -0700 Fix: get function status with auth enable (#2516) --- .../pulsar/broker/admin/impl/FunctionsBase.java | 2 +- .../client/admin/internal/FunctionsImpl.java | 5 ++- .../functions/worker/FunctionRuntimeManager.java | 41 ++++++++-------------- .../functions/worker/rest/api/FunctionsImpl.java | 6 ++-- .../worker/rest/api/v2/FunctionApiV2Resource.java | 2 +- 5 files changed, 23 insertions(+), 33 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index def5452..62d12ec 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -160,7 +160,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi final @PathParam("functionName") String functionName, final @PathParam("instanceId") String instanceId) throws IOException { return functions.getFunctionInstanceStatus( - tenant, namespace, functionName, instanceId); + tenant, namespace, functionName, instanceId, uri.getRequestUri()); } @GET diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java index 7dc7050..1d315d7 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java @@ -110,9 +110,8 @@ public class FunctionsImpl extends BaseResource implements Functions { } } - @Override - public FunctionStatus getFunctionStatus(String tenant, String namespace, String function, int id) - throws PulsarAdminException { + public FunctionStatus getFunctionStatus( + String tenant, String namespace, String function, int id) throws PulsarAdminException { try { Response response = request( functions.path(tenant).path(namespace).path(function).path(Integer.toString(id)).path("status")) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index 93828de..ee6eeec 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -26,6 +26,7 @@ import java.net.URISyntaxException; import lombok.extern.slf4j.Slf4j; import org.apache.distributedlog.api.namespace.Namespace; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Reader; @@ -248,7 +249,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ * @return the function status */ public InstanceCommunication.FunctionStatus getFunctionInstanceStatus(String tenant, String namespace, - String functionName, int instanceId) { + String functionName, int instanceId, URI uri) { Assignment assignment = this.findAssignment(tenant, namespace, functionName, instanceId); final String assignedWorkerId = assignment.getWorkerId(); final String workerId = this.workerConfig.getWorkerId(); @@ -306,23 +307,8 @@ public class FunctionRuntimeManager implements AutoCloseable{ return functionStatusBuilder.build(); } - Client client = ClientBuilder.newClient(); - - // TODO: implement authentication/authorization - String jsonResponse = client.target(String.format("http://%s:%d/admin/functions/%s/%s/%s/%d/status", - workerInfo.getWorkerHostname(), workerInfo.getPort(), tenant, namespace, functionName, instanceId)) - .request(MediaType.TEXT_PLAIN) - .get(String.class); - - InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder(); - try { - org.apache.pulsar.functions.utils.Utils.mergeJson(jsonResponse, functionStatusBuilder); - } catch (IOException e) { - log.warn("Got invalid function status response from {}", workerInfo, e); - throw new RuntimeException(e); - } - functionStatusBuilder.setWorkerId(assignedWorkerId); - functionStatus = functionStatusBuilder.build(); + URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build(); + throw new WebApplicationException(Response.temporaryRedirect(redirect).build()); } return functionStatus; @@ -426,9 +412,10 @@ public class FunctionRuntimeManager implements AutoCloseable{ * @param namespace the namespace the function belongs to * @param functionName the function name * @return a list of function statuses + * @throws PulsarAdminException */ public InstanceCommunication.FunctionStatusList getAllFunctionStatus( - String tenant, String namespace, String functionName) { + String tenant, String namespace, String functionName) throws PulsarAdminException { Collection<Assignment> assignments = this.findFunctionAssignments(tenant, namespace, functionName); @@ -438,13 +425,15 @@ public class FunctionRuntimeManager implements AutoCloseable{ } for (Assignment assignment : assignments) { - - InstanceCommunication.FunctionStatus functionStatus = this.getFunctionInstanceStatus( - assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(), - assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(), - assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(), - assignment.getInstance().getInstanceId()); - + boolean isOwner = this.workerConfig.getWorkerId().equals(assignment.getWorkerId()); + InstanceCommunication.FunctionStatus functionStatus = isOwner + ? (getFunctionInstanceStatus(tenant, namespace, functionName, + assignment.getInstance().getInstanceId(), null)) + : this.functionAdmin.functions().getFunctionStatus( + assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(), + assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(), + assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(), + assignment.getInstance().getInstanceId()); functionStatusListBuilder.addFunctionStatusList(functionStatus); } return functionStatusListBuilder.build(); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 136bab0..df82c0d 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -332,7 +332,7 @@ public class FunctionsImpl { } public Response getFunctionInstanceStatus(final String tenant, final String namespace, final String functionName, - final String instanceId) throws IOException { + final String instanceId, URI uri) throws IOException { if (!isWorkerServiceAvailable()) { return getUnavailableResponse(); @@ -358,7 +358,9 @@ public class FunctionsImpl { FunctionStatus functionStatus = null; try { functionStatus = functionRuntimeManager.getFunctionInstanceStatus(tenant, namespace, functionName, - Integer.parseInt(instanceId)); + Integer.parseInt(instanceId), uri); + } catch (WebApplicationException we) { + throw we; } catch (Exception e) { log.error("{}/{}/{} Got Exception Getting Status", tenant, namespace, functionName, e); return Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()).build(); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java index 95fe687..6c6fb18 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java @@ -104,7 +104,7 @@ public class FunctionApiV2Resource extends FunctionApiResource { final @PathParam("functionName") String functionName, final @PathParam("instanceId") String instanceId) throws IOException { return functions.getFunctionInstanceStatus( - tenant, namespace, functionName, instanceId); + tenant, namespace, functionName, instanceId, uri.getRequestUri()); } @GET