[ 
https://issues.apache.org/jira/browse/KAFKA-7564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16681807#comment-16681807
 ] 

ASF GitHub Bot commented on KAFKA-7564:
---------------------------------------

cmccabe closed pull request #5852: KAFKA-7564: Expose single task details in 
Trogdor
URL: https://github.com/apache/kafka/pull/5852
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java 
b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
index c3271c9a5f3..cd3da904c2b 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
@@ -30,7 +30,9 @@
 import org.apache.kafka.trogdor.rest.DestroyTaskRequest;
 import org.apache.kafka.trogdor.rest.JsonRestServer;
 import org.apache.kafka.trogdor.rest.StopTaskRequest;
+import org.apache.kafka.trogdor.rest.TaskRequest;
 import org.apache.kafka.trogdor.rest.TasksRequest;
+import org.apache.kafka.trogdor.rest.TaskState;
 import org.apache.kafka.trogdor.rest.TasksResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -104,6 +106,10 @@ public TasksResponse tasks(TasksRequest request) throws 
Exception {
         return taskManager.tasks(request);
     }
 
+    public TaskState task(TaskRequest request) throws Exception {
+        return taskManager.task(request);
+    }
+
     public void beginShutdown(boolean stopAgents) throws Exception {
         restServer.beginShutdown();
         taskManager.beginShutdown(stopAgents);
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
 
b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
index 780ae737e0e..80937a84de5 100644
--- 
a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
@@ -32,11 +32,14 @@
 import org.apache.kafka.trogdor.rest.JsonRestServer;
 import org.apache.kafka.trogdor.rest.JsonRestServer.HttpResponse;
 import org.apache.kafka.trogdor.rest.StopTaskRequest;
+import org.apache.kafka.trogdor.rest.TaskRequest;
 import org.apache.kafka.trogdor.rest.TasksRequest;
+import org.apache.kafka.trogdor.rest.TaskState;
 import org.apache.kafka.trogdor.rest.TasksResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.ws.rs.NotFoundException;
 import javax.ws.rs.core.UriBuilder;
 
 import static net.sourceforge.argparse4j.impl.Arguments.store;
@@ -151,6 +154,13 @@ public TasksResponse tasks(TasksRequest request) throws 
Exception {
         return resp.body();
     }
 
+    public TaskState task(TaskRequest request) throws Exception {
+        String uri = 
UriBuilder.fromPath(url("/coordinator/tasks/{taskId}")).build(request.taskId()).toString();
+        HttpResponse<TaskState> resp = JsonRestServer.httpRequest(log, uri, 
"GET",
+            null, new TypeReference<TaskState>() { }, maxTries);
+        return resp.body();
+    }
+
     public void shutdown() throws Exception {
         HttpResponse<Empty> resp =
             JsonRestServer.httpRequest(log, url("/coordinator/shutdown"), 
"PUT",
@@ -181,6 +191,12 @@ public static void main(String[] args) throws Exception {
             .type(Boolean.class)
             .dest("show_tasks")
             .help("Show coordinator tasks.");
+        actions.addArgument("--show-task")
+            .action(store())
+            .type(String.class)
+            .dest("show_task")
+            .metavar("TASK_ID")
+            .help("Show a specific coordinator task.");
         actions.addArgument("--create-task")
             .action(store())
             .type(String.class)
@@ -229,6 +245,15 @@ public static void main(String[] args) throws Exception {
             System.out.println("Got coordinator tasks: " +
                 JsonUtil.toPrettyJsonString(client.tasks(
                     new TasksRequest(null, 0, 0, 0, 0))));
+        } else if (res.getString("show_task") != null) {
+            String taskId = res.getString("show_task");
+            TaskRequest req = new TaskRequest(res.getString("show_task"));
+            try {
+                String taskOutput = String.format("Got coordinator task 
\"%s\": %s", taskId, JsonUtil.toPrettyJsonString(client.task(req)));
+                System.out.println(taskOutput);
+            } catch (NotFoundException e) {
+                System.out.println(e.getMessage());
+            }
         } else if (res.getString("create_task") != null) {
             CreateTaskRequest req = JsonUtil.JSON_SERDE.
                 readValue(res.getString("create_task"), 
CreateTaskRequest.class);
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
 
b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
index cbfbddd7eda..916372090b0 100644
--- 
a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
@@ -22,7 +22,9 @@
 import org.apache.kafka.trogdor.rest.DestroyTaskRequest;
 import org.apache.kafka.trogdor.rest.Empty;
 import org.apache.kafka.trogdor.rest.StopTaskRequest;
+import org.apache.kafka.trogdor.rest.TaskRequest;
 import org.apache.kafka.trogdor.rest.TasksRequest;
+import org.apache.kafka.trogdor.rest.TaskState;
 import org.apache.kafka.trogdor.rest.TasksResponse;
 
 import javax.servlet.ServletContext;
@@ -35,6 +37,8 @@
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.NotFoundException;
 import javax.ws.rs.core.MediaType;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
@@ -101,6 +105,16 @@ public TasksResponse tasks(@QueryParam("taskId") 
List<String> taskId,
         return coordinator().tasks(new TasksRequest(taskId, firstStartMs, 
lastStartMs, firstEndMs, lastEndMs));
     }
 
+    @GET
+    @Path("/tasks/{taskId}")
+    public TaskState tasks(@PathParam("taskId") String taskId) throws 
Throwable {
+        TaskState response = coordinator().task(new TaskRequest(taskId));
+        if (response == null)
+            throw new NotFoundException(String.format("No task with ID \"%s\" 
exists.", taskId));
+
+        return response;
+    }
+
     @PUT
     @Path("/shutdown")
     public Empty beginShutdown(CoordinatorShutdownRequest request) throws 
Throwable {
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java 
b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
index 74082bdb60a..934acd3648c 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
@@ -36,6 +36,7 @@
 import org.apache.kafka.trogdor.rest.TaskState;
 import org.apache.kafka.trogdor.rest.TaskStopping;
 import org.apache.kafka.trogdor.rest.TasksRequest;
+import org.apache.kafka.trogdor.rest.TaskRequest;
 import org.apache.kafka.trogdor.rest.TasksResponse;
 import org.apache.kafka.trogdor.rest.WorkerDone;
 import org.apache.kafka.trogdor.rest.WorkerReceiving;
@@ -628,6 +629,36 @@ public TasksResponse call() throws Exception {
         }
     }
 
+    /**
+     * Get information about a single task being managed.
+     *
+     * Returns #{@code null} if the task does not exist
+     */
+    public TaskState task(TaskRequest request) throws ExecutionException, 
InterruptedException {
+        return executor.submit(new GetTaskState(request)).get();
+    }
+
+    /**
+     * Gets information about the tasks being managed.  Processed by the state 
change thread.
+     */
+    class GetTaskState implements Callable<TaskState> {
+        private final TaskRequest request;
+
+        GetTaskState(TaskRequest request) {
+            this.request = request;
+        }
+
+        @Override
+        public TaskState call() throws Exception {
+            ManagedTask task = tasks.get(request.taskId());
+            if (task == null) {
+                return null;
+            }
+
+            return task.taskState();
+        }
+    }
+
     /**
      * Initiate shutdown, but do not wait for it to complete.
      */
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRequest.java 
b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRequest.java
new file mode 100644
index 00000000000..e42738f5587
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRequest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * The request to /coordinator/tasks/{taskId}
+ */
+public class TaskRequest {
+    private final String taskId;
+
+    @JsonCreator
+    public TaskRequest(@JsonProperty("taskId") String taskId) {
+        this.taskId = taskId == null ? "" : taskId;
+    }
+
+    @JsonProperty
+    public String taskId() {
+        return taskId;
+    }
+}
diff --git 
a/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java 
b/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
index 121281f5910..b0e30a031c6 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
@@ -71,7 +71,7 @@ public ExpectedTask build() {
         }
     }
 
-    static class ExpectedTask {
+    public static class ExpectedTask {
         private final String id;
         private final TaskSpec taskSpec;
         private final TaskState taskState;
diff --git 
a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java 
b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
index e9434844405..f22130eda1a 100644
--- 
a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
@@ -41,7 +41,9 @@
 import org.apache.kafka.trogdor.rest.TaskDone;
 import org.apache.kafka.trogdor.rest.TaskPending;
 import org.apache.kafka.trogdor.rest.TaskRunning;
+import org.apache.kafka.trogdor.rest.TaskRequest;
 import org.apache.kafka.trogdor.rest.TasksRequest;
+import org.apache.kafka.trogdor.rest.TaskState;
 import org.apache.kafka.trogdor.rest.TasksResponse;
 import org.apache.kafka.trogdor.rest.WorkerDone;
 import org.apache.kafka.trogdor.rest.WorkerRunning;
@@ -53,6 +55,7 @@
 import org.slf4j.LoggerFactory;
 import org.junit.Test;
 
+import javax.ws.rs.NotFoundException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -490,6 +493,40 @@ public void testTasksRequest() throws Exception {
         }
     }
 
+    @Test
+    public void testTaskRequest() throws Exception {
+        MockTime time = new MockTime(0, 0, 0);
+        Scheduler scheduler = new MockScheduler(time);
+        try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().
+            addCoordinator("node01").
+            addAgent("node02").
+            scheduler(scheduler).
+            build()) {
+            CoordinatorClient coordinatorClient = cluster.coordinatorClient();
+
+            NoOpTaskSpec fooSpec = new NoOpTaskSpec(1, 10);
+            coordinatorClient.createTask(new CreateTaskRequest("foo", 
fooSpec));
+            TaskState expectedState = new 
ExpectedTaskBuilder("foo").taskState(new 
TaskPending(fooSpec)).build().taskState();
+
+            TaskState resp = coordinatorClient.task(new TaskRequest("foo"));
+            assertEquals(expectedState, resp);
+
+            time.sleep(2);
+            new ExpectedTasks().
+                addTask(new ExpectedTaskBuilder("foo").
+                    taskState(new TaskRunning(fooSpec, 2, new 
TextNode("active"))).
+                    workerState(new WorkerRunning("foo", fooSpec, 2, new 
TextNode("active"))).
+                    build()).
+                waitFor(coordinatorClient).
+                waitFor(cluster.agentClient("node02"));
+
+            try {
+                coordinatorClient.task(new TaskRequest("non-existent-foo"));
+                fail("Non existent task request should have raised a 
NotFoundException");
+            } catch (NotFoundException ignored) { }
+        }
+    }
+
     @Test
     public void testWorkersExitingAtDifferentTimes() throws Exception {
         MockTime time = new MockTime(0, 0, 0);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Trogdor - Expose single task details from Trogdor Coordinator
> -------------------------------------------------------------
>
>                 Key: KAFKA-7564
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7564
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Stanislav Kozlovski
>            Assignee: Stanislav Kozlovski
>            Priority: Minor
>
> The only way to currently get the results from tasks ran in Trogdor is 
> through listing all of them via the "--show-tasks" CLI command
> {code:java}
> ./bin/trogdor.sh client --show-tasks localhost:8889 Got coordinator tasks: 
> {      "tasks":{         "produce_bench_20462":{            "state":"DONE",   
>        "spec":{               
> "class":"org.apache.kafka.trogdor.workload.ProduceBenchSpec",             
> "startMs":0,             "durationMs":10000000,             
> "producerNode":"node0",             "bootstrapServers":"localhost:9092",      
>        "targetMessagesPerSec":10,             "maxMessages":100,             
> "keyGenerator":{                  "type":"sequential",                
> "size":4,                "startOffset":0             },             
> "valueGenerator":{                  "type":"constant",                
> "size":512,                
> "value":"AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA="
>              },             "totalTopics":10,             "activeTopics":5,   
>           "topicPrefix":"foo",             "replicationFactor":1,             
> "classLoader":{               },             "numPartitions":1          },    
>       "startedMs":1523552769850,          "doneMs":1523552780878,          
> "cancelled":false,          "status":{               "totalSent":500,         
>     "averageLatencyMs":4.972,             "p50LatencyMs":4,             
> "p95LatencyMs":6,             "p99LatencyMs":12          }       }    } }
> {code}
> This can prove inefficient and annoying if the Trogdor Coordinator is 
> long-running and we only want to get the results from a specific task.
> The current REST endpoint ("/tasks") for listing tasks enables filtering 
> through StartTimeMs/EndTimeMs and supplying specific TaskIDs, but it would be 
> cleaner if we had a specific endpoint for fetching a single task. That 
> endpoint would also return a 404 in the case where no task was found instead 
> of an empty response as the /tasks endpoint would.
> I propose we expose a new "/tasks/:id" endpoint and a new cli command 
> "--show-task TASK_ID"



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to