Repository: helix Updated Branches: refs/heads/master 71e4b6a66 -> a09a18ac5
[HELIX-780] add get/add job user content rest api Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/a09a18ac Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/a09a18ac Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/a09a18ac Branch: refs/heads/master Commit: a09a18ac55464c3e399800b4474ccb6e64d168ec Parents: 71e4b6a Author: Harry Zhang <hrzh...@linkedin.com> Authored: Mon Oct 8 15:36:53 2018 -0700 Committer: Harry Zhang <hrzh...@linkedin.com> Committed: Thu Nov 1 12:10:05 2018 -0700 ---------------------------------------------------------------------- .../server/resources/helix/JobAccessor.java | 68 ++++++++++++++++++-- .../helix/rest/server/TestJobAccessor.java | 38 +++++++++++ 2 files changed, 102 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/a09a18ac/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java ---------------------------------------------------------------------- diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java index 9a085f1..a984428 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/resources/helix/JobAccessor.java @@ -19,7 +19,6 @@ package org.apache.helix.rest.server.resources.helix; * under the License. */ - import java.io.IOException; import java.util.Collections; import java.util.HashMap; @@ -28,12 +27,13 @@ import java.util.Set; import javax.ws.rs.DELETE; import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; +import javax.ws.rs.POST; import javax.ws.rs.PUT; import javax.ws.rs.Path; import javax.ws.rs.PathParam; import javax.ws.rs.QueryParam; import javax.ws.rs.core.Response; - +import org.I0Itec.zkclient.exception.ZkNoNodeException; import org.apache.helix.HelixException; import org.apache.helix.ZNRecord; import org.apache.helix.task.JobConfig; @@ -41,11 +41,12 @@ import org.apache.helix.task.JobContext; import org.apache.helix.task.TaskConfig; import org.apache.helix.task.TaskDriver; import org.apache.helix.task.WorkflowConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.codehaus.jackson.node.ArrayNode; import org.codehaus.jackson.node.JsonNodeFactory; import org.codehaus.jackson.node.ObjectNode; +import org.codehaus.jackson.type.TypeReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @Path("/clusters/{clusterId}/workflows/{workflowName}/jobs") public class JobAccessor extends AbstractHelixResource { @@ -171,6 +172,65 @@ public class JobAccessor extends AbstractHelixResource { return badRequest("Job context for " + jobName + " does not exists"); } + @GET + @Path("{jobName}/userContent") + public Response getJobUserContent(@PathParam("clusterId") String clusterId, + @PathParam("workflowName") String workflowName, @PathParam("jobName") String jobName) { + TaskDriver taskDriver = getTaskDriver(clusterId); + try { + Map<String, String> contentStore = + taskDriver.getJobUserContentMap(workflowName, jobName); + if (contentStore == null) { + return JSONRepresentation(Collections.emptyMap()); + } + return JSONRepresentation(contentStore); + } catch (ZkNoNodeException e) { + return notFound("Unable to find content store"); + } catch (Exception e) { + return serverError(e); + } + } + + @POST + @Path("{jobName}/userContent") + public Response updateWorkflowUserContent( + @PathParam("clusterId") String clusterId, + @PathParam("workflowName") String workflowName, + @PathParam("jobName") String jobName, + @QueryParam("command") String commandStr, + String content + ) { + Command cmd; + Map<String, String> contentMap = Collections.emptyMap(); + try { + contentMap = OBJECT_MAPPER.readValue(content, new TypeReference<Map<String, String>>() { + }); + cmd = (commandStr == null || commandStr.isEmpty()) + ? Command.update + : Command.valueOf(commandStr); + } catch (IOException e) { + return badRequest(String + .format("Content %s cannot be deserialized to Map<String, String>. Err: %s", content, + e.getMessage())); + } catch (IllegalArgumentException ie) { + return badRequest(String.format("Invalid command: %s. Err: %s", commandStr, ie.getMessage())); + } + + TaskDriver driver = getTaskDriver(clusterId); + try { + switch (cmd) { + case update: + driver.addOrUpdateJobUserContentMap(workflowName, jobName, contentMap); + return OK(); + default: + return badRequest(String.format("Command \"%s\" is not supported!", cmd)); + } + } catch (Exception e) { + _logger.error("Failed to update user content store", e); + return serverError(e); + } + } + protected static JobConfig.Builder getJobConfig(Map<String, String> cfgMap) { return new JobConfig.Builder().fromMap(cfgMap); } http://git-wip-us.apache.org/repos/asf/helix/blob/a09a18ac/helix-rest/src/test/java/org/apache/helix/rest/server/TestJobAccessor.java ---------------------------------------------------------------------- diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/TestJobAccessor.java b/helix-rest/src/test/java/org/apache/helix/rest/server/TestJobAccessor.java index 682039d..1cf377b 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/TestJobAccessor.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/TestJobAccessor.java @@ -19,8 +19,11 @@ package org.apache.helix.rest.server; * under the License. */ +import com.google.common.collect.ImmutableMap; import java.io.IOException; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.Set; import javax.ws.rs.client.Entity; import javax.ws.rs.core.MediaType; @@ -36,6 +39,7 @@ import org.apache.helix.task.TaskPartitionState; import org.apache.helix.task.TaskUtil; import org.apache.helix.task.WorkflowConfig; import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.type.TypeReference; import org.testng.Assert; import org.testng.annotations.Test; @@ -135,6 +139,40 @@ public class TestJobAccessor extends AbstractTestClass { } @Test(dependsOnMethods = "testCreateJob") + public void testGetAddJobContent() throws IOException { + System.out.println("Start test :" + TestHelper.getTestMethodName()); + String uri = "clusters/" + CLUSTER_NAME + "/workflows/Workflow_0/jobs/Job_0/userContent"; + + // Empty user content + String body = + get(uri, Response.Status.OK.getStatusCode(), true); + Map<String, String> contentStore = OBJECT_MAPPER.readValue(body, new TypeReference<Map<String, String>>() {}); + Assert.assertTrue(contentStore.isEmpty()); + + // Post user content + Map<String, String> map1 = new HashMap<>(); + map1.put("k1", "v1"); + Entity entity = Entity.entity(OBJECT_MAPPER.writeValueAsString(map1), MediaType.APPLICATION_JSON_TYPE); + post(uri, ImmutableMap.of("command", "delete"), entity, Response.Status.BAD_REQUEST.getStatusCode()); + post(uri, ImmutableMap.of("command", "update"), entity, Response.Status.OK.getStatusCode()); + + // update (add items) workflow content store + body = get(uri, Response.Status.OK.getStatusCode(), true); + contentStore = OBJECT_MAPPER.readValue(body, new TypeReference<Map<String, String>>() {}); + Assert.assertEquals(contentStore, map1); + + // modify map1 and verify + map1.put("k1", "v2"); + map1.put("k2", "v2"); + entity = Entity.entity(OBJECT_MAPPER.writeValueAsString(map1), MediaType.APPLICATION_JSON_TYPE); + post(uri, ImmutableMap.of("command", "update"), entity, Response.Status.OK.getStatusCode()); + body = get(uri, Response.Status.OK.getStatusCode(), true); + contentStore = OBJECT_MAPPER.readValue(body, new TypeReference<Map<String, String>>() {}); + Assert.assertEquals(contentStore, map1); + + } + + @Test(dependsOnMethods = "testCreateJob") public void testDeleteJob() { System.out.println("Start test :" + TestHelper.getTestMethodName()); TaskDriver driver = getTaskDriver(CLUSTER_NAME);