This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 7fadf0a Trogdor: Add Task State filter to /coordinator/tasks endpoint (#5907) 7fadf0a is described below commit 7fadf0a11ddb2451adc66e1b179b84b050ad3f4f Author: Stanislav Kozlovski <stanislav_kozlov...@outlook.com> AuthorDate: Mon Nov 26 16:07:15 2018 -0800 Trogdor: Add Task State filter to /coordinator/tasks endpoint (#5907) Reviewers: Colin McCabe <cmcc...@apache.org> --- checkstyle/suppressions.xml | 2 +- .../trogdor/coordinator/CoordinatorClient.java | 4 +- .../coordinator/CoordinatorRestResource.java | 29 +++++++--- .../kafka/trogdor/coordinator/TaskManager.java | 38 ++++++------- .../org/apache/kafka/trogdor/rest/TaskState.java | 8 +-- .../apache/kafka/trogdor/rest/TaskStateType.java | 42 +++++++++++++++ .../apache/kafka/trogdor/rest/TasksRequest.java | 23 +++++++- .../apache/kafka/trogdor/common/ExpectedTasks.java | 3 +- .../kafka/trogdor/coordinator/CoordinatorTest.java | 62 ++++++++++++---------- 9 files changed, 146 insertions(+), 65 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index f3ab7ec..75ad799 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -57,7 +57,7 @@ files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java|TransactionManagerTest.java"/> <suppress checks="NPathComplexity" - files="(BufferPool|Fetcher|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster).java"/> + files="(BufferPool|Fetcher|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest).java"/> <!-- clients tests --> <suppress checks="ClassDataAbstractionCoupling" diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java index 80937a8..4670d2c 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java @@ -42,6 +42,8 @@ import org.slf4j.LoggerFactory; import javax.ws.rs.NotFoundException; import javax.ws.rs.core.UriBuilder; +import java.util.Optional; + import static net.sourceforge.argparse4j.impl.Arguments.store; import static net.sourceforge.argparse4j.impl.Arguments.storeTrue; @@ -244,7 +246,7 @@ public class CoordinatorClient { } else if (res.getBoolean("show_tasks")) { System.out.println("Got coordinator tasks: " + JsonUtil.toPrettyJsonString(client.tasks( - new TasksRequest(null, 0, 0, 0, 0)))); + new TasksRequest(null, 0, 0, 0, 0, Optional.empty())))); } else if (res.getString("show_task") != null) { String taskId = res.getString("show_task"); TaskRequest req = new TaskRequest(res.getString("show_task")); diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java index c0e7fc9..91f731e 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java @@ -23,8 +23,9 @@ import org.apache.kafka.trogdor.rest.DestroyTaskRequest; import org.apache.kafka.trogdor.rest.Empty; import org.apache.kafka.trogdor.rest.StopTaskRequest; import org.apache.kafka.trogdor.rest.TaskRequest; -import org.apache.kafka.trogdor.rest.TasksRequest; import org.apache.kafka.trogdor.rest.TaskState; +import org.apache.kafka.trogdor.rest.TaskStateType; +import org.apache.kafka.trogdor.rest.TasksRequest; import org.apache.kafka.trogdor.rest.TasksResponse; import javax.servlet.ServletContext; @@ -32,15 +33,17 @@ import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; +import javax.ws.rs.NotFoundException; import javax.ws.rs.POST; import javax.ws.rs.PUT; import javax.ws.rs.Path; +import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; -import javax.ws.rs.PathParam; -import javax.ws.rs.NotFoundException; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; import java.util.List; +import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; /** @@ -96,13 +99,25 @@ public class CoordinatorRestResource { } @GET - @Path("/tasks") - public TasksResponse tasks(@QueryParam("taskId") List<String> taskId, + @Path("/tasks/") + public Response tasks(@QueryParam("taskId") List<String> taskId, @DefaultValue("0") @QueryParam("firstStartMs") long firstStartMs, @DefaultValue("0") @QueryParam("lastStartMs") long lastStartMs, @DefaultValue("0") @QueryParam("firstEndMs") long firstEndMs, - @DefaultValue("0") @QueryParam("lastEndMs") long lastEndMs) throws Throwable { - return coordinator().tasks(new TasksRequest(taskId, firstStartMs, lastStartMs, firstEndMs, lastEndMs)); + @DefaultValue("0") @QueryParam("lastEndMs") long lastEndMs, + @DefaultValue("") @QueryParam("state") String state) throws Throwable { + boolean isEmptyState = state.equals(""); + if (!isEmptyState && !TaskStateType.Constants.VALUES.contains(state)) { + return Response.status(400).entity( + String.format("State %s is invalid. Must be one of %s", + state, TaskStateType.Constants.VALUES) + ).build(); + } + + Optional<TaskStateType> givenState = Optional.ofNullable(isEmptyState ? null : TaskStateType.valueOf(state)); + TasksResponse resp = coordinator().tasks(new TasksRequest(taskId, firstStartMs, lastStartMs, firstEndMs, lastEndMs, givenState)); + + return Response.status(200).entity(resp).build(); } @GET diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java index 934acd3..18ff9cb 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java @@ -32,11 +32,12 @@ import org.apache.kafka.trogdor.common.ThreadUtils; import org.apache.kafka.trogdor.rest.RequestConflictException; import org.apache.kafka.trogdor.rest.TaskDone; import org.apache.kafka.trogdor.rest.TaskPending; +import org.apache.kafka.trogdor.rest.TaskRequest; import org.apache.kafka.trogdor.rest.TaskRunning; import org.apache.kafka.trogdor.rest.TaskState; +import org.apache.kafka.trogdor.rest.TaskStateType; import org.apache.kafka.trogdor.rest.TaskStopping; import org.apache.kafka.trogdor.rest.TasksRequest; -import org.apache.kafka.trogdor.rest.TaskRequest; import org.apache.kafka.trogdor.rest.TasksResponse; import org.apache.kafka.trogdor.rest.WorkerDone; import org.apache.kafka.trogdor.rest.WorkerReceiving; @@ -142,13 +143,6 @@ public final class TaskManager { Utils.join(nodeManagers.keySet(), ", ")); } - enum ManagedTaskState { - PENDING, - RUNNING, - STOPPING, - DONE; - } - class ManagedTask { /** * The task id. @@ -168,7 +162,7 @@ public final class TaskManager { /** * The task state. */ - private ManagedTaskState state; + private TaskStateType state; /** * The time when the task was started, or -1 if the task has not been started. @@ -201,7 +195,7 @@ public final class TaskManager { */ private String error = ""; - ManagedTask(String id, TaskSpec spec, TaskController controller, ManagedTaskState state) { + ManagedTask(String id, TaskSpec spec, TaskController controller, TaskStateType state) { this.id = id; this.spec = spec; this.controller = controller; @@ -345,13 +339,13 @@ public final class TaskManager { if (failure != null) { log.info("Failed to create a new task {} with spec {}: {}", id, spec, failure); - task = new ManagedTask(id, spec, null, ManagedTaskState.DONE); + task = new ManagedTask(id, spec, null, TaskStateType.DONE); task.doneMs = time.milliseconds(); task.maybeSetError(failure); tasks.put(id, task); return null; } - task = new ManagedTask(id, spec, controller, ManagedTaskState.PENDING); + task = new ManagedTask(id, spec, controller, TaskStateType.PENDING); tasks.put(id, task); long delayMs = task.startDelayMs(time.milliseconds()); task.startFuture = scheduler.schedule(executor, new RunTask(task), delayMs); @@ -374,7 +368,7 @@ public final class TaskManager { @Override public Void call() throws Exception { task.clearStartFuture(); - if (task.state != ManagedTaskState.PENDING) { + if (task.state != TaskStateType.PENDING) { log.info("Can't start task {}, because it is already in state {}.", task.id, task.state); return null; @@ -385,12 +379,12 @@ public final class TaskManager { } catch (Exception e) { log.error("Unable to find nodes for task {}", task.id, e); task.doneMs = time.milliseconds(); - task.state = ManagedTaskState.DONE; + task.state = TaskStateType.DONE; task.maybeSetError("Unable to find nodes for task: " + e.getMessage()); return null; } log.info("Running task {} on node(s): {}", task.id, Utils.join(nodeNames, ", ")); - task.state = ManagedTaskState.RUNNING; + task.state = TaskStateType.RUNNING; task.startedMs = time.milliseconds(); for (String workerName : nodeNames) { long workerId = nextWorkerId++; @@ -441,7 +435,7 @@ public final class TaskManager { task.cancelled = true; task.clearStartFuture(); task.doneMs = time.milliseconds(); - task.state = ManagedTaskState.DONE; + task.state = TaskStateType.DONE; log.info("Stopped pending task {}.", id); break; case RUNNING: @@ -454,14 +448,14 @@ public final class TaskManager { log.info("Task {} is now complete with error: {}", id, task.error); } task.doneMs = time.milliseconds(); - task.state = ManagedTaskState.DONE; + task.state = TaskStateType.DONE; } else { for (Map.Entry<String, Long> entry : activeWorkerIds.entrySet()) { nodeManagers.get(entry.getKey()).stopWorker(entry.getValue()); } log.info("Cancelling task {} with worker(s) {}", id, Utils.mkString(activeWorkerIds, "", "", " = ", ", ")); - task.state = ManagedTaskState.STOPPING; + task.state = TaskStateType.STOPPING; } break; case STOPPING: @@ -586,14 +580,14 @@ public final class TaskManager { TreeMap<String, Long> activeWorkerIds = task.activeWorkerIds(); if (activeWorkerIds.isEmpty()) { task.doneMs = time.milliseconds(); - task.state = ManagedTaskState.DONE; + task.state = TaskStateType.DONE; log.info("{}: Task {} is now complete on {} with error: {}", nodeName, task.id, Utils.join(task.workerIds.keySet(), ", "), task.error.isEmpty() ? "(none)" : task.error); - } else if ((task.state == ManagedTaskState.RUNNING) && (!task.error.isEmpty())) { + } else if ((task.state == TaskStateType.RUNNING) && (!task.error.isEmpty())) { log.info("{}: task {} stopped with error {}. Stopping worker(s): {}", nodeName, task.id, task.error, Utils.mkString(activeWorkerIds, "{", "}", ": ", ", ")); - task.state = ManagedTaskState.STOPPING; + task.state = TaskStateType.STOPPING; for (Map.Entry<String, Long> entry : activeWorkerIds.entrySet()) { nodeManagers.get(entry.getKey()).stopWorker(entry.getValue()); } @@ -621,7 +615,7 @@ public final class TaskManager { public TasksResponse call() throws Exception { TreeMap<String, TaskState> states = new TreeMap<>(); for (ManagedTask task : tasks.values()) { - if (request.matches(task.id, task.startedMs, task.doneMs)) { + if (request.matches(task.id, task.startedMs, task.doneMs, task.state)) { states.put(task.id, task.taskState()); } } diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java index 0764e14..2428893 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java @@ -31,10 +31,10 @@ import org.apache.kafka.trogdor.task.TaskSpec; include = JsonTypeInfo.As.PROPERTY, property = "state") @JsonSubTypes({ - @JsonSubTypes.Type(value = TaskPending.class, name = "PENDING"), - @JsonSubTypes.Type(value = TaskRunning.class, name = "RUNNING"), - @JsonSubTypes.Type(value = TaskStopping.class, name = "STOPPING"), - @JsonSubTypes.Type(value = TaskDone.class, name = "DONE") + @JsonSubTypes.Type(value = TaskPending.class, name = TaskStateType.Constants.PENDING_VALUE), + @JsonSubTypes.Type(value = TaskRunning.class, name = TaskStateType.Constants.RUNNING_VALUE), + @JsonSubTypes.Type(value = TaskStopping.class, name = TaskStateType.Constants.STOPPING_VALUE), + @JsonSubTypes.Type(value = TaskDone.class, name = TaskStateType.Constants.DONE_VALUE) }) public abstract class TaskState extends Message { private final TaskSpec spec; diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStateType.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStateType.java new file mode 100644 index 0000000..c8ade06 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStateType.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.trogdor.rest; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** + * The types of states a single Task can be in + */ +public enum TaskStateType { + PENDING(Constants.PENDING_VALUE), + RUNNING(Constants.RUNNING_VALUE), + STOPPING(Constants.STOPPING_VALUE), + DONE(Constants.DONE_VALUE); + + TaskStateType(String stateType) {} + + public static class Constants { + static final String PENDING_VALUE = "PENDING"; + static final String RUNNING_VALUE = "RUNNING"; + static final String STOPPING_VALUE = "STOPPING"; + static final String DONE_VALUE = "DONE"; + public static final List<String> VALUES = Collections.unmodifiableList( + Arrays.asList(PENDING_VALUE, RUNNING_VALUE, STOPPING_VALUE, DONE_VALUE)); + } +} diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TasksRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TasksRequest.java index 24b438a..150a362 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/rest/TasksRequest.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TasksRequest.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import java.util.Collection; import java.util.Collections; import java.util.HashSet; +import java.util.Optional; import java.util.Set; /** @@ -55,18 +56,26 @@ public class TasksRequest extends Message { */ private final long lastEndMs; + /** + * The desired state of the tasks. + * An empty string will match all states. + */ + private final Optional<TaskStateType> state; + @JsonCreator public TasksRequest(@JsonProperty("taskIds") Collection<String> taskIds, @JsonProperty("firstStartMs") long firstStartMs, @JsonProperty("lastStartMs") long lastStartMs, @JsonProperty("firstEndMs") long firstEndMs, - @JsonProperty("lastEndMs") long lastEndMs) { + @JsonProperty("lastEndMs") long lastEndMs, + @JsonProperty("state") Optional<TaskStateType> state) { this.taskIds = Collections.unmodifiableSet((taskIds == null) ? new HashSet<String>() : new HashSet<>(taskIds)); this.firstStartMs = Math.max(0, firstStartMs); this.lastStartMs = Math.max(0, lastStartMs); this.firstEndMs = Math.max(0, firstEndMs); this.lastEndMs = Math.max(0, lastEndMs); + this.state = state == null ? Optional.empty() : state; } @JsonProperty @@ -94,6 +103,11 @@ public class TasksRequest extends Message { return lastEndMs; } + @JsonProperty + public Optional<TaskStateType> state() { + return state; + } + /** * Determine if this TaskRequest should return a particular task. * @@ -102,7 +116,7 @@ public class TasksRequest extends Message { * @param endMs The task end time, or -1 if the task hasn't ended. * @return True if information about the task should be returned. */ - public boolean matches(String taskId, long startMs, long endMs) { + public boolean matches(String taskId, long startMs, long endMs, TaskStateType state) { if ((!taskIds.isEmpty()) && (!taskIds.contains(taskId))) { return false; } @@ -118,6 +132,11 @@ public class TasksRequest extends Message { if ((lastEndMs > 0) && ((endMs < 0) || (endMs > lastEndMs))) { return false; } + + if (this.state.isPresent() && !this.state.get().equals(state)) { + return false; + } + return true; } } diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java b/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java index b0e30a0..c092c92 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java @@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.TreeMap; public class ExpectedTasks { @@ -146,7 +147,7 @@ public class ExpectedTasks { public boolean conditionMet() { TasksResponse tasks = null; try { - tasks = client.tasks(new TasksRequest(null, 0, 0, 0, 0)); + tasks = client.tasks(new TasksRequest(null, 0, 0, 0, 0, Optional.empty())); } catch (Exception e) { log.info("Unable to get coordinator tasks", e); throw new RuntimeException(e); diff --git a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java index f22130e..0207104 100644 --- a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java +++ b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java @@ -42,6 +42,7 @@ import org.apache.kafka.trogdor.rest.TaskDone; import org.apache.kafka.trogdor.rest.TaskPending; import org.apache.kafka.trogdor.rest.TaskRunning; import org.apache.kafka.trogdor.rest.TaskRequest; +import org.apache.kafka.trogdor.rest.TaskStateType; import org.apache.kafka.trogdor.rest.TasksRequest; import org.apache.kafka.trogdor.rest.TaskState; import org.apache.kafka.trogdor.rest.TasksResponse; @@ -60,6 +61,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; +import java.util.Optional; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -407,34 +409,40 @@ public class CoordinatorTest { @Test public void testTasksRequestMatches() throws Exception { - TasksRequest req1 = new TasksRequest(null, 0, 0, 0, 0); - assertTrue(req1.matches("foo1", -1, -1)); - assertTrue(req1.matches("bar1", 100, 200)); - assertTrue(req1.matches("baz1", 100, -1)); - - TasksRequest req2 = new TasksRequest(null, 100, 0, 0, 0); - assertFalse(req2.matches("foo1", -1, -1)); - assertTrue(req2.matches("bar1", 100, 200)); - assertFalse(req2.matches("bar1", 99, 200)); - assertFalse(req2.matches("baz1", 99, -1)); - - TasksRequest req3 = new TasksRequest(null, 200, 900, 200, 900); - assertFalse(req3.matches("foo1", -1, -1)); - assertFalse(req3.matches("bar1", 100, 200)); - assertFalse(req3.matches("bar1", 200, 1000)); - assertTrue(req3.matches("bar1", 200, 700)); - assertFalse(req3.matches("baz1", 101, -1)); + TasksRequest req1 = new TasksRequest(null, 0, 0, 0, 0, Optional.empty()); + assertTrue(req1.matches("foo1", -1, -1, TaskStateType.PENDING)); + assertTrue(req1.matches("bar1", 100, 200, TaskStateType.DONE)); + assertTrue(req1.matches("baz1", 100, -1, TaskStateType.RUNNING)); + + TasksRequest req2 = new TasksRequest(null, 100, 0, 0, 0, Optional.empty()); + assertFalse(req2.matches("foo1", -1, -1, TaskStateType.PENDING)); + assertTrue(req2.matches("bar1", 100, 200, TaskStateType.DONE)); + assertFalse(req2.matches("bar1", 99, 200, TaskStateType.DONE)); + assertFalse(req2.matches("baz1", 99, -1, TaskStateType.RUNNING)); + + TasksRequest req3 = new TasksRequest(null, 200, 900, 200, 900, Optional.empty()); + assertFalse(req3.matches("foo1", -1, -1, TaskStateType.PENDING)); + assertFalse(req3.matches("bar1", 100, 200, TaskStateType.DONE)); + assertFalse(req3.matches("bar1", 200, 1000, TaskStateType.DONE)); + assertTrue(req3.matches("bar1", 200, 700, TaskStateType.DONE)); + assertFalse(req3.matches("baz1", 101, -1, TaskStateType.RUNNING)); List<String> taskIds = new ArrayList<>(); taskIds.add("foo1"); taskIds.add("bar1"); taskIds.add("baz1"); - TasksRequest req4 = new TasksRequest(taskIds, 1000, -1, -1, -1); - assertFalse(req4.matches("foo1", -1, -1)); - assertTrue(req4.matches("foo1", 1000, -1)); - assertFalse(req4.matches("foo1", 900, -1)); - assertFalse(req4.matches("baz2", 2000, -1)); - assertFalse(req4.matches("baz2", -1, -1)); + TasksRequest req4 = new TasksRequest(taskIds, 1000, -1, -1, -1, Optional.empty()); + assertFalse(req4.matches("foo1", -1, -1, TaskStateType.PENDING)); + assertTrue(req4.matches("foo1", 1000, -1, TaskStateType.RUNNING)); + assertFalse(req4.matches("foo1", 900, -1, TaskStateType.RUNNING)); + assertFalse(req4.matches("baz2", 2000, -1, TaskStateType.RUNNING)); + assertFalse(req4.matches("baz2", -1, -1, TaskStateType.PENDING)); + + TasksRequest req5 = new TasksRequest(null, 0, 0, 0, 0, Optional.of(TaskStateType.RUNNING)); + assertTrue(req5.matches("foo1", -1, -1, TaskStateType.RUNNING)); + assertFalse(req5.matches("bar1", -1, -1, TaskStateType.DONE)); + assertFalse(req5.matches("baz1", -1, -1, TaskStateType.STOPPING)); + assertFalse(req5.matches("baz1", -1, -1, TaskStateType.PENDING)); } @Test @@ -463,9 +471,9 @@ public class CoordinatorTest { waitFor(coordinatorClient); assertEquals(0, coordinatorClient.tasks( - new TasksRequest(null, 10, 0, 10, 0)).tasks().size()); + new TasksRequest(null, 10, 0, 10, 0, Optional.empty())).tasks().size()); TasksResponse resp1 = coordinatorClient.tasks( - new TasksRequest(Arrays.asList(new String[] {"foo", "baz" }), 0, 0, 0, 0)); + new TasksRequest(Arrays.asList(new String[] {"foo", "baz" }), 0, 0, 0, 0, Optional.empty())); assertTrue(resp1.tasks().containsKey("foo")); assertFalse(resp1.tasks().containsKey("bar")); assertEquals(1, resp1.tasks().size()); @@ -483,13 +491,13 @@ public class CoordinatorTest { waitFor(cluster.agentClient("node02")); TasksResponse resp2 = coordinatorClient.tasks( - new TasksRequest(null, 1, 0, 0, 0)); + new TasksRequest(null, 1, 0, 0, 0, Optional.empty())); assertTrue(resp2.tasks().containsKey("foo")); assertFalse(resp2.tasks().containsKey("bar")); assertEquals(1, resp2.tasks().size()); assertEquals(0, coordinatorClient.tasks( - new TasksRequest(null, 3, 0, 0, 0)).tasks().size()); + new TasksRequest(null, 3, 0, 0, 0, Optional.empty())).tasks().size()); } }